diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 6d3ed41cf78747024d07f06d903a82a679987da8..2fdb142423fcc4bcbf908d4a26ff443f6d7951f1 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -1,10 +1,14 @@ #include +#include #include namespace DB { -static const auto Lock_timeout = 50; +namespace ErrorCodes +{ + extern const int CANNOT_CONNECT_RABBITMQ; +} /* The object of this class is shared between concurrent consumers (who share the same connection == share the same * event loop and handler). @@ -23,15 +27,14 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes if (!connection->usable() || !connection->ready()) { - LOG_ERROR(log, "Connection lost completely"); + throw Exception("Connection error", ErrorCodes::CANNOT_CONNECT_RABBITMQ); } - - stop(); } -void RabbitMQHandler::startLoop() +void RabbitMQHandler::startBackgroundLoop() { + /// stop_loop variable is updated in a separate thread while (!stop_loop) { uv_run(loop, UV_RUN_NOWAIT); @@ -39,24 +42,13 @@ void RabbitMQHandler::startLoop() } -void RabbitMQHandler::startConsumerLoop(std::atomic & loop_started) -{ - std::lock_guard lock(mutex_before_event_loop); - uv_run(loop, UV_RUN_NOWAIT); -} - - -void RabbitMQHandler::startProducerLoop() -{ - uv_run(loop, UV_RUN_NOWAIT); -} - - -void RabbitMQHandler::stop() +void RabbitMQHandler::startLoop() { - //std::lock_guard lock(mutex_before_loop_stop); - //uv_stop(loop); - stop_loop = true; + if (starting_loop.try_lock()) + { + uv_run(loop, UV_RUN_NOWAIT); + starting_loop.unlock(); + } } } diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.h b/src/Storages/RabbitMQ/RabbitMQHandler.h index 60cfd5c4868714afe9f32ef34712aad47f023748..eb358da94254414ecdb4d7a7c0db8dfeaf81396d 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.h +++ b/src/Storages/RabbitMQ/RabbitMQHandler.h @@ -16,22 +16,18 @@ class RabbitMQHandler : public AMQP::LibUvHandler public: RabbitMQHandler(uv_loop_t * evbase_, Poco::Logger * log_); - void onError(AMQP::TcpConnection * connection, const char * message) override; - void startConsumerLoop(std::atomic & loop_started); - void startProducerLoop(); - void stop(); + + void stop() { stop_loop.store(true); } + void startBackgroundLoop(); void startLoop(); private: uv_loop_t * loop; Poco::Logger * log; - std::atomic stop_loop = false, running_loop = false; - + std::atomic stop_loop = false; std::timed_mutex starting_loop; - std::mutex mutex_before_event_loop; - std::mutex mutex_before_loop_stop; }; } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index dbb91bf19c4412b705f719a7de035bb7fe408ca2..78a8b3c69e3541b724d540d8528928dd04df2746 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -30,7 +30,7 @@ namespace ExchangeType static const String HEADERS = "headers"; } -static const auto QUEUE_SIZE = 50000; /// Equals capacity of single rabbitmq queue +static const auto QUEUE_SIZE = 50000; /// Equals capacity of a single rabbitmq queue ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, @@ -60,7 +60,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( , local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT) , local_hash_exchange(local_exchange + "_" + ExchangeType::HASH) , stopped(stopped_) - , messages(QUEUE_SIZE) + , messages(QUEUE_SIZE * num_queues) { exchange_type_set = exchange_type != ExchangeType::DEFAULT; @@ -125,7 +125,7 @@ void ReadBufferFromRabbitMQConsumer::initExchange() /* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which * will evenly distribute messages between all consumers. (This enables better scaling as without hash-exchange - the only - * option to avoid getting the same messages more than once - is having only one consumer with one queue, which is not good.) + * option to avoid getting the same messages more than once - is having only one consumer with one queue) */ consumer_channel->declareExchange(exchange_name, type).onError([&](const char * message) { @@ -243,7 +243,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) }); /* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because - * if moved there, it must(!) be wrapped inside a channel->onReady callback or any other, otherwise + * if moved there, it must(!) be wrapped inside a channel->onSuccess callback or any other, otherwise * consumer might fail to subscribe and no resubscription will help. */ subscribe(queues.back()); @@ -327,7 +327,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) */ while (!default_bindings_created && !default_bindings_error || (exchange_type_set && !bindings_created && !bindings_error)) { - startEventLoop(loop_started); + startEventLoop(); } } @@ -378,7 +378,7 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription() /// These variables are updated in a separate thread. while (count_subscribed != wait_subscribed && !consumer_error) { - startEventLoop(loop_started); + startEventLoop(); } LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed); @@ -395,15 +395,9 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription() } -void ReadBufferFromRabbitMQConsumer::stopEventLoop() +void ReadBufferFromRabbitMQConsumer::startEventLoop() { - event_handler->stop(); -} - - -void ReadBufferFromRabbitMQConsumer::startEventLoop(std::atomic & loop_started) -{ - event_handler->startConsumerLoop(loop_started); + event_handler->startLoop(); } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 21f18491ca47694aa462b9a4f36d7538d93b2440..c9452fb249d4968bc664d0e4b522cd5d02f210a8 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -44,8 +44,6 @@ public: auto getExchange() const { return exchange_name; } private: - using Messages = std::vector; - ChannelPtr consumer_channel; HandlerPtr event_handler; @@ -70,7 +68,7 @@ private: bool local_exchange_declared = false, local_hash_exchange_declared = false; bool exchange_type_set = false, hash_exchange = false; - std::atomic loop_started = false, consumer_error = false; + std::atomic consumer_error = false; std::atomic count_subscribed = 0, wait_subscribed; ConcurrentBoundedQueue messages; @@ -78,21 +76,12 @@ private: std::vector queues; std::unordered_map subscribed_queue; - /* Note: as all consumers share the same connection => they also share the same - * event loop, which can be started by any consumer and the loop is blocking only to the thread that - * started it, and the loop executes ALL active callbacks on the connection => in case num_consumers > 1, - * at most two threads will be present: main thread and the one that executes callbacks (1 thread if - * main thread is the one that started the loop). - */ - std::mutex mutex; - bool nextImpl() override; void initExchange(); void initQueueBindings(const size_t queue_id); void subscribe(const String & queue_name); - void startEventLoop(std::atomic & loop_started); - void stopEventLoop(); + void startEventLoop(); }; } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 8a704661882dcd28839f26c52a29349af1206c31..7083da56f9ae871b877cf2c0739cced408f352f3 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -138,7 +138,7 @@ void StorageRabbitMQ::heartbeatFunc() void StorageRabbitMQ::loopingFunc() { LOG_DEBUG(log, "Starting event looping iterations"); - event_handler->startLoop(); + event_handler->startBackgroundLoop(); } diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index a4cdc09d4b902464391a2a17e7cc9d33907051b0..9b6bf39e6cd71b8c59cf0c9a4e6fa950c3646040 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -18,7 +18,7 @@ namespace ErrorCodes extern const int CANNOT_CONNECT_RABBITMQ; } -static const auto QUEUE_SIZE = 100000; +static const auto QUEUE_SIZE = 50000; static const auto CONNECT_SLEEP = 200; static const auto RETRIES_MAX = 1000; static const auto LOOP_WAIT = 10; @@ -47,7 +47,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( , delim(delimiter) , max_rows(rows_per_message) , chunk_size(chunk_size_) - , payloads(QUEUE_SIZE) + , payloads(QUEUE_SIZE * num_queues) { loop = std::make_unique(); @@ -127,6 +127,7 @@ void WriteBufferToRabbitMQProducer::countRow() void WriteBufferToRabbitMQProducer::writingFunc() { String payload; + while (!stop_loop || !payloads.empty()) { while (!payloads.empty()) @@ -142,10 +143,7 @@ void WriteBufferToRabbitMQProducer::writingFunc() { producer_channel->publish(exchange_name, routing_key, payload); } - - ++message_counter; } - startEventLoop(); } } @@ -183,7 +181,7 @@ void WriteBufferToRabbitMQProducer::finilizeProducer() if (use_transactional_channel) { - std::atomic answer_received = false; + std::atomic answer_received = false, wait_rollback = false; producer_channel->commitTransaction() .onSuccess([&]() { @@ -193,12 +191,22 @@ void WriteBufferToRabbitMQProducer::finilizeProducer() .onError([&](const char * message) { answer_received = true; - LOG_TRACE(log, "None of messages were publishd: {}", message); - /// Probably should do something here + wait_rollback = true; + LOG_TRACE(log, "Publishing not successful: {}", message); + producer_channel->rollbackTransaction() + .onSuccess([&]() + { + wait_rollback = false; + }) + .onError([&](const char * message) + { + LOG_ERROR(log, "Failed to rollback transaction: {}", message); + wait_rollback = false; + }); }); size_t count_retries = 0; - while (!answer_received && ++count_retries != RETRIES_MAX) + while ((!answer_received || wait_rollback) && ++count_retries != RETRIES_MAX) { startEventLoop(); std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_WAIT)); @@ -217,7 +225,7 @@ void WriteBufferToRabbitMQProducer::nextImpl() void WriteBufferToRabbitMQProducer::startEventLoop() { - event_handler->startProducerLoop(); + event_handler->startLoop(); } } diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index 2b16403fc448d52dc153352f75fcd2c9520d9c10..8339fc0abb36aecbe4ac52c926a8e7088836a121 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -14,8 +14,7 @@ namespace DB { -using ProducerPtr = std::shared_ptr; -using Messages = std::vector; +using ChannelPtr = std::shared_ptr; class WriteBufferToRabbitMQProducer : public WriteBuffer { @@ -60,7 +59,7 @@ private: std::unique_ptr loop; std::unique_ptr event_handler; std::unique_ptr connection; - ProducerPtr producer_channel; + ChannelPtr producer_channel; ConcurrentBoundedQueue payloads; size_t next_queue = 0;