Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ typedef struct CassResult_ CassResult;
*/
typedef struct CassErrorResult_ CassErrorResult;


/**
* An object that represents a cluster node.
*
* @struct CassNode
*/
typedef struct CassNode_ CassNode;

/**
* An object used to iterate over a group of rows, columns or collection values.
*
Expand Down Expand Up @@ -5002,6 +5010,21 @@ cass_future_custom_payload_item(CassFuture* future,
const cass_byte_t** value,
size_t* value_size);

/**
* Gets the node that acted as coordinator for this query. If the future is not
* ready this method will wait for the future to be set.
*
* @param future
* @return The coordinator node that handled the query. The lifetime of this
* object is the same as the result object it came from. NULL can be returned
* if the future is not a response future or if an error occurs before a
* coordinator responds.
*
* @see cass_statement_set_node()
*/
CASS_EXPORT const CassNode*
cass_future_coordinator(CassFuture* future);

/***********************************************************************************
*
* Statement
Expand Down Expand Up @@ -5395,6 +5418,21 @@ cass_statement_set_host_inet(CassStatement* statement,
const CassInet* host,
int port);

/**
* Same as cass_statement_set_host(), but using the `CassNode` type. This can
* be used to re-query the same coordinator when used with the result of
* `cass_future_coordinator()`
*
* @param statement
* @param address
* @return CASS_OK if successful, otherwise an error occurred.
*
* @see cass_future_coordinator()
*/
CASS_EXPORT CassError
cass_statement_set_node(CassStatement* statement,
const CassNode* node);

/**
* Binds null to a query or bound statement at the specified index.
*
Expand Down
3 changes: 3 additions & 0 deletions src/address.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "allocated.hpp"
#include "callback.hpp"
#include "dense_hash_set.hpp"
#include "external.hpp"
#include "string.hpp"
#include "vector.hpp"

Expand Down Expand Up @@ -168,4 +169,6 @@ inline std::ostream& operator<<(std::ostream& os, const datastax::internal::core

} // namespace std

EXTERNAL_TYPE(datastax::internal::core::Address, CassNode)

#endif
8 changes: 8 additions & 0 deletions src/future.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ CassError cass_future_custom_payload_item(CassFuture* future, size_t index, cons
return CASS_OK;
}

const CassNode* cass_future_coordinator(CassFuture* future) {
if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
return NULL;
}
const Address& node = static_cast<ResponseFuture*>(future->from())->address();
return node.is_valid() ? CassNode::to(&node) : NULL;
}

} // extern "C"

bool Future::set_callback(Future::Callback callback, void* data) {
Expand Down
2 changes: 1 addition & 1 deletion src/request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class ResponseFuture : public Future {
return false;
}

Address address() {
const Address& address() {
ScopedMutex lock(&mutex_);
internal_wait(lock);
return address_;
Expand Down
8 changes: 8 additions & 0 deletions src/statement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ CassError cass_statement_set_host_inet(CassStatement* statement, const CassInet*
return CASS_OK;
}

CassError cass_statement_set_node(CassStatement* statement, const CassNode* node) {
if (node == NULL) {
return CASS_ERROR_LIB_BAD_PARAMS;
}
statement->set_host(*node->from());
return CASS_OK;
}

#define CASS_STATEMENT_BIND(Name, Params, Value) \
CassError cass_statement_bind_##Name(CassStatement* statement, size_t index Params) { \
return statement->set(index, Value); \
Expand Down
2 changes: 2 additions & 0 deletions tests/src/integration/objects/result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ class Result : public Object<const CassResult, cass_result_free> {
return std::string(token, token_length);
}

const CassNode* coordinator() { return cass_future_coordinator(future_.get()); }

private:
/**
* Future wrapped object
Expand Down
7 changes: 7 additions & 0 deletions tests/src/integration/objects/statement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ class Statement : public Object<CassStatement, cass_statement_free> {
ASSERT_EQ(CASS_OK, cass_statement_set_host_inet(get(), host, port));
}

/**
* Set node to run statement on use `CassNode` type.
*
* @param node
*/
void set_node(const CassNode* node) { ASSERT_EQ(CASS_OK, cass_statement_set_node(get(), node)); }

/**
* Set the paging size for the statement.
*
Expand Down
25 changes: 25 additions & 0 deletions tests/src/integration/tests/test_statement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,31 @@ CASSANDRA_INTEGRATION_TEST_F(StatementTests, SetHostInet) {
}
}

/**
* Set node on a statement and verify that query goes to the correct node.
*
* @test_category configuration
* @expected_result The local "rpc_address" matches a second query to the same
* coordinator.
*/
CASSANDRA_INTEGRATION_TEST_F(StatementTests, SetNode) {
CHECK_FAILURE;

Statement statement("SELECT rpc_address FROM system.local");
Result result1 = session_.execute(statement);
Inet rpc_address1 = result1.first_row().column_by_name<Inet>("rpc_address");
const CassNode* node = result1.coordinator();
ASSERT_TRUE(node != NULL);

statement.set_node(node);

for (int i = 0; i < 4; ++i) {
Result result2 = session_.execute(statement);
Inet rpc_address2 = result1.first_row().column_by_name<Inet>("rpc_address");
ASSERT_EQ(rpc_address1, rpc_address2);
}
}

/**
* Set a host on a statement that has an invalid port.
*
Expand Down