From 972611e31b3c6f1ad18f94898372590eafd8e509 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 4 Jun 2020 06:22:53 +0000 Subject: [PATCH] Fix consumer --- src/Storages/RabbitMQ/RabbitMQHandler.cpp | 54 +++++++++++++------ src/Storages/RabbitMQ/RabbitMQHandler.h | 4 +- .../ReadBufferFromRabbitMQConsumer.cpp | 50 +++++++++++++++-- .../RabbitMQ/ReadBufferFromRabbitMQConsumer.h | 3 ++ 4 files changed, 91 insertions(+), 20 deletions(-) diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 775db87a1f..1a3ede7942 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -4,6 +4,13 @@ namespace DB { +enum +{ + Lock_timeout = 50, + Max_threads_to_pass = 10 +}; + + RabbitMQHandler::RabbitMQHandler(event_base * evbase_, Poco::Logger * log_) : LibEventHandler(evbase_), evbase(evbase_), @@ -16,10 +23,9 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes { LOG_ERROR(log, "Library error report: {}", message); - if (connection->closed()) + if (!connection->usable() || !connection->ready()) { - std::cerr << "Connection lost, no recovery is possible"; - throw; + LOG_ERROR(log, "Connection lost completely"); } stop(); @@ -28,24 +34,42 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes void RabbitMQHandler::start(std::atomic & check_param) { - /* The object of this class is shared between concurrent consumers, who call this method repeatedly at the same time. - * But the loop should not be attempted to start if it is already running. Also note that the loop is blocking to - * the thread that has started it. + /* The object of this class is shared between concurrent consumers (who share the same connection == share the same + * event loop). But the loop should not be attempted to start if it is already running. */ - std::lock_guard lock(mutex); - - /* The callback, which changes this variable, could have already been activated by another thread while we waited for the - * mutex to unlock (as it runs all active events on the connection). This means that there is no need to start event loop again. - */ - if (check_param) - return; + if (mutex_before_event_loop.try_lock_for(std::chrono::milliseconds(Lock_timeout))) + { + /* The callback, which changes this variable, could have already been activated by another thread while we waited + * for the mutex to unlock (as it runs all active events on the connection). This means that there is no need to + * start event loop again. + */ + if (!check_param) + { + event_base_loop(evbase, EVLOOP_NONBLOCK); + } - event_base_loop(evbase, EVLOOP_NONBLOCK); + mutex_before_event_loop.unlock(); + } + else + { + if (++count_passed == Max_threads_to_pass) + { + /* Event loop is blocking to the thread that started it and it is not good to block one single thread as it loops + * untill there are no active events, but there can be too many of them for one thread to be blocked for so long. + */ + stop(); + count_passed = 0; + } + } } void RabbitMQHandler::stop() { - event_base_loopbreak(evbase); + if (mutex_before_loop_stop.try_lock_for(std::chrono::milliseconds(0))) + { + event_base_loopbreak(evbase); + mutex_before_loop_stop.unlock(); + } } } diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.h b/src/Storages/RabbitMQ/RabbitMQHandler.h index 117f80d26f..39fccd4dac 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.h +++ b/src/Storages/RabbitMQ/RabbitMQHandler.h @@ -26,7 +26,9 @@ private: event_base * evbase; Poco::Logger * log; - std::mutex mutex; + size_t count_passed = 0; + std::timed_mutex mutex_before_event_loop; + std::timed_mutex mutex_before_loop_stop; }; } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 27c5ab800f..f8259ce8c4 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -13,6 +13,11 @@ namespace DB { +enum +{ + Received_max_to_stop_loop = 10000 // Explained below +}; + ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, RabbitMQHandler & eventHandler_, @@ -117,7 +122,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) std::atomic bindings_created = false, bindings_error = false; - consumer_channel->declareQueue(AMQP::exclusive) + consumer_channel->declareQueue(AMQP::durable) .onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */) { queues.emplace_back(queue_name_); @@ -145,6 +150,12 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) .onSuccess([&] { bindings_created = true; + + /// Unblock current thread so that it does not continue to execute all callbacks on the connection + if (++count_bound_queues == num_queues) + { + stopEventLoop(); + } }) .onError([&](const char * message) { @@ -196,6 +207,12 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) consumer_created = true; LOG_TRACE(log, "Consumer " + std::to_string(channel_id) + " is subscribed to queue " + queue_name); + + /// Unblock current thread so that it does not continue to execute all callbacks on the connection + if (++count_subscribed == queues.size()) + { + stopEventLoop(); + } }) .onReceived([&](const AMQP::Message & message, uint64_t /* deliveryTag */, bool /* redelivered */) { @@ -207,15 +224,34 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) if (row_delimiter != '\0') message_received += row_delimiter; + //LOG_TRACE(log, "Consumer {} received a message", channel_id); + + bool stop_loop = false; + /// Needed to avoid data race because this vector can be used at the same time by another thread in nextImpl() (below). - std::lock_guard lock(mutex); - received.push_back(message_received); + { + std::lock_guard lock(mutex); + received.push_back(message_received); + + /* As event loop is blocking to the thread that started it and a single thread should not be blocked while + * executing all callbacks on the connection (not only its own), then there should be some point to unblock + */ + if (received.size() >= Received_max_to_stop_loop) + { + stop_loop = true; + } + } + + if (stop_loop) + { + stopEventLoop(); + } } }) .onError([&](const char * message) { consumer_error = true; - LOG_ERROR(log, "Consumer failed: {}", message); + LOG_ERROR(log, "Consumer {} failed: {}", channel_id, message); }); while (!consumer_created && !consumer_error) @@ -226,6 +262,12 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) } +void ReadBufferFromRabbitMQConsumer::stopEventLoop() +{ + eventHandler.stop(); +} + + void ReadBufferFromRabbitMQConsumer::startEventLoop(std::atomic & check_param) { eventHandler.start(check_param); diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 31babc5033..55adb39bdc 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -64,6 +64,8 @@ private: Queues queues; bool subscribed = false; String current_exchange_name; + size_t count_subscribed = 0; + size_t count_bound_queues = 0; Messages received; Messages messages; @@ -77,6 +79,7 @@ private: void initQueueBindings(const size_t queue_id); void subscribe(const String & queue_name); void startEventLoop(std::atomic & check_param); + void stopEventLoop(); }; } -- GitLab