未验证 提交 f9688a43 编写于 作者: V Vitaly Baranov 提交者: GitHub

Merge pull request #17784 from ClickHouse/backport/20.12/17681

Backport #17681 to 20.12: In mysqlxx::Pool: fix for reconnection problem
......@@ -104,6 +104,11 @@ void Connection::connect(const char* db,
if (mysql_options(driver.get(), MYSQL_OPT_LOCAL_INFILE, &enable_local_infile_arg))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
/// Enables auto-reconnect.
bool reconnect = true;
if (mysql_options(driver.get(), MYSQL_OPT_RECONNECT, reinterpret_cast<const char *>(&reconnect)))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
/// Specifies particular ssl key and certificate if it needs
if (mysql_ssl_set(driver.get(), ifNotEmpty(ssl_key), ifNotEmpty(ssl_cert), ifNotEmpty(ssl_ca), nullptr, nullptr))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
......@@ -115,11 +120,6 @@ void Connection::connect(const char* db,
if (mysql_set_character_set(driver.get(), "UTF8"))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
/// Enables auto-reconnect.
bool reconnect = true;
if (mysql_options(driver.get(), MYSQL_OPT_RECONNECT, reinterpret_cast<const char *>(&reconnect)))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
is_connected = true;
}
......
......@@ -26,6 +26,7 @@ void Pool::Entry::incrementRefCount()
mysql_thread_init();
}
void Pool::Entry::decrementRefCount()
{
if (!data)
......@@ -150,28 +151,39 @@ Pool::Entry Pool::tryGet()
initialize();
/// Searching for connection which was established but wasn't used.
for (auto & connection : connections)
/// Try to pick an idle connection from already allocated
for (auto connection_it = connections.cbegin(); connection_it != connections.cend();)
{
if (connection->ref_count == 0)
Connection * connection_ptr = *connection_it;
/// Fixme: There is a race condition here b/c we do not synchronize with Pool::Entry's copy-assignment operator
if (connection_ptr->ref_count == 0)
{
Entry res(connection, this);
return res.tryForceConnected() ? res : Entry();
Entry res(connection_ptr, this);
if (res.tryForceConnected()) /// Tries to reestablish connection as well
return res;
auto & logger = Poco::Util::Application::instance().logger();
logger.information("Idle connection to mysql server cannot be recovered, dropping it.");
/// This one is disconnected, cannot be reestablished and so needs to be disposed of.
connection_it = connections.erase(connection_it);
::delete connection_ptr; /// TODO: Manual memory management is awkward (matches allocConnection() method)
}
else
++connection_it;
}
/// Throws if pool is overflowed.
if (connections.size() >= max_connections)
throw Poco::Exception("mysqlxx::Pool is full");
/// Allocates new connection.
Connection * conn = allocConnection(true);
if (conn)
return Entry(conn, this);
Connection * connection_ptr = allocConnection(true);
if (connection_ptr)
return {connection_ptr, this};
return Entry();
return {};
}
void Pool::removeConnection(Connection* connection)
{
std::lock_guard<std::mutex> lock(mutex);
......@@ -199,11 +211,9 @@ void Pool::Entry::forceConnected() const
throw Poco::RuntimeException("Tried to access NULL database connection.");
Poco::Util::Application & app = Poco::Util::Application::instance();
if (data->conn.ping())
return;
bool first = true;
do
while (!tryForceConnected())
{
if (first)
first = false;
......@@ -225,7 +235,26 @@ void Pool::Entry::forceConnected() const
pool->rw_timeout,
pool->enable_local_infile);
}
while (!data->conn.ping());
}
bool Pool::Entry::tryForceConnected() const
{
auto * const mysql_driver = data->conn.getDriver();
const auto prev_connection_id = mysql_thread_id(mysql_driver);
if (data->conn.ping()) /// Attempts to reestablish lost connection
{
const auto current_connection_id = mysql_thread_id(mysql_driver);
if (prev_connection_id != current_connection_id)
{
auto & logger = Poco::Util::Application::instance().logger();
logger.information("Connection to mysql server has been reestablished. Connection id changed: %d -> %d",
prev_connection_id, current_connection_id);
}
return true;
}
return false;
}
......
......@@ -127,10 +127,7 @@ public:
void forceConnected() const;
/// Connects to database. If connection is failed then returns false.
bool tryForceConnected() const
{
return data->conn.ping();
}
bool tryForceConnected() const;
void incrementRefCount();
void decrementRefCount();
......
add_executable (mysqlxx_test mysqlxx_test.cpp)
target_link_libraries (mysqlxx_test PRIVATE mysqlxx)
add_executable (mysqlxx_pool_test mysqlxx_pool_test.cpp)
target_link_libraries (mysqlxx_pool_test PRIVATE mysqlxx)
#include <mysqlxx/mysqlxx.h>
#include <chrono>
#include <iostream>
#include <sstream>
#include <thread>
namespace
{
mysqlxx::Pool::Entry getWithFailover(mysqlxx::Pool & connections_pool)
{
using namespace std::chrono;
constexpr size_t max_tries = 3;
mysqlxx::Pool::Entry worker_connection;
for (size_t try_no = 1; try_no <= max_tries; ++try_no)
{
try
{
worker_connection = connections_pool.tryGet();
if (!worker_connection.isNull())
{
return worker_connection;
}
}
catch (const Poco::Exception & e)
{
if (e.displayText().find("mysqlxx::Pool is full") != std::string::npos)
{
std::cerr << e.displayText() << std::endl;
}
std::cerr << "Connection to " << connections_pool.getDescription() << " failed: " << e.displayText() << std::endl;
}
std::clog << "Connection to all replicas failed " << try_no << " times" << std::endl;
std::this_thread::sleep_for(1s);
}
std::stringstream message;
message << "Connections to all replicas failed: " << connections_pool.getDescription();
throw Poco::Exception(message.str());
}
}
int main(int, char **)
{
using namespace std::chrono;
const char * remote_mysql = "localhost";
const std::string test_query = "SHOW DATABASES";
mysqlxx::Pool mysql_conn_pool("", remote_mysql, "default", "10203040", 3306);
size_t iteration = 0;
while (++iteration)
{
std::clog << "Iteration: " << iteration << std::endl;
try
{
std::clog << "Acquiring DB connection ...";
mysqlxx::Pool::Entry worker = getWithFailover(mysql_conn_pool);
std::clog << "ok" << std::endl;
std::clog << "Preparing query (5s sleep) ...";
std::this_thread::sleep_for(5s);
mysqlxx::Query query = worker->query();
query << test_query;
std::clog << "ok" << std::endl;
std::clog << "Querying result (5s sleep) ...";
std::this_thread::sleep_for(5s);
mysqlxx::UseQueryResult result = query.use();
std::clog << "ok" << std::endl;
std::clog << "Fetching result data (5s sleep) ...";
std::this_thread::sleep_for(5s);
size_t rows_count = 0;
while (result.fetch())
++rows_count;
std::clog << "ok" << std::endl;
std::clog << "Read " << rows_count << " rows." << std::endl;
}
catch (const Poco::Exception & e)
{
std::cerr << "Iteration FAILED:\n" << e.displayText() << std::endl;
}
std::clog << "====================" << std::endl;
std::this_thread::sleep_for(3s);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册