提交 0ee54c8a 编写于 作者: K kssenii

Fix build, async acks -> sync acks, fix tests

上级 92efb847
......@@ -124,11 +124,15 @@ Block RabbitMQBlockInputStream::readImpl()
auto new_rows = read_rabbitmq_message();
if (new_rows)
{
auto exchange_name = storage.getExchange();
auto consumer_tag = buffer->getConsumerTag();
auto delivery_tag = buffer->getDeliveryTag();
auto redelivered = buffer->getRedelivered();
buffer->updateNextDeliveryTag(delivery_tag);
for (size_t i = 0; i < new_rows; ++i)
{
virtual_columns[0]->insert(exchange_name);
......@@ -137,10 +141,9 @@ Block RabbitMQBlockInputStream::readImpl()
virtual_columns[3]->insert(redelivered);
}
if (delivery_tag > last_inserted_delivery_tag)
last_inserted_delivery_tag = delivery_tag;
total_rows = total_rows + new_rows;
}
buffer->allowNext();
if (!new_rows || !checkTimeLimit())
......@@ -167,7 +170,7 @@ void RabbitMQBlockInputStream::readSuffixImpl()
if (!buffer)
return;
buffer->ackMessages(last_inserted_delivery_tag);
buffer->ackMessages();
}
}
......@@ -39,7 +39,6 @@ private:
const Block virtual_header;
ConsumerBufferPtr buffer;
UInt64 last_inserted_delivery_tag;
};
}
......@@ -14,7 +14,7 @@
namespace DB
{
static const auto QUEUE_SIZE = 50000; /// Equals capacity of a single rabbitmq queue
static const auto QUEUE_SIZE = 50000;
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
......@@ -57,10 +57,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
if (ack.load() && max_tag && consumer_channel)
consumer_channel->ack(max_tag, AMQP::multiple);
consumer_channel->close();
received.clear();
BufferBase::set(nullptr, 0, 0);
}
......@@ -68,7 +65,7 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
{
bool bindings_created = false, bindings_error = false;
std::atomic<bool> bindings_created = false, bindings_error = false;
auto success_callback = [&](const std::string & queue_name_, int msgcount, int /* consumercount */)
{
......@@ -220,13 +217,6 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
message_received += row_delimiter;
received.push({deliveryTag, message_received, redelivered});
std::lock_guard lock(wait_ack);
if (ack.exchange(false) && prev_tag && prev_tag <= max_tag && consumer_channel)
{
consumer_channel->ack(prev_tag, AMQP::multiple); /// Will ack all up to last tag staring from last acked.
LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag);
}
}
})
.onError([&](const char * message)
......@@ -239,7 +229,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
void ReadBufferFromRabbitMQConsumer::checkSubscription()
{
if (count_subscribed == num_queues)
if (count_subscribed == num_queues || !consumer_channel->usable())
return;
wait_subscribed = num_queues;
......@@ -264,13 +254,14 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
}
void ReadBufferFromRabbitMQConsumer::ackMessages(UInt64 last_inserted_delivery_tag)
void ReadBufferFromRabbitMQConsumer::ackMessages()
{
if (last_inserted_delivery_tag > prev_tag)
UInt64 delivery_tag = last_inserted_delivery_tag;
if (delivery_tag && delivery_tag > prev_tag)
{
std::lock_guard lock(wait_ack); /// See onReceived() callback.
prev_tag = last_inserted_delivery_tag;
ack.store(true);
prev_tag = delivery_tag;
consumer_channel->ack(prev_tag, AMQP::multiple); /// Will ack all up to last tag staring from last acked.
LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag);
}
}
......@@ -291,7 +282,6 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
auto * new_position = const_cast<char *>(current.message.data());
BufferBase::set(new_position, current.message.size(), 0);
allowed = false;
max_tag = current.delivery_tag;
return true;
}
......
......@@ -50,7 +50,8 @@ public:
void allowNext() { allowed = true; } // Allow to read next message.
void checkSubscription();
void ackMessages(UInt64 last_inserted_delivery_tag);
void updateNextDeliveryTag(UInt64 delivery_tag) { last_inserted_delivery_tag = delivery_tag; }
void ackMessages();
auto getConsumerTag() const { return consumer_tag; }
auto getDeliveryTag() const { return current.delivery_tag; }
......@@ -80,18 +81,16 @@ private:
String consumer_tag;
ConcurrentBoundedQueue<MessageData> received;
UInt64 prev_tag = 0, max_tag = 0;
UInt64 last_inserted_delivery_tag = 0, prev_tag = 0;
MessageData current;
std::vector<String> queues;
std::unordered_map<String, bool> subscribed_queue;
std::atomic<bool> ack = false;
std::mutex wait_ack;
bool nextImpl() override;
void initQueueBindings(const size_t queue_id);
void subscribe(const String & queue_name);
void iterateEventLoop();
};
}
......@@ -284,9 +284,6 @@ void StorageRabbitMQ::unbindExchange()
{
std::call_once(flag, [&]()
{
if (exchange_removed.load())
return;
setup_channel->removeExchange(bridge_exchange)
.onSuccess([&]()
{
......
......@@ -150,6 +150,7 @@ void WriteBufferToRabbitMQProducer::countRow()
void WriteBufferToRabbitMQProducer::writingFunc()
{
String payload;
current = 0;
auto returned_callback = [&](const AMQP::Message & message, int16_t /* code */, const std::string & /* description */)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册