---
toc_priority: 6
toc_title: RabbitMQ
---

# RabbitMQ Engine {#rabbitmq-engine}

This engine allows integrating ClickHouse with [RabbitMQ](https://www.rabbitmq.com).

`RabbitMQ` lets you:

-   Publish or subscribe to data flows.
-   Process streams as they become available.

## Creating a Table {#table_engine-rabbitmq-creating-a-table}

``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = RabbitMQ SETTINGS
    rabbitmq_host_port = 'host:port',
    rabbitmq_exchange_name = 'exchange_name',
    rabbitmq_format = 'data_format'[,]
    [rabbitmq_exchange_type = 'exchange_type',]
    [rabbitmq_routing_key_list = 'key1,key2,...',]
    [rabbitmq_row_delimiter = 'delimiter_symbol',]
    [rabbitmq_schema = '',]
    [rabbitmq_num_consumers = N,]
    [rabbitmq_num_queues = N,]
    [rabbitmq_queue_base = 'queue',]
    [rabbitmq_deadletter_exchange = 'dl-exchange',]
    [rabbitmq_persistent = 0,]
    [rabbitmq_skip_broken_messages = N,]
    [rabbitmq_max_block_size = N,]
    [rabbitmq_flush_interval_ms = N]
```

Required parameters:

-   `rabbitmq_host_port` – host:port (for example, `localhost:5672`).
-   `rabbitmq_exchange_name` – RabbitMQ exchange name.
-   `rabbitmq_format` – Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the [Formats](../../../interfaces/formats.md) section.

Optional parameters:

-   `rabbitmq_exchange_type` – The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
-   `rabbitmq_routing_key_list` – A comma-separated list of routing keys.
-   `rabbitmq_row_delimiter` – Delimiter character, which ends the message.
-   `rabbitmq_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
-   `rabbitmq_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
-   `rabbitmq_num_queues` – Total number of queues. Default: `1`. Increasing this number can significantly improve performance.
-   `rabbitmq_queue_base` – A hint for queue names. Use cases for this setting are described below.
-   `rabbitmq_deadletter_exchange` – The name of a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name to collect messages that are republished to the dead letter exchange. By default, no dead letter exchange is specified.
-   `rabbitmq_persistent` – If set to 1 (true), insert queries set the message delivery mode to 2, which marks messages as 'persistent'. Default: `0`.
-   `rabbitmq_skip_broken_messages` – RabbitMQ message parser tolerance to schema-incompatible messages per block. Default: `0`. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
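-   `rabbitmq_max_block_size` – The number of rows collected before data from RabbitMQ is flushed. Defaults to the value of `max_insert_block_size`.
-   `rabbitmq_flush_interval_ms` – The timeout for flushing data from RabbitMQ. Defaults to the value of `stream_flush_interval_ms`.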
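
The dead letter setting described above can be sketched as a pair of tables. This is a minimal illustration, not a tested setup: the table names `queue_main` and `queue_dead_letters` are hypothetical, and the exchange names reuse the placeholders from the syntax block.

``` sql
  -- Hypothetical main table: messages that the broker dead-letters
  -- are republished to the exchange named 'dl-exchange'.
  CREATE TABLE queue_main (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_deadletter_exchange = 'dl-exchange';

  -- Hypothetical second table: uses the dead letter exchange name as its
  -- own exchange name, so it collects the republished messages.
  CREATE TABLE queue_dead_letters (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'dl-exchange',
                            rabbitmq_format = 'JSONEachRow';
```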

FormatFactory settings can also be added along with the RabbitMQ-related settings.

Required configuration:

The RabbitMQ server configuration should be added using the ClickHouse config file.

``` xml
 <rabbitmq>
    <username>root</username>
    <password>clickhouse</password>
 </rabbitmq>
```

Example:

``` sql
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5,
                            date_time_input_format = 'best_effort';
```

## Description {#description}

`SELECT` is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time streams using [materialized views](../../../sql-reference/statements/create/view.md). To do this:

1.  Use the engine to create a RabbitMQ consumer and consider it a data stream.
2.  Create a table with the desired structure.
3.  Create a materialized view that converts data from the engine and puts it into a previously created table.

When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background. This allows you to continually receive messages from RabbitMQ and convert them to the required format using `SELECT`.
One RabbitMQ table can have as many materialized views as you like.
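
As a rough sketch of several materialized views reading from one RabbitMQ table, assuming the `queue` table from the example above already exists. The target tables and view names here are hypothetical, and the `value > 100` filter is only there to show that each view can apply its own `SELECT`.

``` sql
  -- Hypothetical target tables with different structures.
  CREATE TABLE all_rows (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE TABLE large_values (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  -- Both views are attached to the same RabbitMQ table.
  CREATE MATERIALIZED VIEW consumer_all TO all_rows
    AS SELECT key, value FROM queue;

  CREATE MATERIALIZED VIEW consumer_large TO large_values
    AS SELECT key, value FROM queue WHERE value > 100;
```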

Data can be channeled based on `rabbitmq_exchange_type` and the specified `rabbitmq_routing_key_list`.
There can be no more than one exchange per table. One exchange can be shared between multiple tables, which enables routing into multiple tables at the same time.

Exchange type options:

-   `direct` - Routing is based on the exact matching of keys. Example table key list: `key1,key2,key3,key4,key5`, message key can equal any of them.
-   `fanout` - Routing to all tables (where exchange name is the same) regardless of the keys.
-   `topic` - Routing is based on patterns with dot-separated keys. Examples: `*.logs`, `records.*.*.2020`, `*.2018,*.2019,*.2020`.
-   `headers` - Routing is based on `key=value` matches with a setting `x-match=all` or `x-match=any`. Example table key list: `x-match=all,format=logs,type=report,year=2020`.
-   `consistent_hash` - Data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
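
To illustrate routing with a shared exchange, here is a minimal sketch of two tables bound to one `topic` exchange with different patterns taken from the examples above. The table names and the exchange name `topic_exchange` are hypothetical. Under standard AMQP topic matching, `*` stands for exactly one dot-separated word, so a message published with the key `app.logs` would be expected to reach only the first table, and one with `records.eu.web.2020` only the second.

``` sql
  -- Hypothetical table receiving keys that match '*.logs'.
  CREATE TABLE logs_queue (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'topic_exchange',
                            rabbitmq_exchange_type = 'topic',
                            rabbitmq_routing_key_list = '*.logs',
                            rabbitmq_format = 'JSONEachRow';

  -- Hypothetical table on the same exchange with a different pattern.
  CREATE TABLE records_queue (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'topic_exchange',
                            rabbitmq_exchange_type = 'topic',
                            rabbitmq_routing_key_list = 'records.*.*.2020',
                            rabbitmq_format = 'JSONEachRow';
```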

The `rabbitmq_queue_base` setting can be used in the following cases:

-   To let different tables share queues, so that multiple consumers can be registered for the same queues, which improves performance. If the `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings are used, queues match exactly only when these parameters are the same.
-   To resume reading from certain durable queues when not all messages were successfully consumed. To resume consumption from one specific queue, set its name in the `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (both default to 1). To resume consumption from all queues that were declared for a specific table, specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names are unique to tables.
-   To reuse queues, since they are declared durable and not auto-deleted. (They can be deleted with any of the RabbitMQ CLI tools.)
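
The queue-sharing case might look like the sketch below. The table names and the base name `shared_base` are hypothetical. The point is only that `rabbitmq_queue_base`, `rabbitmq_num_consumers` and `rabbitmq_num_queues` are identical in both tables, which is the condition stated above for the queues to match.

``` sql
  -- Hypothetical first table declaring queues under the 'shared_base' hint.
  CREATE TABLE queue_a (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_queue_base = 'shared_base',
                            rabbitmq_num_consumers = 2,
                            rabbitmq_num_queues = 2;

  -- Hypothetical second table with the same three settings, so its
  -- consumers are registered for the same queues.
  CREATE TABLE queue_b (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_queue_base = 'shared_base',
                            rabbitmq_num_consumers = 2,
                            rabbitmq_num_queues = 2;
```

Recreating a dropped table with the same three settings would be the corresponding way to resume consumption from all of its queues.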

To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/server-configuration-parameters/settings.md#settings-max_insert_block_size). If the block wasn’t formed within [stream_flush_interval_ms](../../../operations/server-configuration-parameters/settings.md) milliseconds, the data will be flushed to the table regardless of the completeness of the block.

If `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings are specified along with `rabbitmq_exchange_type`, then:

-   The `rabbitmq_consistent_hash_exchange` plugin must be enabled.
-   `message_id` property of the published messages must be specified (unique for each message/batch).

For insert queries, metadata is added to each published message: `messageID` and a `republished` flag (true if the message was published more than once). Both can be accessed via message headers.

Do not use the same table for inserts and materialized views.
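
A minimal sketch of publishing through an insert query, using a dedicated table that no materialized view reads from. The table name `queue_out` and the exchange name `exchange_out` are hypothetical.

``` sql
  -- Hypothetical table used only for publishing.
  -- rabbitmq_persistent = 1 marks the published messages as persistent.
  CREATE TABLE queue_out (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange_out',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_persistent = 1;

  -- Rows are serialized in the table format and published to the exchange.
  INSERT INTO queue_out VALUES (1, 10), (2, 20);
```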

Example:

``` sql
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_exchange_type = 'headers',
                            rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5;

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;
```

## Virtual Columns {#virtual-columns}

-   `_exchange_name` - RabbitMQ exchange name.
-   `_channel_id` - ChannelID of the channel on which the consumer that received the message was declared.
-   `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
-   `_redelivered` - `redelivered` flag of the message.
-   `_message_id` - messageID of the received message; non-empty if it was set when the message was published.
-   `_timestamp` - timestamp of the received message; non-empty if it was set when the message was published.
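
Virtual columns can be stored by selecting them explicitly in a materialized view. The sketch below assumes the `queue` table from the example above. The target table and view names are hypothetical, and the column types chosen for the stored metadata (`String`, `UInt8`) are assumptions that should be checked against the actual types of the virtual columns.

``` sql
  -- Hypothetical target table; metadata column types are assumed.
  CREATE TABLE daily_with_meta (
    key UInt64,
    value UInt64,
    exchange_name String,
    message_id String,
    redelivered UInt8
  ) ENGINE = MergeTree() ORDER BY key;

  -- Virtual columns must be named explicitly in the SELECT list.
  CREATE MATERIALIZED VIEW consumer_with_meta TO daily_with_meta
    AS SELECT
        key,
        value,
        _exchange_name AS exchange_name,
        _message_id AS message_id,
        _redelivered AS redelivered
    FROM queue;
```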