diff --git a/docs/en/engines/table-engines/integrations/rabbitmq.md b/docs/en/engines/table-engines/integrations/rabbitmq.md
index 414290168986ddf8c1675d9bc0318bc47ede9c94..3e8b34f7f41963a3a42715c4e54829bee29cefc5 100644
--- a/docs/en/engines/table-engines/integrations/rabbitmq.md
+++ b/docs/en/engines/table-engines/integrations/rabbitmq.md
@@ -27,9 +27,15 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
     [rabbitmq_exchange_type = 'exchange_type',]
     [rabbitmq_routing_key_list = 'key1,key2,...',]
     [rabbitmq_row_delimiter = 'delimiter_symbol',]
+    [rabbitmq_schema = '',]
     [rabbitmq_num_consumers = N,]
     [rabbitmq_num_queues = N,]
-    [rabbitmq_transactional_channel = 0]
+    [rabbitmq_queue_base = 'queue',]
+    [rabbitmq_deadletter_exchange = 'dl-exchange',]
+    [rabbitmq_persistent = 0,]
+    [rabbitmq_skip_broken_messages = N,]
+    [rabbitmq_max_block_size = N,]
+    [rabbitmq_flush_interval_ms = N]
 ```
 
 Required parameters:
@@ -43,12 +49,15 @@ Optional parameters:
 - `rabbitmq_exchange_type` – The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
 - `rabbitmq_routing_key_list` – A comma-separated list of routing keys.
 - `rabbitmq_row_delimiter` – Delimiter character, which ends the message.
+- `rabbitmq_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
 - `rabbitmq_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
 - `rabbitmq_num_queues` – The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
-- `rabbitmq_transactional_channel` – Wrap insert queries in transactions. Default: `0`.
-- `rabbitmq_queue_base` - Specify a base name for queues that will be declared.
+- `rabbitmq_queue_base` - Specify a base name for queues that will be declared. By default, queues are declared unique to tables based on db and table names.
 - `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
 - `persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
+- `rabbitmq_skip_broken_messages` – RabbitMQ message parser tolerance to schema-incompatible messages per block. Default: `0`. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
+- `rabbitmq_max_block_size`
+- `rabbitmq_flush_interval_ms`
 
 Required configuration:
 
@@ -96,16 +105,18 @@ Exchange type options:
 - `consistent-hash` - Data is evenly distributed between all bound tables (where exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
 
 Setting `rabbitmq_queue_base` may be used for the following cases:
-- to be able to restore reading from certain durable queues when not all messages were successfully consumed. Note: it makes sence only if messages are sent with delivery mode 2 - marked 'persistent', durable. To be able to resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To be able to resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables.
+- to let different tables share queues, so that multiple consumers could be registered for the same queues, which makes a better performance. If using `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings, the exact match of queues is achieved in case these parameters are the same.
+- to be able to restore reading from certain durable queues when not all messages were successfully consumed. To be able to resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To be able to resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables. Note: it makes sence only if messages are sent with delivery mode 2 - marked 'persistent', durable.
 - to reuse queues as they are declared durable and not auto-deleted.
-- to let different tables share queues, so that multiple consumers could be registered for the same queues, which makes better performance. If using `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings, the exact match of queues is achieved in case these parameters are the same.
+
+To improve performance, received messages are grouped into blocks the size of [max\_insert\_block\_size](../../../operations/server-configuration-parameters/settings.md#settings-max_insert_block_size). If the block wasn’t formed within [stream\_flush\_interval\_ms](../../../operations/server-configuration-parameters/settings.md) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
 
 If `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings are specified along with `rabbitmq_exchange_type`, then:
 
 - `rabbitmq-consistent-hash-exchange` plugin must be enabled.
 - `message_id` property of the published messages must be specified (unique for each message/batch).
 
-For insert query there is message metadata, which is added for each published message: messageID and republished flag - can be accessed via message headers.
+For insert query there is message metadata, which is added for each published message: `messageID` and `republished` flag (true, if published more than once) - can be accessed via message headers.
 
 Do not use the same table for inserts and materialized views.
 
@@ -134,6 +145,7 @@ Example:
 ## Virtual Columns {#virtual-columns}
 
 - `_exchange_name` - RabbitMQ exchange name.
-- `_consumer_tag` - ConsumerTag of the consumer that received the message.
-- `_delivery_tag` - DeliveryTag if the message. Scoped per consumer.
+- `_channel_id` - ChannelID, on which consumer, who received the message, was declared.
+- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
 - `_redelivered` - Redelivered flag of the message.
+- `_message_id` - messageID of the received message; non-empty if was set, when message was published.