diff --git a/src/address_factory.cpp b/src/address_factory.cpp index 9ec274097..5166b6af1 100644 --- a/src/address_factory.cpp +++ b/src/address_factory.cpp @@ -20,8 +20,8 @@ using namespace datastax::internal::core; -bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host, - Address* output) { +bool AddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host, + Address* output) { Address connected_address = connected_host->address(); const Value* peer_value = peers_row->get_by_name("peer"); const Value* rpc_value = peers_row->get_by_name("rpc_address"); @@ -59,6 +59,12 @@ bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connec return true; } +bool AddressFactory::is_peer(const Row* peers_row, const Host::Ptr& connected_host, + const Address& expected) { + Address address; + return create(peers_row, connected_host, &address) && address == expected; +} + bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host, Address* output) { CassUuid host_id; @@ -78,3 +84,14 @@ bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_ connected_host->address().port(), to_string(host_id)); return true; } + +bool SniAddressFactory::is_peer(const Row* peers_row, const Host::Ptr& connected_host, + const Address& expected) { + const Value* value = peers_row->get_by_name("rpc_address"); + Address rpc_address; + if (!value || + !value->decoder().as_inet(value->size(), connected_host->address().port(), &rpc_address)) { + return false; + } + return rpc_address == expected; +} diff --git a/src/address_factory.hpp b/src/address_factory.hpp index 1310d5d14..140a92b5c 100644 --- a/src/address_factory.hpp +++ b/src/address_factory.hpp @@ -26,20 +26,15 @@ namespace datastax { namespace internal { namespace core { class Row; /** - * An interface for constructing `Address` from `system.local`/`system.peers` row data. + * An address factory that creates `Address` using the `rpc_address` column. */ class AddressFactory : public RefCounted { public: typedef SharedRefPtr Ptr; virtual ~AddressFactory() {} - virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output) = 0; -}; - -/** - * An address factory that creates `Address` using the `rpc_address` column. - */ -class DefaultAddressFactory : public AddressFactory { virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output); + virtual bool is_peer(const Row* peers_row, const Host::Ptr& connected_host, + const Address& expected); }; /** @@ -48,13 +43,15 @@ class DefaultAddressFactory : public AddressFactory { */ class SniAddressFactory : public AddressFactory { virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output); + virtual bool is_peer(const Row* peers_row, const Host::Ptr& connected_host, + const Address& expected); }; inline AddressFactory* create_address_factory_from_config(const Config& config) { if (config.cloud_secure_connection_config().is_loaded()) { return new SniAddressFactory(); } else { - return new DefaultAddressFactory(); + return new AddressFactory(); } } diff --git a/src/cluster.cpp b/src/cluster.cpp index ac700715e..fdac570d2 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -169,7 +169,17 @@ LockedHostMap::LockedHostMap(const HostMap& hosts) LockedHostMap::~LockedHostMap() { uv_mutex_destroy(&mutex_); } LockedHostMap::const_iterator LockedHostMap::find(const Address& address) const { - return hosts_.find(address); + HostMap::const_iterator it = hosts_.find(address); + if (it == hosts_.end()) { + // If this is from an event (not SNI) and we're using SNI addresses then fallback to using the + // "rpc_address" to compare. + for (HostMap::const_iterator i = hosts_.begin(), end = hosts_.end(); i != end; ++i) { + if (i->second->rpc_address() == address) { + return i; + } + } + } + return it; } Host::Ptr LockedHostMap::get(const Address& address) const { @@ -633,7 +643,7 @@ void Cluster::notify_host_remove(const Address& address) { notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host)); } - hosts_.erase(address); + hosts_.erase(it->first); for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(), end = load_balancing_policies_.end(); it != end; ++it) { diff --git a/src/control_connection.cpp b/src/control_connection.cpp index 2129f9f7e..524ef0d91 100644 --- a/src/control_connection.cpp +++ b/src/control_connection.cpp @@ -307,7 +307,7 @@ static NopControlConnectionListener nop_listener__; ControlConnectionSettings::ControlConnectionSettings() : use_schema(CASS_DEFAULT_USE_SCHEMA) , use_token_aware_routing(CASS_DEFAULT_USE_TOKEN_AWARE_ROUTING) - , address_factory(new DefaultAddressFactory()) {} + , address_factory(new AddressFactory()) {} ControlConnectionSettings::ControlConnectionSettings(const Config& config) : connection_settings(config) @@ -395,17 +395,16 @@ void ControlConnection::handle_refresh_node(RefreshNodeCallback* callback) { const Row* row = NULL; ResultIterator rows(callback->result().get()); - while (rows.next() && !found_host) { - row = rows.row(); - if (callback->is_all_peers) { - Address address; - bool is_valid_address = settings_.address_factory->create(row, connection_->host(), &address); - if (is_valid_address && callback->address == address) { + if (callback->is_all_peers) { + while (!found_host && rows.next()) { + row = rows.row(); + if (settings_.address_factory->is_peer(row, connection_->host(), callback->address)) { found_host = true; } - } else { - found_host = true; } + } else if (rows.next()) { + row = rows.row(); + found_host = true; } if (!found_host) { diff --git a/src/host.cpp b/src/host.cpp index 1229c1ead..355231db0 100644 --- a/src/host.cpp +++ b/src/host.cpp @@ -123,9 +123,6 @@ void Host::set(const Row* row, bool use_tokens) { if (dse_server_version_ < VersionNumber(6, 7, 0)) { server_version_ = VersionNumber(3, 11, 0); } - } else { - LOG_WARN("Invalid DSE version string \"%s\" on host %s", dse_version_str.c_str(), - address().to_string().c_str()); } } @@ -153,6 +150,18 @@ void Host::set(const Row* row, bool use_tokens) { "If this is incorrect you should configure a specific interface for rpc_address on " "the server.", address_string_.c_str()); + v = row->get_by_name("listen_address"); // Available in system.local + if (v && !v->is_null()) { + v->decoder().as_inet(v->size(), address_.port(), &rpc_address_); + } else { + v = row->get_by_name("peer"); // Available in system.peers + if (v && !v->is_null()) { + v->decoder().as_inet(v->size(), address_.port(), &rpc_address_); + } + } + if (!rpc_address_.is_valid()) { + LOG_WARN("Unable to set rpc_address from either listen_address or peer"); + } } } else { LOG_WARN("No rpc_address for host %s in system.local or system.peers.", @@ -160,30 +169,32 @@ void Host::set(const Row* row, bool use_tokens) { } } +static CassInet to_inet(const Host::Ptr& host) { + CassInet address; + if (host->address().is_resolved()) { + address.address_length = host->address().to_inet(address.address); + } else { + address.address_length = host->rpc_address().to_inet(&address.address); + } + return address; +} + ExternalHostListener::ExternalHostListener(const CassHostListenerCallback callback, void* data) : callback_(callback) , data_(data) {} void ExternalHostListener::on_host_up(const Host::Ptr& host) { - CassInet address; - address.address_length = host->address().to_inet(address.address); - callback_(CASS_HOST_LISTENER_EVENT_UP, address, data_); + callback_(CASS_HOST_LISTENER_EVENT_UP, to_inet(host), data_); } void ExternalHostListener::on_host_down(const Host::Ptr& host) { - CassInet address; - address.address_length = host->address().to_inet(address.address); - callback_(CASS_HOST_LISTENER_EVENT_DOWN, address, data_); + callback_(CASS_HOST_LISTENER_EVENT_DOWN, to_inet(host), data_); } void ExternalHostListener::on_host_added(const Host::Ptr& host) { - CassInet address; - address.address_length = host->address().to_inet(address.address); - callback_(CASS_HOST_LISTENER_EVENT_ADD, address, data_); + callback_(CASS_HOST_LISTENER_EVENT_ADD, to_inet(host), data_); } void ExternalHostListener::on_host_removed(const Host::Ptr& host) { - CassInet address; - address.address_length = host->address().to_inet(address.address); - callback_(CASS_HOST_LISTENER_EVENT_REMOVE, address, data_); + callback_(CASS_HOST_LISTENER_EVENT_REMOVE, to_inet(host), data_); } diff --git a/src/request_processor.cpp b/src/request_processor.cpp index 8ea7ff30c..44843bebe 100644 --- a/src/request_processor.cpp +++ b/src/request_processor.cpp @@ -146,7 +146,7 @@ RequestProcessorSettings::RequestProcessorSettings() , max_tracing_wait_time_ms(CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS) , retry_tracing_wait_time_ms(CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS) , tracing_consistency(CASS_DEFAULT_TRACING_CONSISTENCY) - , address_factory(new DefaultAddressFactory()) { + , address_factory(new AddressFactory()) { profiles.set_empty_key(""); } diff --git a/src/wkt.cpp b/src/wkt.cpp index 36af7fac2..623e4afbe 100644 --- a/src/wkt.cpp +++ b/src/wkt.cpp @@ -1,3 +1,4 @@ +// clang-format off #line 1 "wkt.rl" /*