-
Notifications
You must be signed in to change notification settings - Fork 1.7k
add host based connection pool map #156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6e23e69
02b8718
943c4d8
2dd31c1
ca5cf69
5caac19
e29426d
e9667db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,7 +100,6 @@ class http_client_config | |
#endif | ||
, m_set_user_nativehandle_options([](native_handle)->void{}) | ||
#if !defined(_WIN32) && !defined(__cplusplus_winrt) | ||
, m_ssl_context_callback([](boost::asio::ssl::context&)->void{}) | ||
, m_tlsext_sni_enabled(true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we initialize |
||
#endif | ||
#if defined(_WIN32) && !defined(__cplusplus_winrt) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,6 +90,12 @@ class asio_connection | |
} | ||
} | ||
|
||
asio_connection(boost::asio::io_service& io_service, const std::string &pool_key, bool start_with_ssl, const std::function<void(boost::asio::ssl::context&)>& ssl_context_callback) : | ||
asio_connection(io_service, start_with_ssl, ssl_context_callback) | ||
{ | ||
m_pool_key = pool_key; | ||
} | ||
|
||
~asio_connection() | ||
{ | ||
close(); | ||
|
@@ -103,7 +109,10 @@ class asio_connection | |
boost::asio::ssl::context ssl_context(boost::asio::ssl::context::sslv23); | ||
ssl_context.set_default_verify_paths(); | ||
ssl_context.set_options(boost::asio::ssl::context::default_workarounds); | ||
m_ssl_context_callback(ssl_context); | ||
if (m_ssl_context_callback) | ||
{ | ||
m_ssl_context_callback(ssl_context); | ||
} | ||
m_ssl_stream = utility::details::make_unique<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>>(m_socket, ssl_context); | ||
} | ||
|
||
|
@@ -136,6 +145,9 @@ class asio_connection | |
void set_keep_alive(bool keep_alive) { m_keep_alive = keep_alive; } | ||
bool keep_alive() const { return m_keep_alive; } | ||
bool is_ssl() const { return m_ssl_stream ? true : false; } | ||
const std::string &pool_key() const { return m_pool_key; } | ||
const std::string &nonce() const { return m_nonce; } | ||
void generate_nonce() { m_nonce = m_nonce_generator.generate(); } | ||
|
||
template <typename Iterator, typename Handler> | ||
void async_connect(const Iterator &begin, const Handler &handler) | ||
|
@@ -228,6 +240,7 @@ class asio_connection | |
{ | ||
cancel_pool_timer(); | ||
m_is_reused = true; | ||
generate_nonce(); | ||
} | ||
|
||
// Guards concurrent access to socket/ssl::stream. This is necessary | ||
|
@@ -242,17 +255,19 @@ class asio_connection | |
boost::asio::deadline_timer m_pool_timer; | ||
bool m_is_reused; | ||
bool m_keep_alive; | ||
std::string m_pool_key; | ||
std::string m_nonce; | ||
utility::nonce_generator m_nonce_generator; | ||
}; | ||
|
||
class asio_connection_pool | ||
{ | ||
public: | ||
|
||
asio_connection_pool(boost::asio::io_service& io_service, bool start_with_ssl, const std::chrono::seconds &idle_timeout, const std::function<void(boost::asio::ssl::context&)> &ssl_context_callback) : | ||
asio_connection_pool(boost::asio::io_service& io_service, const std::chrono::seconds &idle_timeout, bool is_shared) : | ||
m_io_service(io_service), | ||
m_timeout_secs(static_cast<int>(idle_timeout.count())), | ||
m_start_with_ssl(start_with_ssl), | ||
m_ssl_context_callback(ssl_context_callback) | ||
m_is_shared(is_shared) | ||
{} | ||
|
||
~asio_connection_pool() | ||
|
@@ -271,89 +286,193 @@ class asio_connection_pool | |
{ | ||
connection->cancel(); | ||
|
||
std::lock_guard<std::mutex> lock(m_connections_mutex); | ||
// This will destroy and remove the connection from pool after the set timeout. | ||
// We use 'this' because async calls to timer handler only occur while the pool exists. | ||
connection->start_pool_timer(m_timeout_secs, boost::bind(&asio_connection_pool::handle_pool_timer, this, boost::asio::placeholders::error, connection)); | ||
m_connections.push_back(connection); | ||
if (m_is_shared) | ||
{ | ||
std::lock_guard<std::mutex> lock(m_connections_mutex); | ||
auto it = m_shared_connections.insert(std::make_pair(connection->pool_key(), connection)); | ||
// This will destroy and remove the connection from pool after the set timeout. | ||
// We use 'this' because async calls to timer handler only occur while the pool exists. | ||
connection->start_pool_timer(m_timeout_secs, boost::bind(&asio_connection_pool::free_shared_connection, this, boost::asio::placeholders::error, it, std::weak_ptr<asio_connection>(connection), connection->nonce())); | ||
} | ||
else | ||
{ | ||
std::lock_guard<std::mutex> lock(m_connections_mutex); | ||
auto pair = m_connections.insert(connection); | ||
if (pair.second) | ||
{ | ||
// This will destroy and remove the connection from pool after the set timeout. | ||
// We use 'this' because async calls to timer handler only occur while the pool exists. | ||
connection->start_pool_timer(m_timeout_secs, boost::bind(&asio_connection_pool::free_connection, this, boost::asio::placeholders::error, pair.first, std::weak_ptr<asio_connection>(connection), connection->nonce())); | ||
} | ||
} | ||
} | ||
// Otherwise connection is not put to the pool and it will go out of scope. | ||
} | ||
|
||
std::shared_ptr<asio_connection> obtain() | ||
std::shared_ptr<asio_connection> obtain(const std::string &pool_key, bool start_with_ssl, const std::function<void(boost::asio::ssl::context&)>& ssl_context_callback) | ||
{ | ||
std::unique_lock<std::mutex> lock(m_connections_mutex); | ||
if (m_connections.empty()) | ||
if (m_is_shared) | ||
{ | ||
lock.unlock(); | ||
std::unique_lock<std::mutex> lock(m_connections_mutex); | ||
auto it = m_shared_connections.find(pool_key); | ||
if (it == m_shared_connections.end()) | ||
{ | ||
lock.unlock(); | ||
|
||
// No connections in pool => create a new connection instance. | ||
return std::make_shared<asio_connection>(m_io_service, pool_key, start_with_ssl, ssl_context_callback); | ||
} | ||
else | ||
{ | ||
// Reuse connection from pool. | ||
auto connection = it->second; | ||
m_shared_connections.erase(it); | ||
connection->start_reuse(); | ||
lock.unlock(); | ||
|
||
// No connections in pool => create a new connection instance. | ||
return std::make_shared<asio_connection>(m_io_service, m_start_with_ssl, m_ssl_context_callback); | ||
return connection; | ||
} | ||
} | ||
else | ||
{ | ||
// Reuse connection from pool. | ||
auto connection = m_connections.back(); | ||
m_connections.pop_back(); | ||
lock.unlock(); | ||
std::unique_lock<std::mutex> lock(m_connections_mutex); | ||
if (m_connections.empty()) | ||
{ | ||
lock.unlock(); | ||
|
||
connection->start_reuse(); | ||
return connection; | ||
// No connections in pool => create a new connection instance. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that if I make multiple clients with the same host but different SSL context callbacks, the second client's callback will get dropped. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch! I would make the pool not added to the map if ssl_context_callback is used. |
||
return std::make_shared<asio_connection>(m_io_service, start_with_ssl, ssl_context_callback); | ||
} | ||
else | ||
{ | ||
// Reuse connection from pool. | ||
auto it = m_connections.begin(); | ||
auto connection = *it; | ||
m_connections.erase(it); | ||
connection->start_reuse(); | ||
lock.unlock(); | ||
|
||
return connection; | ||
} | ||
} | ||
} | ||
|
||
static std::shared_ptr<asio_connection_pool> shared_instance(); | ||
|
||
private: | ||
|
||
// Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope. | ||
void handle_pool_timer(const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection) | ||
void free_shared_connection(const boost::system::error_code& ec, std::multimap<std::string, std::shared_ptr<asio_connection>>::iterator it, const std::weak_ptr<asio_connection> &connection, const std::string &nonce) | ||
{ | ||
if (!ec) | ||
{ | ||
auto connection_shared = connection.lock(); | ||
if (connection_shared) | ||
// Compare nonce here to ensure the iterator is valid, the connection not been reused. | ||
if (connection_shared && (connection_shared->nonce() == nonce)) | ||
{ | ||
std::lock_guard<std::mutex> lock(m_connections_mutex); | ||
const auto &iter = std::find(m_connections.begin(), m_connections.end(), connection_shared); | ||
if (iter != m_connections.end()) | ||
{ | ||
m_connections.erase(iter); | ||
} | ||
m_shared_connections.erase(it); | ||
} | ||
} | ||
} | ||
|
||
// Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope. | ||
void free_connection(const boost::system::error_code& ec, std::set<std::shared_ptr<asio_connection>>::iterator it, const std::weak_ptr<asio_connection> &connection, const std::string &nonce) | ||
{ | ||
if (!ec) | ||
{ | ||
auto connection_shared = connection.lock(); | ||
// Compare nonce here to ensure the iterator is valid, the connection not been reused. | ||
if (connection_shared && (connection_shared->nonce() == nonce)) | ||
{ | ||
std::lock_guard<std::mutex> lock(m_connections_mutex); | ||
m_connections.erase(it); | ||
} | ||
} | ||
} | ||
|
||
boost::asio::io_service& m_io_service; | ||
const int m_timeout_secs; | ||
const bool m_start_with_ssl; | ||
const std::function<void(boost::asio::ssl::context&)>& m_ssl_context_callback; | ||
std::vector<std::shared_ptr<asio_connection> > m_connections; | ||
bool m_is_shared; | ||
std::multimap<std::string, std::shared_ptr<asio_connection>> m_shared_connections; | ||
std::set<std::shared_ptr<asio_connection>> m_connections; | ||
std::mutex m_connections_mutex; | ||
}; | ||
|
||
std::shared_ptr<asio_connection_pool> asio_connection_pool::shared_instance() | ||
{ | ||
const std::chrono::seconds idle_timeout(30); // Unused sockets are kept in pool for 30 seconds. | ||
static std::shared_ptr<asio_connection_pool> s_instance = std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(), idle_timeout, true); | ||
|
||
return s_instance; | ||
} | ||
|
||
class asio_client : public _http_client_communicator | ||
{ | ||
public: | ||
asio_client(http::uri address, http_client_config client_config) | ||
: _http_client_communicator(std::move(address), std::move(client_config)) | ||
, m_pool(crossplat::threadpool::shared_instance().service(), | ||
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(), | ||
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds. | ||
this->client_config().get_ssl_context_callback()) | ||
, m_resolver(crossplat::threadpool::shared_instance().service()) | ||
{} | ||
{ | ||
m_start_with_ssl = base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(); | ||
m_ssl_context_callback = this->client_config().get_ssl_context_callback(); | ||
const std::chrono::seconds idle_timeout(30); // Unused sockets are kept in pool for 30 seconds. | ||
|
||
if (m_ssl_context_callback) | ||
{ | ||
// We will use a private connection pool because there is no better approaches to compare callback functors. | ||
m_pool = std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(), idle_timeout, false); | ||
} | ||
else | ||
{ | ||
init_pool_key(); | ||
m_pool = asio_connection_pool::shared_instance(); | ||
} | ||
} | ||
|
||
void send_request(const std::shared_ptr<request_context> &request_ctx) override; | ||
|
||
unsigned long open() override { return 0; } | ||
|
||
virtual pplx::task<http_response> propagate(http_request request) override; | ||
|
||
asio_connection_pool m_pool; | ||
void init_pool_key(); | ||
|
||
std::shared_ptr<asio_connection> obtain_connection(); | ||
|
||
std::shared_ptr<asio_connection_pool> m_pool; | ||
tcp::resolver m_resolver; | ||
bool m_start_with_ssl; | ||
std::function<void(boost::asio::ssl::context&)> m_ssl_context_callback; | ||
std::string m_pool_key; | ||
}; | ||
|
||
void asio_client::init_pool_key() | ||
{ | ||
m_pool_key = base_uri().to_string(); | ||
|
||
auto &credentials = _http_client_communicator::client_config().credentials(); | ||
if (credentials.is_set()) | ||
{ | ||
m_pool_key.append(credentials.username()); | ||
} | ||
|
||
auto &proxy = _http_client_communicator::client_config().proxy(); | ||
if (proxy.is_specified()) | ||
{ | ||
m_pool_key.append(proxy.address().to_string()); | ||
if (proxy.credentials().is_set()) | ||
{ | ||
m_pool_key.append(proxy.credentials().username()); | ||
} | ||
} | ||
} | ||
|
||
std::shared_ptr<asio_connection> asio_client::obtain_connection() | ||
{ | ||
return m_pool->obtain(m_pool_key, m_start_with_ssl, m_ssl_context_callback); | ||
} | ||
|
||
class asio_context : public request_context, public std::enable_shared_from_this<asio_context> | ||
{ | ||
friend class asio_client; | ||
|
@@ -375,13 +494,13 @@ class asio_context : public request_context, public std::enable_shared_from_this | |
{ | ||
m_timer.stop(); | ||
// Release connection back to the pool. If connection was not closed, it will be put to the pool for reuse. | ||
std::static_pointer_cast<asio_client>(m_http_client)->m_pool.release(m_connection); | ||
std::static_pointer_cast<asio_client>(m_http_client)->m_pool->release(m_connection); | ||
} | ||
|
||
static std::shared_ptr<request_context> create_request_context(std::shared_ptr<_http_client_communicator> &client, http_request &request) | ||
{ | ||
auto client_cast(std::static_pointer_cast<asio_client>(client)); | ||
auto connection(client_cast->m_pool.obtain()); | ||
auto connection(client_cast->obtain_connection()); | ||
auto ctx = std::make_shared<asio_context>(client, request, connection); | ||
ctx->m_timer.set_ctx(std::weak_ptr<asio_context>(ctx)); | ||
return ctx; | ||
|
@@ -458,7 +577,7 @@ class asio_context : public request_context, public std::enable_shared_from_this | |
m_context->m_timer.reset(); | ||
//// Replace the connection. This causes old connection object to go out of scope. | ||
auto client = std::static_pointer_cast<asio_client>(m_context->m_http_client); | ||
m_context->m_connection = client->m_pool.obtain(); | ||
m_context->m_connection = client->obtain_connection(); | ||
|
||
auto endpoint = *endpoints; | ||
m_context->m_connection->async_connect(endpoint, boost::bind(&ssl_proxy_tunnel::handle_tcp_connect, shared_from_this(), boost::asio::placeholders::error, ++endpoints)); | ||
|
@@ -477,7 +596,7 @@ class asio_context : public request_context, public std::enable_shared_from_this | |
{ | ||
m_context->report_error("Failed to send connect request to proxy.", err, httpclient_errorcode_context::writebody); | ||
} | ||
} | ||
} | ||
|
||
void handle_status_line(const boost::system::error_code& ec) | ||
{ | ||
|
@@ -811,7 +930,7 @@ class asio_context : public request_context, public std::enable_shared_from_this | |
{ | ||
// Replace the connection. This causes old connection object to go out of scope. | ||
auto client = std::static_pointer_cast<asio_client>(m_http_client); | ||
m_connection = client->m_pool.obtain(); | ||
m_connection = client->obtain_connection(); | ||
|
||
auto endpoint = *endpoints; | ||
m_connection->async_connect(endpoint, boost::bind(&asio_context::handle_connect, shared_from_this(), boost::asio::placeholders::error, ++endpoints)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is possible that the
ssl_context_callback
does not have a valid target.