Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 48 additions & 23 deletions src/address.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "address.hpp"

#include "macros.hpp"

#include <assert.h>
#include <sstream>

Expand All @@ -24,13 +26,18 @@ namespace cass {
const Address Address::EMPTY_KEY("0.0.0.0", 0);
const Address Address::DELETED_KEY("0.0.0.0", 1);

const Address Address::BIND_ANY_IPV4("0.0.0.0", 0);
const Address Address::BIND_ANY_IPV6("::", 0);

Address::Address() {
init();
}

Address::Address(const std::string& ip, int port) {
init();
from_string(ip, port, this);
bool result = from_string(ip, port, this);
UNUSED_(result);
assert(result);
}

bool Address::from_string(const std::string& ip, int port, Address* output) {
Expand All @@ -47,7 +54,7 @@ bool Address::from_string(const std::string& ip, int port, Address* output) {
#else
uv_ip4_addr(ip.c_str(), port, &addr);
#endif
output->init(copy_cast<struct sockaddr_in*, struct sockaddr*>(&addr));
output->init(&addr);
}
return true;
#if UV_VERSION_MAJOR == 0
Expand All @@ -62,64 +69,86 @@ bool Address::from_string(const std::string& ip, int port, Address* output) {
#else
uv_ip6_addr(ip.c_str(), port, &addr);
#endif
output->init(copy_cast<struct sockaddr_in6*, struct sockaddr*>(&addr));
output->init(&addr);
}
return true;
} else {
return false;
}
}

void Address::from_inet(const char* data, size_t size, int port, Address* output) {
bool Address::from_inet(const char* data, size_t size, int port, Address* output) {

assert(size == 4 || size == 16);
if (size == 4) {
char buf[INET_ADDRSTRLEN];
uv_inet_ntop(AF_INET, data, buf, sizeof(buf));
#if UV_VERSION_MAJOR == 0
if (uv_inet_ntop(AF_INET, data, buf, sizeof(buf)).code != UV_OK) {
#else
if (uv_inet_ntop(AF_INET, data, buf, sizeof(buf)) != 0) {
#endif
return false;
}
if (output != NULL) {
struct sockaddr_in addr;
#if UV_VERSION_MAJOR == 0
addr = uv_ip4_addr(buf, port);
#else
uv_ip4_addr(buf, port, &addr);
#endif
output->init(copy_cast<struct sockaddr_in*, struct sockaddr*>(&addr));
output->init(&addr);
}

return true;
} else {
char buf[INET6_ADDRSTRLEN];
uv_inet_ntop(AF_INET6, data, buf, sizeof(buf));
#if UV_VERSION_MAJOR == 0
if (uv_inet_ntop(AF_INET6, data, buf, sizeof(buf)).code != UV_OK) {
#else
if (uv_inet_ntop(AF_INET6, data, buf, sizeof(buf)) != 0) {
#endif
return false;
}
if (output != NULL) {
struct sockaddr_in6 addr;
#if UV_VERSION_MAJOR == 0
addr = uv_ip6_addr(buf, port);
#else
uv_ip6_addr(buf, port, &addr);
#endif
output->init(copy_cast<struct sockaddr_in6*, struct sockaddr*>(&addr));
output->init(&addr);
}

return true;
}
return false;
}

bool Address::init(const sockaddr* addr) {
if (addr->sa_family == AF_INET) {
memcpy(&addr_, addr, sizeof(struct sockaddr_in));
memcpy(addr_in(), addr, sizeof(struct sockaddr_in));
return true;
} else if (addr->sa_family == AF_INET6) {
memcpy(&addr_, addr, sizeof(struct sockaddr_in6));
memcpy(addr_in6(), addr, sizeof(struct sockaddr_in6));
return true;
}
return false;
}

void Address::init(const struct sockaddr_in* addr) {
*addr_in() = *addr;
}

void Address::init(const struct sockaddr_in6* addr) {
*addr_in6() = *addr;
}

int Address::port() const {
if (family() == AF_INET) {
return htons(addr_in()->sin_port);
} else if (family() == AF_INET6) {
return htons(addr_in6()->sin6_port);
} else {
assert(false);
return -1;
}
return -1;
}

std::string Address::to_string(bool with_port) const {
Expand All @@ -136,8 +165,6 @@ std::string Address::to_string(bool with_port) const {
if (with_port) ss << "[";
ss << host;
if (with_port) ss << "]:" << port();
} else {
assert(false);
}
return ss.str();
}
Expand All @@ -146,17 +173,18 @@ uint8_t Address::to_inet(uint8_t* data) const {
if (family() == AF_INET) {
memcpy(data, &addr_in()->sin_addr, 4);
return 4;
} else {
} else if (family() == AF_INET6) {
memcpy(data, &addr_in6()->sin6_addr, 16);
return 16;
}
return 0;
}

int Address::compare(const Address& a) const {
int Address::compare(const Address& a, bool with_port) const {
if (family() != a.family()) {
return family() < a.family() ? -1 : 1;
}
if (port() != a.port()) {
if (with_port && port() != a.port()) {
return port() < a.port() ? -1 : 1;
}
if (family() == AF_INET) {
Expand All @@ -166,11 +194,8 @@ int Address::compare(const Address& a) const {
} else if (family() == AF_INET6) {
return memcmp(&(addr_in6()->sin6_addr), &(a.addr_in6()->sin6_addr),
sizeof(addr_in6()->sin6_addr));
} else {
assert(false);
return -1;
}
return 0;
}

}
} // namespace cass
73 changes: 40 additions & 33 deletions src/address.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
#define __CASS_ADDRESS_HPP_INCLUDED__

#include "hash.hpp"
#include "utils.hpp"

#include <sparsehash/dense_hash_set>

#include <ostream>
#include <uv.h>
#include <set>
#include <string.h>
#include <string>
#include <vector>
Expand All @@ -35,62 +34,70 @@ class Address {
static const Address EMPTY_KEY;
static const Address DELETED_KEY;

Address();

Address(const std::string& ip, int port);
static const Address BIND_ANY_IPV4;
static const Address BIND_ANY_IPV6;

Address();
Address(const std::string& ip, int port); // Tests only

static bool from_string(const std::string& ip, int port,
Address* output = NULL);

static void from_inet(const char* data, size_t size, int port,
static bool from_inet(const char* data, size_t size, int port,
Address* output = NULL);

bool init(const struct sockaddr* addr);

const struct sockaddr* addr() const {
return copy_cast<const struct sockaddr_storage*, const struct sockaddr*>(&addr_);
}

struct sockaddr_in* addr_in() {
return copy_cast<struct sockaddr_storage*, struct sockaddr_in*>(&addr_);
}

const struct sockaddr_in* addr_in() const {
return copy_cast<const struct sockaddr_storage*, const sockaddr_in*>(&addr_);
}

struct sockaddr_in6* addr_in6() {
return copy_cast<struct sockaddr_storage*, sockaddr_in6*>(&addr_);
}

const struct sockaddr_in6* addr_in6() const {
return copy_cast<const struct sockaddr_storage*, const sockaddr_in6*>(&addr_);
}

int family() const { return addr_.ss_family; }
#ifdef _WIN32
const struct sockaddr* addr() const { return reinterpret_cast<const struct sockaddr*>(&addr_); }
const struct sockaddr_in* addr_in() const { return reinterpret_cast<const struct sockaddr_in*>(&addr_); }
const struct sockaddr_in6* addr_in6() const { return reinterpret_cast<const struct sockaddr_in6*>(&addr_); }
#else
const struct sockaddr* addr() const { return &addr_; }
const struct sockaddr_in* addr_in() const { return &addr_in_; }
const struct sockaddr_in6* addr_in6() const { return &addr_in6_; }
#endif

int family() const { return addr()->sa_family; }
int port() const;

std::string to_string(bool with_port = false) const;
uint8_t to_inet(uint8_t* data) const;

int compare(const Address& a) const;
int compare(const Address& a, bool with_port = true) const;

private:
void init() { memset(&addr_, 0, sizeof(addr_)); }
void init() { addr()->sa_family = AF_UNSPEC; }
void init(const struct sockaddr_in* addr);
void init(const struct sockaddr_in6* addr);

#ifdef _WIN32
struct sockaddr* addr() { return reinterpret_cast<struct sockaddr*>(&addr_); }
struct sockaddr_in* addr_in() { return reinterpret_cast<struct sockaddr_in*>(&addr_); }
struct sockaddr_in6* addr_in6() { return reinterpret_cast<struct sockaddr_in6*>(&addr_); }

struct sockaddr_storage addr_;
#else
struct sockaddr* addr() { return &addr_; }
struct sockaddr_in* addr_in() { return &addr_in_; }
struct sockaddr_in6* addr_in6() { return &addr_in6_; }

union {
struct sockaddr addr_;
struct sockaddr_in addr_in_;
struct sockaddr_in6 addr_in6_;
};
#endif
};

struct AddressHash {
std::size_t operator()(const cass::Address& a) const {
if (a.family() == AF_INET) {
return cass::hash::fnv1a(reinterpret_cast<const char*>(a.addr_in()),
sizeof(struct sockaddr_in));
return cass::hash::fnv1a(reinterpret_cast<const char*>(a.addr()),
sizeof(struct sockaddr_in));
} else if (a.family() == AF_INET6) {
return cass::hash::fnv1a(reinterpret_cast<const char*>(a.addr_in6()),
sizeof(struct sockaddr_in6));
return cass::hash::fnv1a(reinterpret_cast<const char*>(a.addr()),
sizeof(struct sockaddr_in6));
}
return 0;
}
Expand Down
4 changes: 1 addition & 3 deletions src/async_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
#ifndef __CASS_ASYNC_QUEUE_HPP_INCLUDED__
#define __CASS_ASYNC_QUEUE_HPP_INCLUDED__

#include "utils.hpp"

#include <uv.h>

namespace cass {
Expand All @@ -35,7 +33,7 @@ class AsyncQueue {
}

void close_handles() {
uv_close(copy_cast<uv_async_t*, uv_handle_t*>(&async_), NULL);
uv_close(reinterpret_cast<uv_handle_t*>(&async_), NULL);
}

void send() {
Expand Down
19 changes: 9 additions & 10 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "error_response.hpp"
#include "event_response.hpp"
#include "logger.hpp"
#include "utils.hpp"

#ifdef HAVE_NOSIGPIPE
#include <sys/socket.h>
Expand Down Expand Up @@ -344,14 +343,14 @@ void Connection::internal_close(ConnectionState close_state) {

if (state_ != CONNECTION_STATE_CLOSE &&
state_ != CONNECTION_STATE_CLOSE_DEFUNCT) {
uv_handle_t* handle = copy_cast<uv_tcp_t*, uv_handle_t*>(&socket_);
uv_handle_t* handle = reinterpret_cast<uv_handle_t*>(&socket_);
if (!uv_is_closing(handle)) {
heartbeat_timer_.stop();
terminate_timer_.stop();
connect_timer_.stop();
if (state_ == CONNECTION_STATE_CONNECTED ||
state_ == CONNECTION_STATE_READY) {
uv_read_stop(copy_cast<uv_tcp_t*, uv_stream_t*>(&socket_));
uv_read_stop(reinterpret_cast<uv_stream_t*>(&socket_));
}
set_state(close_state);
uv_close(handle, on_close);
Expand Down Expand Up @@ -530,23 +529,23 @@ void Connection::on_connect(Connector* connector) {
connection->host_->address_string().c_str(),
static_cast<void*>(connection));

#ifdef HAVE_NOSIGPIPE
#if defined(HAVE_NOSIGPIPE) && UV_VERSION_MAJOR >= 1
// This must be done after connection for the socket file descriptor to be
// valid.
uv_os_fd_t fd = 0;
int enabled = 1;
if (uv_fileno(copy_cast<uv_tcp_t*, uv_handle_t*>(&connection->socket_), &fd) != 0 ||
if (uv_fileno(reinterpret_cast<uv_handle_t*>(&connection->socket_), &fd) != 0 ||
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&enabled, sizeof(int)) != 0) {
LOG_WARN("Unable to set socket option SO_NOSIGPIPE for host %s",
connection->host_->address_string().c_str());
}
#endif

if (connection->ssl_session_) {
uv_read_start(copy_cast<uv_tcp_t*, uv_stream_t*>(&connection->socket_),
uv_read_start(reinterpret_cast<uv_stream_t*>(&connection->socket_),
Connection::alloc_buffer_ssl, Connection::on_read_ssl);
} else {
uv_read_start(copy_cast<uv_tcp_t*, uv_stream_t*>(&connection->socket_),
uv_read_start(reinterpret_cast<uv_stream_t*>(&connection->socket_),
Connection::alloc_buffer, Connection::on_read);
}

Expand Down Expand Up @@ -1036,7 +1035,7 @@ void Connection::PendingWrite::flush() {
}

is_flushed_ = true;
uv_stream_t* sock_stream = copy_cast<uv_tcp_t*, uv_stream_t*>(&connection_->socket_);
uv_stream_t* sock_stream = reinterpret_cast<uv_stream_t*>(&connection_->socket_);
uv_write(&req_, sock_stream, bufs.data(), bufs.size(), PendingWrite::on_write);
}
}
Expand Down Expand Up @@ -1106,7 +1105,7 @@ void Connection::PendingWriteSsl::flush() {

LOG_TRACE("Sending %u encrypted bytes", static_cast<unsigned int>(encrypted_size_));

uv_stream_t* sock_stream = copy_cast<uv_tcp_t*, uv_stream_t*>(&connection_->socket_);
uv_stream_t* sock_stream = reinterpret_cast<uv_stream_t*>(&connection_->socket_);
uv_write(&req_, sock_stream, bufs.data(), bufs.size(), PendingWriteSsl::on_write);

is_flushed_ = true;
Expand All @@ -1123,7 +1122,7 @@ void Connection::PendingWriteSsl::on_write(uv_write_t* req, int status) {

bool Connection::SslHandshakeWriter::write(Connection* connection, char* buf, size_t buf_size) {
SslHandshakeWriter* writer = new SslHandshakeWriter(connection, buf, buf_size);
uv_stream_t* stream = copy_cast<uv_tcp_t*, uv_stream_t*>(&connection->socket_);
uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&connection->socket_);

int rc = uv_write(&writer->req_, stream, &writer->uv_buf_, 1, SslHandshakeWriter::on_write);
if (rc != 0) {
Expand Down
Loading