提交 db03cd7d 编写于 作者: K kssenii

Much more optimal queues setup

上级 7beddcea
......@@ -51,7 +51,7 @@ Optional parameters:
- `rabbitmq_row_delimiter` – Delimiter character, which ends the message.
- `rabbitmq_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` – The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_num_queues` – Total number of queues. Default: `1`. Increasing this number can significantly improve performance.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
......
......@@ -45,7 +45,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- `rabbitmq_row_delimiter` – символ-разделитель, который завершает сообщение.
- `rabbitmq_schema` – опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap’n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
- `rabbitmq_num_consumers` – количество потребителей на таблицу. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.
- `rabbitmq_num_queues` – количество очередей на потребителя. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одной очереди на потребителя недостаточна.
- `rabbitmq_num_queues` – количество очередей. По умолчанию: `1`. Большее число очередей может сильно увеличить пропускную способность.
- `rabbitmq_queue_base` - настройка для имен очередей. Сценарии использования описаны ниже.
- `rabbitmq_persistent` - флаг, от которого зависит настройка 'durable' для сообщений при запросах `INSERT`. По умолчанию: `0`.
- `rabbitmq_skip_broken_messages` – максимальное количество некорректных сообщений в блоке. Если `rabbitmq_skip_broken_messages = N`, то движок отбрасывает `N` сообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0.
......
......@@ -17,33 +17,30 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
}
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
std::vector<String> & queues_,
size_t channel_id_base_,
const String & channel_base_,
const String & queue_base_,
Poco::Logger * log_,
char row_delimiter_,
bool hash_exchange_,
size_t num_queues_,
const String & deadletter_exchange_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer_channel(std::move(consumer_channel_))
, setup_channel(setup_channel_)
, event_handler(event_handler_)
, exchange_name(exchange_name_)
, queues(queues_)
, channel_base(channel_base_)
, channel_id_base(channel_id_base_)
, queue_base(queue_base_)
, hash_exchange(hash_exchange_)
, num_queues(num_queues_)
, deadletter_exchange(deadletter_exchange_)
, log(log_)
......@@ -52,9 +49,6 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, stopped(stopped_)
, received(queue_size * num_queues)
{
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
bindQueue(queue_id);
setupChannel();
}
......@@ -65,67 +59,6 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
}
void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
{
std::atomic<bool> binding_created = false;
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
queues.emplace_back(queue_name);
LOG_DEBUG(log, "Queue {} is declared", queue_name);
if (msgcount)
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary
*/
setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id_base))
.onSuccess([&] { binding_created = true; })
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
"Failed to create queue binding with queue {} for exchange {}. Reason: {}", std::string(message),
queue_name, exchange_name);
});
};
auto error_callback([&](const char * message)
{
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
* declared queues via any of the various cli tools.
*/
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
which were declared with the same names. ERROR reason: "
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
AMQP::Table queue_settings;
queue_settings["x-max-length"] = queue_size;
queue_settings["x-overflow"] = "reject-publish";
if (!deadletter_exchange.empty())
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
* specific queue when its name is specified in queue_base setting
*/
const String queue_name = !hash_exchange ? queue_base : std::to_string(channel_id_base) + "_" + std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
while (!binding_created)
{
iterateEventLoop();
}
}
void ReadBufferFromRabbitMQConsumer::subscribe()
{
for (const auto & queue_name : queues)
......
......@@ -24,15 +24,14 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
public:
ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
std::vector<String> & queues_,
size_t channel_id_base_,
const String & channel_base_,
const String & queue_base_,
Poco::Logger * log_,
char row_delimiter_,
bool hash_exchange_,
size_t num_queues_,
const String & deadletter_exchange_,
uint32_t queue_size_,
......@@ -79,19 +78,17 @@ public:
private:
bool nextImpl() override;
void bindQueue(size_t queue_id);
void subscribe();
void iterateEventLoop();
ChannelPtr consumer_channel;
ChannelPtr setup_channel;
HandlerPtr event_handler;
const String exchange_name;
std::vector<String> queues;
const String channel_base;
const size_t channel_id_base;
const String queue_base;
const bool hash_exchange;
const size_t num_queues;
const String deadletter_exchange;
Poco::Logger * log;
......@@ -102,7 +99,6 @@ private:
String channel_id;
std::atomic<bool> channel_error = true, wait_subscription = false;
std::vector<String> queues;
ConcurrentBoundedQueue<MessageData> received;
MessageData current;
size_t subscribed = 0;
......
......@@ -52,6 +52,7 @@ namespace ErrorCodes
extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
}
namespace ExchangeType
......@@ -385,6 +386,67 @@ void StorageRabbitMQ::bindExchange()
}
void StorageRabbitMQ::bindQueue(size_t queue_id)
{
std::atomic<bool> binding_created = false;
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
queues.emplace_back(queue_name);
LOG_DEBUG(log, "Queue {} is declared", queue_name);
if (msgcount)
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary
*/
setup_channel->bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
.onSuccess([&] { binding_created = true; })
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
"Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message));
});
};
auto error_callback([&](const char * message)
{
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
* declared queues via any of the various cli tools.
*/
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
which were declared with the same names. ERROR reason: "
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
AMQP::Table queue_settings;
queue_settings["x-max-length"] = queue_size;
if (!deadletter_exchange.empty())
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
else
queue_settings["x-overflow"] = "reject-publish";
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
* specific queue when its name is specified in queue_base setting
*/
const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
while (!binding_created)
{
event_handler->iterateLoop();
}
}
bool StorageRabbitMQ::restoreConnection(bool reconnecting)
{
size_t cnt_retries = 0;
......@@ -444,6 +506,7 @@ void StorageRabbitMQ::unbindExchange()
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
setup_channel->removeExchange(bridge_exchange)
.onSuccess([&]()
{
......@@ -458,6 +521,8 @@ void StorageRabbitMQ::unbindExchange()
{
event_handler->iterateLoop();
}
setup_channel->close();
});
}
......@@ -536,6 +601,13 @@ void StorageRabbitMQ::startup()
initExchange();
bindExchange();
for (size_t i = 1; i <= num_queues; ++i)
{
bindQueue(i);
}
setup_channel->close();
for (size_t i = 0; i < num_consumers; ++i)
{
try
......@@ -617,8 +689,8 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
consumer_channel, setup_channel, event_handler, consumer_exchange, ++consumer_id,
unique_strbase, queue_base, log, row_delimiter, hash_exchange, num_queues,
consumer_channel, event_handler, consumer_exchange, queues, ++consumer_id,
unique_strbase, queue_base, log, row_delimiter, num_queues,
deadletter_exchange, queue_size, stream_cancelled);
}
......@@ -665,6 +737,7 @@ void StorageRabbitMQ::streamingToViewsFunc()
try
{
auto table_id = getStorageID();
// Check if at least one direct dependency is attached
size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
......@@ -757,7 +830,6 @@ bool StorageRabbitMQ::streamToViews()
std::atomic<bool> stub = {false};
/// Loop could run untill this point only if select query was made
if (!event_handler->loopRunning())
{
event_handler->updateLoopState(Loop::RUN);
......@@ -831,7 +903,7 @@ bool StorageRabbitMQ::streamToViews()
}
}
if ((queue_empty == num_queues) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
{
connection->heartbeat();
read_attempts = 0;
......
......@@ -114,6 +114,7 @@ private:
std::atomic<bool> wait_confirm = true; /// needed to break waiting for confirmations for producer
std::atomic<bool> exchange_removed = false;
ChannelPtr setup_channel;
std::vector<String> queues;
std::once_flag flag; /// remove exchange only once
std::mutex task_mutex;
......@@ -140,6 +141,7 @@ private:
void initExchange();
void bindExchange();
void bindQueue(size_t queue_id);
bool restoreConnection(bool reconnecting);
bool streamToViews();
......
......@@ -537,14 +537,14 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
NUM_CONSUMERS = 10
NUM_QUEUES = 2
NUM_QUEUES = 10
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'test_sharding',
rabbitmq_num_queues = 2,
rabbitmq_num_queues = 10,
rabbitmq_num_consumers = 10,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
......@@ -617,7 +617,7 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster):
rabbitmq_exchange_name = 'combo',
rabbitmq_queue_base = 'combo',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_num_queues = 5,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
......@@ -879,7 +879,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_queue_base = 'over',
rabbitmq_exchange_type = 'direct',
rabbitmq_num_consumers = 5,
rabbitmq_num_queues = 2,
rabbitmq_num_queues = 10,
rabbitmq_max_block_size = 10000,
rabbitmq_routing_key_list = 'over',
rabbitmq_format = 'TSV',
......@@ -1722,7 +1722,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'consumer_reconnect',
rabbitmq_num_consumers = 10,
rabbitmq_num_queues = 2,
rabbitmq_num_queues = 10,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册