提交 972611e3 编写于 作者: K kssenii

Fix consumer

上级 56240661
......@@ -4,6 +4,13 @@
namespace DB
{
enum
{
Lock_timeout = 50,
Max_threads_to_pass = 10
};
RabbitMQHandler::RabbitMQHandler(event_base * evbase_, Poco::Logger * log_) :
LibEventHandler(evbase_),
evbase(evbase_),
......@@ -16,10 +23,9 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
{
LOG_ERROR(log, "Library error report: {}", message);
if (connection->closed())
if (!connection->usable() || !connection->ready())
{
std::cerr << "Connection lost, no recovery is possible";
throw;
LOG_ERROR(log, "Connection lost completely");
}
stop();
......@@ -28,24 +34,42 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
void RabbitMQHandler::start(std::atomic<bool> & check_param)
{
/* The object of this class is shared between concurrent consumers, who call this method repeatedly at the same time.
* But the loop should not be attempted to start if it is already running. Also note that the loop is blocking to
* the thread that has started it.
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop). But the loop should not be attempted to start if it is already running.
*/
std::lock_guard lock(mutex);
/* The callback, which changes this variable, could have already been activated by another thread while we waited for the
* mutex to unlock (as it runs all active events on the connection). This means that there is no need to start event loop again.
*/
if (check_param)
return;
if (mutex_before_event_loop.try_lock_for(std::chrono::milliseconds(Lock_timeout)))
{
/* The callback, which changes this variable, could have already been activated by another thread while we waited
* for the mutex to unlock (as it runs all active events on the connection). This means that there is no need to
* start event loop again.
*/
if (!check_param)
{
event_base_loop(evbase, EVLOOP_NONBLOCK);
}
event_base_loop(evbase, EVLOOP_NONBLOCK);
mutex_before_event_loop.unlock();
}
else
{
if (++count_passed == Max_threads_to_pass)
{
/* Event loop is blocking to the thread that started it and it is not good to block one single thread as it loops
* untill there are no active events, but there can be too many of them for one thread to be blocked for so long.
*/
stop();
count_passed = 0;
}
}
}
void RabbitMQHandler::stop()
{
event_base_loopbreak(evbase);
if (mutex_before_loop_stop.try_lock_for(std::chrono::milliseconds(0)))
{
event_base_loopbreak(evbase);
mutex_before_loop_stop.unlock();
}
}
}
......@@ -26,7 +26,9 @@ private:
event_base * evbase;
Poco::Logger * log;
std::mutex mutex;
size_t count_passed = 0;
std::timed_mutex mutex_before_event_loop;
std::timed_mutex mutex_before_loop_stop;
};
}
......@@ -13,6 +13,11 @@
namespace DB
{
enum
{
Received_max_to_stop_loop = 10000 // Explained below
};
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
RabbitMQHandler & eventHandler_,
......@@ -117,7 +122,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
std::atomic<bool> bindings_created = false, bindings_error = false;
consumer_channel->declareQueue(AMQP::exclusive)
consumer_channel->declareQueue(AMQP::durable)
.onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */)
{
queues.emplace_back(queue_name_);
......@@ -145,6 +150,12 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
.onSuccess([&]
{
bindings_created = true;
/// Unblock current thread so that it does not continue to execute all callbacks on the connection
if (++count_bound_queues == num_queues)
{
stopEventLoop();
}
})
.onError([&](const char * message)
{
......@@ -196,6 +207,12 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
consumer_created = true;
LOG_TRACE(log, "Consumer " + std::to_string(channel_id) + " is subscribed to queue " + queue_name);
/// Unblock current thread so that it does not continue to execute all callbacks on the connection
if (++count_subscribed == queues.size())
{
stopEventLoop();
}
})
.onReceived([&](const AMQP::Message & message, uint64_t /* deliveryTag */, bool /* redelivered */)
{
......@@ -207,15 +224,34 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
if (row_delimiter != '\0')
message_received += row_delimiter;
//LOG_TRACE(log, "Consumer {} received a message", channel_id);
bool stop_loop = false;
/// Needed to avoid data race because this vector can be used at the same time by another thread in nextImpl() (below).
std::lock_guard lock(mutex);
received.push_back(message_received);
{
std::lock_guard lock(mutex);
received.push_back(message_received);
/* As event loop is blocking to the thread that started it and a single thread should not be blocked while
* executing all callbacks on the connection (not only its own), then there should be some point to unblock
*/
if (received.size() >= Received_max_to_stop_loop)
{
stop_loop = true;
}
}
if (stop_loop)
{
stopEventLoop();
}
}
})
.onError([&](const char * message)
{
consumer_error = true;
LOG_ERROR(log, "Consumer failed: {}", message);
LOG_ERROR(log, "Consumer {} failed: {}", channel_id, message);
});
while (!consumer_created && !consumer_error)
......@@ -226,6 +262,12 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
}
void ReadBufferFromRabbitMQConsumer::stopEventLoop()
{
eventHandler.stop();
}
void ReadBufferFromRabbitMQConsumer::startEventLoop(std::atomic<bool> & check_param)
{
eventHandler.start(check_param);
......
......@@ -64,6 +64,8 @@ private:
Queues queues;
bool subscribed = false;
String current_exchange_name;
size_t count_subscribed = 0;
size_t count_bound_queues = 0;
Messages received;
Messages messages;
......@@ -77,6 +79,7 @@ private:
void initQueueBindings(const size_t queue_id);
void subscribe(const String & queue_name);
void startEventLoop(std::atomic<bool> & check_param);
void stopEventLoop();
};
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册