diff --git a/programs/server/config.xml b/programs/server/config.xml index 21605edeb36e4f0cef5c872d3d1257ca7d5df927..b39ee180466a8bd4a932eee6521e14e333773c4d 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -51,7 +51,8 @@ 8443 9440 --> - + root + clickhouse diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 1a3ede7942031800f044191a69d4fdfe81ba2ba0..34a77489faaddbcd206a9f344b59043cf7c887b4 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -6,8 +6,7 @@ namespace DB enum { - Lock_timeout = 50, - Max_threads_to_pass = 10 + Lock_timeout = 50 }; @@ -50,17 +49,6 @@ void RabbitMQHandler::start(std::atomic & check_param) mutex_before_event_loop.unlock(); } - else - { - if (++count_passed == Max_threads_to_pass) - { - /* Event loop is blocking to the thread that started it and it is not good to block one single thread as it loops - * untill there are no active events, but there can be too many of them for one thread to be blocked for so long. - */ - stop(); - count_passed = 0; - } - } } void RabbitMQHandler::stop() diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index f8259ce8c4c06277cb937d06da9ae1fee350b55a..1bd2c7831ff24f1ede031761184449109eae5d1e 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -44,6 +44,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( , stopped(stopped_) , exchange_declared(false) , false_param(false) + , loop_attempt(false) { messages.clear(); current = messages.begin(); @@ -225,7 +226,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) message_received += row_delimiter; //LOG_TRACE(log, "Consumer {} received a message", channel_id); - + bool stop_loop = false; /// Needed to avoid data race because this vector can be used at the same time by another thread in nextImpl() (below). @@ -236,7 +237,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) /* As event loop is blocking to the thread that started it and a single thread should not be blocked while * executing all callbacks on the connection (not only its own), then there should be some point to unblock */ - if (received.size() >= Received_max_to_stop_loop) + if (!loop_attempt && received.size() % Received_max_to_stop_loop == 0) { stop_loop = true; } @@ -284,7 +285,9 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl() if (received.empty()) { /// Run the onReceived callbacks to save the messages that have been received by now + loop_attempt = true; startEventLoop(false_param); + loop_attempt = false; } if (received.empty()) diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 55adb39bdce9382a5718901a0f8ca8db49582458..97eca73cecee1aca850e478437f50445d9400696 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -66,6 +66,7 @@ private: String current_exchange_name; size_t count_subscribed = 0; size_t count_bound_queues = 0; + std::atomic loop_attempt; Messages received; Messages messages; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 7cbfb164a2d2207855394cb084b788d8a9b307e6..481314a38c2f123e1db5fa7252efabf32827d47c 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -74,10 +74,14 @@ StorageRabbitMQ::StorageRabbitMQ( , hash_exchange(hash_exchange_) , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")")) , semaphore(0, num_consumers_) + , login_password(std::make_pair( + rabbitmq_context.getConfigRef().getString("rabbitmq_username", "root"), + rabbitmq_context.getConfigRef().getString("rabbitmq_password", "clickhouse"))) , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) , evbase(event_base_new()) , eventHandler(evbase, log) - , connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login("root", "clickhouse"), "/")) + , connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, + AMQP::Login(login_password.first, login_password.second), "/")) { size_t cnt_retries = 0; while (!connection.ready() && ++cnt_retries != Connection_setup_retries_max) @@ -208,14 +212,14 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer() ChannelPtr consumer_channel = std::make_shared(&connection); - return std::make_shared(consumer_channel, eventHandler, exchange_name, - routing_key, next_channel_id, log, row_delimiter, bind_by_id, hash_exchange, num_queues, stream_cancelled); + return std::make_shared(consumer_channel, eventHandler, exchange_name, routing_key, + next_channel_id, log, row_delimiter, bind_by_id, hash_exchange, num_queues, stream_cancelled); } ProducerBufferPtr StorageRabbitMQ::createWriteBuffer() { - return std::make_shared(parsed_address, routing_key, exchange_name, + return std::make_shared(parsed_address, login_password, routing_key, exchange_name, log, num_consumers * num_queues, bind_by_id, hash_exchange, row_delimiter ? std::optional{row_delimiter} : std::nullopt, 1, 1024); } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 635d53e6cf02db3798efe6e043e863524f3027b2..563f37ae6f17db19fdae8da39295b2b323ff3d37 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -86,7 +86,8 @@ private: const bool hash_exchange; Poco::Logger * log; - std::pair parsed_address; + std::pair parsed_address; + std::pair login_password; event_base * evbase; RabbitMQHandler eventHandler; diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index e61a8e1ccd81099f986871af2b5add645f71170c..7c0764853c7c11dc12c872228311c9dacad0254b 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -20,7 +20,8 @@ enum }; WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( - std::pair & parsed_address, + std::pair & parsed_address, + std::pair & login_password_, const String & routing_key_, const String & exchange_, Poco::Logger * log_, @@ -31,6 +32,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( size_t rows_per_message, size_t chunk_size_) : WriteBuffer(nullptr, 0) + , login_password(login_password_) , routing_key(routing_key_) , exchange_name(exchange_) , log(log_) @@ -42,7 +44,8 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( , chunk_size(chunk_size_) , producerEvbase(event_base_new()) , eventHandler(producerEvbase, log) - , connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login("root", "clickhouse"), "/")) + , connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, + AMQP::Login(login_password.first, login_password.second), "/")) { /* The reason behind making a separate connection for each concurrent producer is explained here: * https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index c61a76a3e74e96220dc7707aa6f39727c48ff96a..e0c48556239c1b96db26aed338ae442b2e3eda10 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace DB { @@ -18,7 +19,8 @@ class WriteBufferToRabbitMQProducer : public WriteBuffer { public: WriteBufferToRabbitMQProducer( - std::pair & parsed_address, + std::pair & parsed_address, + std::pair & login_password_, const String & routing_key_, const String & exchange_, Poco::Logger * log_, @@ -40,6 +42,7 @@ private: void checkExchange(); void startEventLoop(std::atomic & check_param); + std::pair & login_password; const String routing_key; const String exchange_name; const bool bind_by_id;