提交 36eb2c30 编写于 作者: A alesapin

Simplify code around locks

上级 d5847d29
......@@ -39,14 +39,11 @@ void RabbitMQHandler::startConsumerLoop(std::atomic<bool> & loop_started)
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop and handler). But the loop should not be attempted to start if it is already running.
*/
if (mutex_before_event_loop.try_lock_for(std::chrono::milliseconds(Lock_timeout)))
{
loop_started.store(true);
stop_scheduled = false;
std::lock_guard lock(mutex_before_event_loop);
loop_started.store(true);
stop_scheduled = false;
uv_run(loop, UV_RUN_NOWAIT);
mutex_before_event_loop.unlock();
}
uv_run(loop, UV_RUN_NOWAIT);
}
......@@ -58,11 +55,8 @@ void RabbitMQHandler::startProducerLoop()
void RabbitMQHandler::stop()
{
if (mutex_before_loop_stop.try_lock())
{
uv_stop(loop);
mutex_before_loop_stop.unlock();
}
std::lock_guard lock(mutex_before_loop_stop);
uv_stop(loop);
}
......
......@@ -31,7 +31,7 @@ private:
timeval tv;
std::atomic<bool> stop_scheduled = false;
std::timed_mutex mutex_before_event_loop;
std::mutex mutex_before_event_loop;
std::mutex mutex_before_loop_stop;
};
......
......@@ -46,7 +46,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer_channel(std::move(consumer_channel_))
, eventHandler(eventHandler_)
, event_handler(eventHandler_)
, exchange_name(exchange_name_)
, routing_keys(routing_keys_)
, channel_id(channel_id_)
......@@ -372,7 +372,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
* executing all callbacks on the connection (not only its own), then there should be some point to unblock.
* loop_started == 1 if current consumer is started the loop and not another.
*/
if (!loop_started.load() && !eventHandler.checkStopIsScheduled())
if (!loop_started.load() && !event_handler.checkStopIsScheduled())
{
stopEventLoopWithTimeout();
}
......@@ -415,19 +415,19 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
void ReadBufferFromRabbitMQConsumer::stopEventLoop()
{
eventHandler.stop();
event_handler.stop();
}
void ReadBufferFromRabbitMQConsumer::stopEventLoopWithTimeout()
{
eventHandler.stopWithTimeout();
event_handler.stopWithTimeout();
}
void ReadBufferFromRabbitMQConsumer::startEventLoop(std::atomic<bool> & loop_started)
{
eventHandler.startConsumerLoop(loop_started);
event_handler.startConsumerLoop(loop_started);
}
......
......@@ -23,7 +23,7 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
public:
ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
RabbitMQHandler & eventHandler_,
RabbitMQHandler & event_handler_,
const String & exchange_name_,
const Names & routing_keys_,
const size_t channel_id_,
......@@ -46,7 +46,7 @@ private:
using Messages = std::vector<String>;
ChannelPtr consumer_channel;
RabbitMQHandler & eventHandler;
RabbitMQHandler & event_handler;
const String & exchange_name;
const Names & routing_keys;
......
......@@ -485,6 +485,7 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
while True:
result = instance.query('SELECT count() FROM test.view')
print("Result", result, "Expected", batch_messages * rabbitmq_messages)
if int(result) == batch_messages * rabbitmq_messages:
break
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册