diff --git a/include/cassandra.h b/include/cassandra.h index b3ed2b2da..63341bb3c 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -227,6 +227,14 @@ typedef struct CassResult_ CassResult; */ typedef struct CassErrorResult_ CassErrorResult; + +/** + * An object that represents a cluster node. + * + * @struct CassNode + */ +typedef struct CassNode_ CassNode; + /** * An object used to iterate over a group of rows, columns or collection values. * @@ -5002,6 +5010,21 @@ cass_future_custom_payload_item(CassFuture* future, const cass_byte_t** value, size_t* value_size); +/** + * Gets the node that acted as coordinator for this query. If the future is not + * ready this method will wait for the future to be set. + * + * @param future + * @return The coordinator node that handled the query. The lifetime of this + * object is the same as the result object it came from. NULL can be returned + * if the future is not a response future or if an error occurs before a + * coordinator responds. + * + * @see cass_statement_set_node() + */ +CASS_EXPORT const CassNode* +cass_future_coordinator(CassFuture* future); + /*********************************************************************************** * * Statement @@ -5395,6 +5418,21 @@ cass_statement_set_host_inet(CassStatement* statement, const CassInet* host, int port); +/** + * Same as cass_statement_set_host(), but using the `CassNode` type. This can + * be used to re-query the same coordinator when used with the result of + * `cass_future_coordinator()` + * + * @param statement + * @param address + * @return CASS_OK if successful, otherwise an error occurred. + * + * @see cass_future_coordinator() + */ +CASS_EXPORT CassError +cass_statement_set_node(CassStatement* statement, + const CassNode* node); + /** * Binds null to a query or bound statement at the specified index. * diff --git a/src/address.hpp b/src/address.hpp index 969ad03ad..c74c6253a 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -20,6 +20,7 @@ #include "allocated.hpp" #include "callback.hpp" #include "dense_hash_set.hpp" +#include "external.hpp" #include "string.hpp" #include "vector.hpp" @@ -168,4 +169,6 @@ inline std::ostream& operator<<(std::ostream& os, const datastax::internal::core } // namespace std +EXTERNAL_TYPE(datastax::internal::core::Address, CassNode) + #endif diff --git a/src/future.cpp b/src/future.cpp index 9bb6c6ec0..1d4ad5347 100644 --- a/src/future.cpp +++ b/src/future.cpp @@ -163,6 +163,14 @@ CassError cass_future_custom_payload_item(CassFuture* future, size_t index, cons return CASS_OK; } +const CassNode* cass_future_coordinator(CassFuture* future) { + if (future->type() != Future::FUTURE_TYPE_RESPONSE) { + return NULL; + } + const Address& node = static_cast(future->from())->address(); + return node.is_valid() ? CassNode::to(&node) : NULL; +} + } // extern "C" bool Future::set_callback(Future::Callback callback, void* data) { diff --git a/src/request_handler.hpp b/src/request_handler.hpp index c7f65a5f6..14fd18286 100644 --- a/src/request_handler.hpp +++ b/src/request_handler.hpp @@ -119,7 +119,7 @@ class ResponseFuture : public Future { return false; } - Address address() { + const Address& address() { ScopedMutex lock(&mutex_); internal_wait(lock); return address_; diff --git a/src/statement.cpp b/src/statement.cpp index b0081f3d2..ae1d42ba0 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -171,6 +171,14 @@ CassError cass_statement_set_host_inet(CassStatement* statement, const CassInet* return CASS_OK; } +CassError cass_statement_set_node(CassStatement* statement, const CassNode* node) { + if (node == NULL) { + return CASS_ERROR_LIB_BAD_PARAMS; + } + statement->set_host(*node->from()); + return CASS_OK; +} + #define CASS_STATEMENT_BIND(Name, Params, Value) \ CassError cass_statement_bind_##Name(CassStatement* statement, size_t index Params) { \ return statement->set(index, Value); \ diff --git a/tests/src/integration/objects/result.hpp b/tests/src/integration/objects/result.hpp index 104925282..b0242331c 100644 --- a/tests/src/integration/objects/result.hpp +++ b/tests/src/integration/objects/result.hpp @@ -220,6 +220,8 @@ class Result : public Object { return std::string(token, token_length); } + const CassNode* coordinator() { return cass_future_coordinator(future_.get()); } + private: /** * Future wrapped object diff --git a/tests/src/integration/objects/statement.hpp b/tests/src/integration/objects/statement.hpp index 23feb3923..4a7512d66 100644 --- a/tests/src/integration/objects/statement.hpp +++ b/tests/src/integration/objects/statement.hpp @@ -260,6 +260,13 @@ class Statement : public Object { ASSERT_EQ(CASS_OK, cass_statement_set_host_inet(get(), host, port)); } + /** + * Set node to run statement on use `CassNode` type. + * + * @param node + */ + void set_node(const CassNode* node) { ASSERT_EQ(CASS_OK, cass_statement_set_node(get(), node)); } + /** * Set the paging size for the statement. * diff --git a/tests/src/integration/tests/test_statement.cpp b/tests/src/integration/tests/test_statement.cpp index e51d5dd90..8af82463c 100644 --- a/tests/src/integration/tests/test_statement.cpp +++ b/tests/src/integration/tests/test_statement.cpp @@ -68,6 +68,31 @@ CASSANDRA_INTEGRATION_TEST_F(StatementTests, SetHostInet) { } } +/** + * Set node on a statement and verify that query goes to the correct node. + * + * @test_category configuration + * @expected_result The local "rpc_address" matches a second query to the same + * coordinator. + */ +CASSANDRA_INTEGRATION_TEST_F(StatementTests, SetNode) { + CHECK_FAILURE; + + Statement statement("SELECT rpc_address FROM system.local"); + Result result1 = session_.execute(statement); + Inet rpc_address1 = result1.first_row().column_by_name("rpc_address"); + const CassNode* node = result1.coordinator(); + ASSERT_TRUE(node != NULL); + + statement.set_node(node); + + for (int i = 0; i < 4; ++i) { + Result result2 = session_.execute(statement); + Inet rpc_address2 = result1.first_row().column_by_name("rpc_address"); + ASSERT_EQ(rpc_address1, rpc_address2); + } +} + /** * Set a host on a statement that has an invalid port. *