提交 68b94c5c 编写于 作者: K kssenii

Fixes

上级 972611e3
......@@ -51,7 +51,8 @@
<https_port>8443</https_port>
<tcp_port_secure>9440</tcp_port_secure>
-->
<rabbitmq_username>root</rabbitmq_username>
<rabbitmq_password>clickhouse</rabbitmq_password>
<!-- Used with https_port and tcp_port_secure. Full ssl options list: https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h#L71 -->
<openSSL>
<server> <!-- Used for https server AND secure tcp port -->
......
......@@ -6,8 +6,7 @@ namespace DB
enum
{
Lock_timeout = 50,
Max_threads_to_pass = 10
Lock_timeout = 50
};
......@@ -50,17 +49,6 @@ void RabbitMQHandler::start(std::atomic<bool> & check_param)
mutex_before_event_loop.unlock();
}
else
{
if (++count_passed == Max_threads_to_pass)
{
/* Event loop is blocking to the thread that started it and it is not good to block one single thread as it loops
* untill there are no active events, but there can be too many of them for one thread to be blocked for so long.
*/
stop();
count_passed = 0;
}
}
}
void RabbitMQHandler::stop()
......
......@@ -44,6 +44,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, stopped(stopped_)
, exchange_declared(false)
, false_param(false)
, loop_attempt(false)
{
messages.clear();
current = messages.begin();
......@@ -225,7 +226,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
message_received += row_delimiter;
//LOG_TRACE(log, "Consumer {} received a message", channel_id);
bool stop_loop = false;
/// Needed to avoid data race because this vector can be used at the same time by another thread in nextImpl() (below).
......@@ -236,7 +237,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
/* As event loop is blocking to the thread that started it and a single thread should not be blocked while
* executing all callbacks on the connection (not only its own), then there should be some point to unblock
*/
if (received.size() >= Received_max_to_stop_loop)
if (!loop_attempt && received.size() % Received_max_to_stop_loop == 0)
{
stop_loop = true;
}
......@@ -284,7 +285,9 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
if (received.empty())
{
/// Run the onReceived callbacks to save the messages that have been received by now
loop_attempt = true;
startEventLoop(false_param);
loop_attempt = false;
}
if (received.empty())
......
......@@ -66,6 +66,7 @@ private:
String current_exchange_name;
size_t count_subscribed = 0;
size_t count_bound_queues = 0;
std::atomic<bool> loop_attempt;
Messages received;
Messages messages;
......
......@@ -74,10 +74,14 @@ StorageRabbitMQ::StorageRabbitMQ(
, hash_exchange(hash_exchange_)
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
, semaphore(0, num_consumers_)
, login_password(std::make_pair(
rabbitmq_context.getConfigRef().getString("rabbitmq_username", "root"),
rabbitmq_context.getConfigRef().getString("rabbitmq_password", "clickhouse")))
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
, evbase(event_base_new())
, eventHandler(evbase, log)
, connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login("root", "clickhouse"), "/"))
, connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second,
AMQP::Login(login_password.first, login_password.second), "/"))
{
size_t cnt_retries = 0;
while (!connection.ready() && ++cnt_retries != Connection_setup_retries_max)
......@@ -208,14 +212,14 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(&connection);
return std::make_shared<ReadBufferFromRabbitMQConsumer>(consumer_channel, eventHandler, exchange_name,
routing_key, next_channel_id, log, row_delimiter, bind_by_id, hash_exchange, num_queues, stream_cancelled);
return std::make_shared<ReadBufferFromRabbitMQConsumer>(consumer_channel, eventHandler, exchange_name, routing_key,
next_channel_id, log, row_delimiter, bind_by_id, hash_exchange, num_queues, stream_cancelled);
}
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
return std::make_shared<WriteBufferToRabbitMQProducer>(parsed_address, routing_key, exchange_name,
return std::make_shared<WriteBufferToRabbitMQProducer>(parsed_address, login_password, routing_key, exchange_name,
log, num_consumers * num_queues, bind_by_id, hash_exchange,
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
}
......
......@@ -86,7 +86,8 @@ private:
const bool hash_exchange;
Poco::Logger * log;
std::pair<std::string, UInt16> parsed_address;
std::pair<String, UInt16> parsed_address;
std::pair<String, String> login_password;
event_base * evbase;
RabbitMQHandler eventHandler;
......
......@@ -20,7 +20,8 @@ enum
};
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
std::pair<std::string, UInt16> & parsed_address,
std::pair<String, UInt16> & parsed_address,
std::pair<String, String> & login_password_,
const String & routing_key_,
const String & exchange_,
Poco::Logger * log_,
......@@ -31,6 +32,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
size_t rows_per_message,
size_t chunk_size_)
: WriteBuffer(nullptr, 0)
, login_password(login_password_)
, routing_key(routing_key_)
, exchange_name(exchange_)
, log(log_)
......@@ -42,7 +44,8 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
, chunk_size(chunk_size_)
, producerEvbase(event_base_new())
, eventHandler(producerEvbase, log)
, connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login("root", "clickhouse"), "/"))
, connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second,
AMQP::Login(login_password.first, login_password.second), "/"))
{
/* The reason behind making a separate connection for each concurrent producer is explained here:
* https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from
......
......@@ -7,6 +7,7 @@
#include <atomic>
#include <amqpcpp.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Interpreters/Context.h>
namespace DB
{
......@@ -18,7 +19,8 @@ class WriteBufferToRabbitMQProducer : public WriteBuffer
{
public:
WriteBufferToRabbitMQProducer(
std::pair<std::string, UInt16> & parsed_address,
std::pair<String, UInt16> & parsed_address,
std::pair<String, String> & login_password_,
const String & routing_key_,
const String & exchange_,
Poco::Logger * log_,
......@@ -40,6 +42,7 @@ private:
void checkExchange();
void startEventLoop(std::atomic<bool> & check_param);
std::pair<String, String> & login_password;
const String routing_key;
const String exchange_name;
const bool bind_by_id;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册