未验证 提交 2299930c 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #20545 from kssenii/rabbit-format-settings

rabbitmq: add missing format factory settings
......@@ -59,6 +59,21 @@ Optional parameters:
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
Also format settings can be added along with rabbitmq-related settings.
Example:
``` sql
CREATE TABLE queue (
key UInt64,
value UInt64,
date DateTime
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5,
date_time_input_format = 'best_effort';
```
The RabbitMQ server configuration should be added using the ClickHouse config file.
......@@ -79,18 +94,6 @@ Additional configuration:
</rabbitmq>
```
Example:
``` sql
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5;
```
## Description {#description}
`SELECT` is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time threads using [materialized views](../../../sql-reference/statements/create/view.md). To do this:
......@@ -114,6 +117,7 @@ Exchange type options:
- `consistent_hash` - Data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
Setting `rabbitmq_queue_base` may be used for the following cases:
- to let different tables share queues, so that multiple consumers could be registered for the same queues, which makes a better performance. If using `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings, the exact match of queues is achieved in case these parameters are the same.
- to be able to restore reading from certain durable queues when not all messages were successfully consumed. To resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables.
- to reuse queues as they are declared durable and not auto-deleted. (Can be deleted via any of RabbitMQ CLI tools.)
......
......@@ -52,6 +52,21 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
Настройки форматов данных также могут быть добавлены в списке RabbitMQ настроек.
Example:
``` sql
CREATE TABLE queue (
key UInt64,
value UInt64,
date DateTime
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5,
date_time_input_format = 'best_effort';
```
Конфигурация сервера RabbitMQ добавляется с помощью конфигурационного файла ClickHouse.
......@@ -72,18 +87,6 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
</rabbitmq>
```
Example:
``` sql
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5;
```
## Описание {#description}
Запрос `SELECT` не очень полезен для чтения сообщений (за исключением отладки), поскольку каждое сообщение может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью [материализованных преставлений](../../../sql-reference/statements/create/view.md). Для этого:
......@@ -107,6 +110,7 @@ Example:
- `consistent_hash` - данные равномерно распределяются между всеми связанными таблицами, где имя точки обмена совпадает. Обратите внимание, что этот тип обмена должен быть включен с помощью плагина RabbitMQ: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
Настройка `rabbitmq_queue_base` может быть использована в следующих случаях:
1. чтобы восстановить чтение из ранее созданных очередей, если оно прекратилось по какой-либо причине, но очереди остались непустыми. Для восстановления чтения из одной конкретной очереди, нужно написать ее имя в `rabbitmq_queue_base` настройку и не указывать настройки `rabbitmq_num_consumers` и `rabbitmq_num_queues`. Чтобы восстановить чтение из всех очередей, которые были созданы для конкретной таблицы, необходимо совпадение следующих настроек: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. По умолчанию, если настройка `rabbitmq_queue_base` не указана, будут использованы уникальные для каждой таблицы имена очередей.
2. чтобы объявить одни и те же очереди для разных таблиц, что позволяет создавать несколько параллельных подписчиков на каждую из очередей. То есть обеспечивается лучшая производительность. В данном случае, для таких таблиц также необходимо совпадение настроек: `rabbitmq_num_consumers`, `rabbitmq_num_queues`.
3. чтобы повторно использовать созданные c `durable` настройкой очереди, так как они не удаляются автоматически (но могут быть удалены с помощью любого RabbitMQ CLI).
......
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
namespace DB
{
class ASTStorage;
#define LIST_OF_RABBITMQ_SETTINGS(M) \
#define RABBITMQ_RELATED_SETTINGS(M) \
M(String, rabbitmq_host_port, "", "A host-port to connect to RabbitMQ server.", 0) \
M(String, rabbitmq_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \
M(String, rabbitmq_format, "", "The message format.", 0) \
......@@ -24,6 +25,10 @@ namespace DB
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
#define LIST_OF_RABBITMQ_SETTINGS(M) \
RABBITMQ_RELATED_SETTINGS(M) \
FORMAT_FACTORY_SETTINGS(M)
DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS)
struct RabbitMQSettings : public BaseSettings<RabbitMQSettingsTraits>
......
......@@ -200,6 +200,15 @@ std::shared_ptr<Context> StorageRabbitMQ::addSettings(const Context & context) c
if (!schema_name.empty())
modified_context->setSetting("format_schema", schema_name);
for (const auto & setting : *rabbitmq_settings)
{
const auto & setting_name = setting.getName();
/// check for non-rabbitmq-related settings
if (!setting_name.starts_with("rabbitmq_"))
modified_context->setSetting(setting_name, setting.getValue());
}
return modified_context;
}
......
......@@ -1912,6 +1912,59 @@ def test_rabbitmq_no_connection_at_startup(rabbitmq_cluster):
assert int(result) == messages_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(120)
def test_rabbitmq_format_factory_settings(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.format_settings (
id String, date DateTime
) ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'format_settings',
rabbitmq_format = 'JSONEachRow',
date_time_input_format = 'best_effort';
''')
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
message = json.dumps({"id":"format_settings_test","date":"2021-01-19T14:42:33.1829214Z"})
expected = instance.query('''SELECT parseDateTimeBestEffort(CAST('2021-01-19T14:42:33.1829214Z', 'String'))''')
channel.basic_publish(exchange='format_settings', routing_key='', body=message)
result = ''
while True:
result = instance.query('SELECT date FROM test.format_settings')
if result == expected:
break;
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (
id String, date DateTime
) ENGINE = MergeTree ORDER BY id;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.format_settings;
''')
channel.basic_publish(exchange='format_settings', routing_key='', body=message)
result = ''
while True:
result = instance.query('SELECT date FROM test.view')
if result == expected:
break;
connection.close()
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.format_settings;
''')
assert(result == expected)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册