提交 e57d1c82 编写于 作者: K kssenii

Better shutdown

上级 647cf571
......@@ -210,6 +210,28 @@ void StorageRabbitMQ::loopingFunc()
}
/* Need to deactivate this way because otherwise might get a deadlock when first deactivate streaming task in shutdown and then
* inside streaming task try to deactivate any other task
*/
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
{
if (task_mutex.try_lock())
{
if (stop_loop)
event_handler->updateLoopState(Loop::STOP);
task->deactivate();
task_mutex.unlock();
}
else if (wait)
{
/// Wait only if deactivating from shutdown
std::lock_guard lock(task_mutex);
task->deactivate();
}
}
void StorageRabbitMQ::initExchange()
{
/* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
......@@ -326,7 +348,7 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
if (reconnecting)
{
heartbeat_task->deactivate();
deactivateTask(heartbeat_task, 0, 0);
connection->close(); /// Connection might be unusable, but not closed
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
......@@ -346,7 +368,7 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
while (!connection->ready() && !stream_cancelled && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
......@@ -504,11 +526,9 @@ void StorageRabbitMQ::shutdown()
stream_cancelled = true;
wait_confirm.store(false);
streaming_task->deactivate();
heartbeat_task->deactivate();
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
deactivateTask(streaming_task, 1, 1);
deactivateTask(heartbeat_task, 1, 0);
deactivateTask(looping_task, 1, 1);
connection->close();
......@@ -695,14 +715,11 @@ bool StorageRabbitMQ::streamToViews()
* races inside the library, but only in case any error occurs or connection is lost while ack is being sent
*/
if (event_handler->loopRunning())
{
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
}
deactivateTask(looping_task, 0, 1);
if (!event_handler->connectionRunning())
{
if (restoreConnection(true))
if (!stream_cancelled && restoreConnection(true))
{
for (auto & stream : streams)
stream->as<RabbitMQBlockInputStream>()->updateChannel();
......@@ -710,13 +727,13 @@ bool StorageRabbitMQ::streamToViews()
}
else
{
/// Reschedule if unable to connect to rabbitmq
/// Reschedule if unable to connect to rabbitmq or quit if cancelled
return false;
}
}
else
{
heartbeat_task->deactivate();
deactivateTask(heartbeat_task, 0, 0);
/// Commit
for (auto & stream : streams)
......
......@@ -101,7 +101,7 @@ private:
size_t num_created_consumers = 0;
Poco::Semaphore semaphore;
std::mutex mutex;
std::mutex mutex, task_mutex;
std::vector<ConsumerBufferPtr> buffers; /// available buffers for RabbitMQ consumers
String unique_strbase;
......@@ -128,6 +128,7 @@ private:
AMQP::ExchangeType defineExchangeType(String exchange_type_);
size_t getMaxBlockSize();
String getTableBasedName(String name, const StorageID & table_id);
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
void initExchange();
void bindExchange();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册