提交 053f31cb 编写于 作者: K kssenii

Better confirmListener

上级 d5b1332b
......@@ -13,11 +13,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_CONNECT_RABBITMQ;
}
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 20;
static const auto BATCH = 512;
......@@ -133,23 +128,29 @@ bool WriteBufferToRabbitMQProducer::setupConnection()
void WriteBufferToRabbitMQProducer::setupChannel()
{
producer_channel = std::make_unique<AMQP::TcpChannel>(connection.get());
producer_channel->onError([&](const char * message)
{
LOG_DEBUG(log, "Producer error: {}. Currently {} messages have not been confirmed yet, {} messages are waiting to be published",
message, delivery_tags_record.size(), payloads.size());
LOG_ERROR(log, "Producer error: {}", message);
/// Means channel ends up in an error state and is not usable anymore.
producer_channel->close();
for (auto record = delivery_record.begin(); record != delivery_record.end(); record++)
returned.tryPush(record->second);
LOG_DEBUG(log, "Currently {} messages have not been confirmed yet, {} waiting to be published, {} will be republished",
delivery_record.size(), payloads.size(), returned.size());
/// Delivery tags are scoped per channel.
delivery_record.clear();
delivery_tag = 0;
});
producer_channel->onReady([&]()
{
LOG_DEBUG(log, "Producer channel is ready");
/// Delivery tags are scoped per channel.
delivery_tags_record.clear();
delivery_tag = 0;
if (use_transactional_channel)
{
producer_channel->startTransaction();
......@@ -157,56 +158,76 @@ void WriteBufferToRabbitMQProducer::setupChannel()
else
{
/* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. If fails,
* it will be requed in returned_callback. If persistent == false, message is confirmed the moment it is enqueued. If fails, it
* is not requeued. First option is two times slower than the second, so default is second and the first is turned on in table
* setting. Persistent message is not requeued if it is unroutable, i.e. no queues are bound to given exchange with the given
* routing key - this is a responsibility of a client. It can be requeued in this case if AMQP::mandatory is set, but pointless.
* onNack() is received. If persistent == false, message is confirmed the moment it is enqueued. First option is two times
* slower than the second, so default is second and the first is turned on in table setting.
*
* "Publisher confirms" are implemented similar to strategy#3 here https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
*/
producer_channel->confirmSelect()
.onAck([&](uint64_t acked_delivery_tag, bool multiple)
{
removeConfirmed(acked_delivery_tag, multiple);
removeConfirmed(acked_delivery_tag, multiple, false);
})
.onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */)
{
if (!persistent)
removeConfirmed(nacked_delivery_tag, multiple);
removeConfirmed(nacked_delivery_tag, multiple, true);
});
}
});
}
void WriteBufferToRabbitMQProducer::removeConfirmed(UInt64 received_delivery_tag, bool multiple)
void WriteBufferToRabbitMQProducer::removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish)
{
/// Same as here https://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
std::lock_guard lock(mutex);
auto found_tag_pos = delivery_tags_record.find(received_delivery_tag);
if (found_tag_pos != delivery_tags_record.end())
auto record_iter = delivery_record.find(received_delivery_tag);
if (record_iter != delivery_record.end())
{
/// If multiple is true, then all delivery tags up to and including current are confirmed.
if (multiple)
{
++found_tag_pos;
delivery_tags_record.erase(delivery_tags_record.begin(), found_tag_pos);
/// If multiple is true, then all delivery tags up to and including current are confirmed (with ack or nack).
++record_iter;
if (republish)
for (auto record = delivery_record.begin(); record != record_iter; ++record)
returned.tryPush(record->second);
/// Delete the records even in case when republished because new delivery tags will be assigned by the server.
delivery_record.erase(delivery_record.begin(), record_iter);
//LOG_DEBUG(log, "Confirmed all delivery tags up to {}", received_delivery_tag);
}
else
{
delivery_tags_record.erase(found_tag_pos);
if (republish)
returned.tryPush(record_iter->second);
delivery_record.erase(record_iter);
//LOG_DEBUG(log, "Confirmed delivery tag {}", received_delivery_tag);
}
}
/// else is theoretically not possible
}
void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<String> & messages)
void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<String> & messages, bool republishing)
{
String payload;
while (!messages.empty())
while (!messages.empty() && producer_channel->usable())
{
messages.pop(payload);
AMQP::Envelope envelope(payload.data(), payload.size());
AMQP::Table message_settings = key_arguments;
/* There is the case when connection is lost in the period after some messages were published and before ack/nack was sent by the
* server, then it means that publisher will never now whether those messages were delivered or not, and therefore those records
* that received no ack/nack before connection loss will be republished, so there might be duplicates. To let consumer know that
* received message might be a possible duplicate - a "republished" field is added to message metadata.
*/
message_settings["republished"] = std::to_string(republishing);
envelope.setHeaders(message_settings);
/// Delivery mode is 1 or 2. 1 is default. 2 makes a message durable, but makes performance 1.5-2 times worse.
if (persistent)
......@@ -214,79 +235,45 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<String> & mes
if (exchange_type == AMQP::ExchangeType::consistent_hash)
{
producer_channel->publish(exchange_name, std::to_string(delivery_tag), envelope).onReturned(returned_callback);
producer_channel->publish(exchange_name, std::to_string(delivery_tag), envelope);
}
else if (exchange_type == AMQP::ExchangeType::headers)
{
envelope.setHeaders(key_arguments);
producer_channel->publish(exchange_name, "", envelope).onReturned(returned_callback);
producer_channel->publish(exchange_name, "", envelope);
}
else
{
producer_channel->publish(exchange_name, routing_keys[0], envelope).onReturned(returned_callback);
producer_channel->publish(exchange_name, routing_keys[0], envelope);
}
if (producer_channel->usable())
{
++delivery_tag;
delivery_tags_record.insert(delivery_tags_record.end(), delivery_tag);
++delivery_tag;
delivery_record.insert(delivery_record.end(), {delivery_tag, payload});
if (delivery_tag % BATCH == 0)
break;
}
else
{
/// Need to break to let event loop run, because no publishing actually happend before looping.
if (delivery_tag % BATCH == 0)
break;
}
}
iterateEventLoop();
}
/* Currently implemented “asynchronous publisher confirms” - does not stop after each publish to wait for each individual confirm. An
* asynchronous publisher may have any number of messages in-flight (unconfirmed) at a time.
* Synchronous publishing is where after each publish need to wait for the acknowledgement (ack/nack - see confirmSelect() in channel
* declaration), which is very slow because takes starting event loop and waiting for corresponding callback - can really take a while.
*
* Async publishing works well in all failure cases except for connection failure, because if connection fails - not all Ack/Nack might be
* receieved from the server (and even if all messages were successfully delivered, publisher will not be able to know it). Also in this
* case onReturned callback will not be received, so loss is possible for messages that were published but have not received confirm from
* server before connection loss, because then publisher won't know if message was delivered or not.
*
* To make it a delivery with no loss and minimal possible amount of duplicates - need to use synchronous publishing (which is too slow).
* With async publishing at-least-once delivery is achieved with (batch) publishing and manual republishing in case when not all delivery
* tags were confirmed (ack/nack) before connection loss. Here the maximum number of possible duplicates is no more than batch size.
* (Manual last batch republishing is only for case of connection loss, in all other failure cases - onReturned callback will be received.)
*
* So currently implemented async batch publishing, but for now without manual republishing (because still in doubt how to do it nicely,
* but current idea is to store in delivery_tags_record not just delivery tags, but pair: (delivery_tag, message). As currently once the
* publisher receives acknowledgement from the server that the message was sucessfully delivered - a "confirmListener" will delete its
* delivery tag from the set of pending acknowledgemens, then we can as well delete the payload. If connection fails, undeleted delivery
* tags indicate messages, whose fate is unknown, so corresponding payloads should be republished.)
*/
void WriteBufferToRabbitMQProducer::writingFunc()
{
returned_callback = [&](const AMQP::Message & message, int16_t code, const std::string & description)
{
returned.tryPush(std::string(message.body(), message.size()));
LOG_DEBUG(log, "Message returned with code: {}, description: {}. Republishing", code, description);
/* Here can be added a value to AMQP::Table field of AMQP::Envelope (and then it should be queue<AMQP::Envelope> instead of
* queue<String>) - to indicate that message was republished. Later a consumer will be able to extract this field and understand
* that this message was republished and can probably be a duplicate (as RabbitMQ does not guarantee exactly-once delivery).
*/
};
while (!payloads.empty() || wait_all)
{
/* Publish main paylods only when there are no returned messages. This way it is ensured that returned.queue never grows too big
* and returned messages are republished as fast as possible. Also payloads.queue is fixed size and push attemt would block thread
* in countRow() once there is no space - that is intended.
*/
if (!returned.empty() && producer_channel->usable())
publish(returned);
else if (!payloads.empty() && delivery_tags_record.empty() && producer_channel->usable())
publish(payloads);
publish(returned, true);
else if (!payloads.empty() && producer_channel->usable())
publish(payloads, false);
iterateEventLoop();
if (wait_num.load() && delivery_tags_record.empty() && payloads.empty())
if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
wait_all.store(false);
else if ((!producer_channel->usable() && connection->usable()) || (!connection->usable() && setupConnection()))
setupChannel();
......
......@@ -45,8 +45,8 @@ private:
void writingFunc();
bool setupConnection();
void setupChannel();
void removeConfirmed(UInt64 received_delivery_tag, bool multiple);
void publish(ConcurrentBoundedQueue<String> & message);
void removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish);
void publish(ConcurrentBoundedQueue<String> & message, bool republishing);
std::pair<String, UInt16> parsed_address;
const std::pair<String, String> login_password;
......@@ -68,10 +68,8 @@ private:
UInt64 delivery_tag = 0;
std::atomic<bool> wait_all = true;
std::atomic<UInt64> wait_num = 0;
std::set<UInt64> delivery_tags_record;
std::mutex mutex;
UInt64 payload_counter = 0;
std::function<void(const AMQP::Message &, int16_t, const std::string &)> returned_callback;
std::map<UInt64, String> delivery_record;
Poco::Logger * log;
const std::optional<char> delim;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册