diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 1b48232aa52f7d20dcbb36f3a05995baee0f9543..82cb3f2311d62903ab1def1f8e9bebd389e47863 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -150,7 +150,7 @@ void WriteBufferToRabbitMQProducer::countRow() void WriteBufferToRabbitMQProducer::writingFunc() { String payload; - current = 0; + UInt64 message_id = 0; auto returned_callback = [&](const AMQP::Message & message, int16_t /* code */, const std::string & /* description */) { @@ -164,7 +164,10 @@ void WriteBufferToRabbitMQProducer::writingFunc() { payloads.pop(payload); AMQP::Envelope envelope(payload.data(), payload.size()); - current = wait_num ? ++current % wait_num : ++current; + + ++message_id; + if (wait_num) + message_id %= wait_num; /// Delivery mode is 1 or 2. 1 is default. 2 makes a message durable, but makes performance 1.5-2 times worse. if (persistent) @@ -172,7 +175,7 @@ void WriteBufferToRabbitMQProducer::writingFunc() if (exchange_type == AMQP::ExchangeType::consistent_hash) { - producer_channel->publish(exchange_name, std::to_string(current), envelope).onReturned(returned_callback); + producer_channel->publish(exchange_name, std::to_string(message_id), envelope).onReturned(returned_callback); } else if (exchange_type == AMQP::ExchangeType::headers) { @@ -184,7 +187,7 @@ void WriteBufferToRabbitMQProducer::writingFunc() producer_channel->publish(exchange_name, routing_keys[0], envelope).onReturned(returned_callback); } - if (current % BATCH == 0) + if (message_id % BATCH == 0) iterateEventLoop(); } diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index ebeb21075bf533d612a696363afd2eebfcc9867d..30e647af47126e81c470a88ff8eaf5dbd8e5b7f3 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -64,7 +64,7 @@ private: ChannelPtr producer_channel; ConcurrentBoundedQueue payloads; - UInt64 delivery_tag = 0, current = 0; + UInt64 delivery_tag = 0; std::atomic wait_all = true; std::atomic wait_num = 0, last_processed = 0;