rabbitmq.md 13.2 KB
Newer Older
E
Evgeniia Sudarikova 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# RabbitMQ {#rabbitmq-engine}

Движок работает с [RabbitMQ](https://www.rabbitmq.com).

`RabbitMQ` позволяет:

-   Публиковать/подписываться на потоки данных.
-   Обрабатывать потоки по мере их появления.

## Создание таблицы {#table_engine-rabbitmq-creating-a-table}

``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = RabbitMQ SETTINGS
    rabbitmq_host_port = 'host:port',
    rabbitmq_exchange_name = 'exchange_name',
    rabbitmq_format = 'data_format'[,]
    [rabbitmq_exchange_type = 'exchange_type',]
    [rabbitmq_routing_key_list = 'key1,key2,...',]
    [rabbitmq_row_delimiter = 'delimiter_symbol',]
K
kssenii 已提交
25
    [rabbitmq_schema = '',]
E
Evgeniia Sudarikova 已提交
26 27
    [rabbitmq_num_consumers = N,]
    [rabbitmq_num_queues = N,]
K
kssenii 已提交
28 29 30 31 32
    [rabbitmq_queue_base = 'queue',]
    [rabbitmq_persistent = 0,]
    [rabbitmq_skip_broken_messages = N,]
    [rabbitmq_max_block_size = N,]
    [rabbitmq_flush_interval_ms = N]
E
Evgeniia Sudarikova 已提交
33 34 35 36 37 38 39 40 41 42
```

Обязательные параметры:

-   `rabbitmq_host_port` – адрес сервера (`хост:порт`). Например: `localhost:5672`.
-   `rabbitmq_exchange_name` – имя точки обмена в RabbitMQ.
-   `rabbitmq_format` – формат сообщения. Используется такое же обозначение, как и в функции `FORMAT` в SQL, например, `JSONEachRow`. Подробнее см. в разделе [Форматы входных и выходных данных](../../../interfaces/formats.md).

Дополнительные параметры:

K
kssenii 已提交
43
-   `rabbitmq_exchange_type` – тип точки обмена в RabbitMQ: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. По умолчанию: `fanout`.
E
Evgeniia Sudarikova 已提交
44 45
-   `rabbitmq_routing_key_list` – список ключей маршрутизации, через запятую.
-   `rabbitmq_row_delimiter` – символ-разделитель, который завершает сообщение.
K
kssenii 已提交
46
-   `rabbitmq_schema` – опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap’n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
E
Evgeniia Sudarikova 已提交
47
-   `rabbitmq_num_consumers` – количество потребителей на таблицу. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.
K
kssenii 已提交
48
-   `rabbitmq_num_queues` – количество очередей. По умолчанию: `1`. Большее число очередей может сильно увеличить пропускную способность.
K
kssenii 已提交
49 50 51 52 53
-   `rabbitmq_queue_base` - настройка для имен очередей. Сценарии использования описаны ниже.
-   `rabbitmq_persistent` - флаг, от которого зависит настройка 'durable' для сообщений при запросах `INSERT`. По умолчанию: `0`.
-   `rabbitmq_skip_broken_messages` – максимальное количество некорректных сообщений в блоке. Если `rabbitmq_skip_broken_messages = N`, то движок отбрасывает `N` сообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0.
-   `rabbitmq_max_block_size`
-   `rabbitmq_flush_interval_ms`
E
Evgeniia Sudarikova 已提交
54

K
kssenii 已提交
55
Настройки форматов данных также могут быть добавлены в списке RabbitMQ настроек.
K
kssenii 已提交
56

E
Evgeniia Sudarikova 已提交
57 58 59 60 61
Example:

``` sql
  CREATE TABLE queue (
    key UInt64,
K
kssenii 已提交
62 63
    value UInt64,
    date DateTime
E
Evgeniia Sudarikova 已提交
64 65 66
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
K
kssenii 已提交
67 68
                            rabbitmq_num_consumers = 5,
                            date_time_input_format = 'best_effort';
E
Evgeniia Sudarikova 已提交
69 70 71 72
```

Конфигурация сервера RabbitMQ добавляется с помощью конфигурационного файла ClickHouse.

K
kssenii 已提交
73 74
Требуемая конфигурация:

E
Evgeniia Sudarikova 已提交
75 76 77 78 79 80 81
``` xml
 <rabbitmq>
    <username>root</username>
    <password>clickhouse</password>
 </rabbitmq>
```

K
kssenii 已提交
82 83 84 85 86 87 88 89
Дополнительная конфигурация:

``` xml
 <rabbitmq>
    <vhost>clickhouse</vhost>
 </rabbitmq>
```

E
Evgeniia Sudarikova 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
## Описание {#description}

Запрос `SELECT` не очень полезен для чтения сообщений (за исключением отладки), поскольку каждое сообщение может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью [материализованных преставлений](../../../sql-reference/statements/create/view.md). Для этого:

1.  Создайте потребителя RabbitMQ с помощью движка и рассматривайте его как поток данных.
2.  Создайте таблицу с необходимой структурой.
3.  Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.

Когда к движку присоединяется материализованное представление, оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от RabbitMQ и преобразовывать их в необходимый формат с помощью `SELECT`.
У одной таблицы RabbitMQ может быть неограниченное количество материализованных представлений.

Данные передаются с помощью параметров `rabbitmq_exchange_type` и `rabbitmq_routing_key_list`.
Может быть не более одной точки обмена на таблицу. Одна точка обмена может использоваться несколькими таблицами: это позволяет выполнять маршрутизацию по нескольким таблицам одновременно.

Параметры точек обмена:

-   `direct` - маршрутизация основана на точном совпадении ключей. Пример списка ключей: `key1,key2,key3,key4,key5`. Ключ сообщения может совпадать с одним из них.
-   `fanout` - маршрутизация по всем таблицам, где имя точки обмена совпадает, независимо от ключей.
-   `topic` - маршрутизация основана на правилах с ключами, разделенными точками. Например: `*.logs`, `records.*.*.2020`, `*.2018,*.2019,*.2020`.
-   `headers` - маршрутизация основана на совпадении `key=value` с настройкой `x-match=all` или `x-match=any`. Пример списка ключей таблицы: `x-match=all,format=logs,type=report,year=2020`.
K
kssenii 已提交
110 111 112
-   `consistent_hash` - данные равномерно распределяются между всеми связанными таблицами, где имя точки обмена совпадает. Обратите внимание, что этот тип обмена должен быть включен с помощью плагина RabbitMQ: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.

Настройка `rabbitmq_queue_base` может быть использована в следующих случаях:
K
kssenii 已提交
113

K
kssenii 已提交
114 115 116
1.   чтобы восстановить чтение из ранее созданных очередей, если оно прекратилось по какой-либо причине, но очереди остались непустыми. Для восстановления чтения из одной конкретной очереди, нужно написать ее имя в `rabbitmq_queue_base` настройку и не указывать настройки `rabbitmq_num_consumers` и `rabbitmq_num_queues`. Чтобы восстановить чтение из всех очередей, которые были созданы для конкретной таблицы, необходимо совпадение следующих настроек: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. По умолчанию, если настройка `rabbitmq_queue_base` не указана, будут использованы уникальные для каждой таблицы имена очередей.
2.   чтобы объявить одни и те же очереди для разных таблиц, что позволяет создавать несколько параллельных подписчиков на каждую из очередей. То есть обеспечивается лучшая производительность. В данном случае, для таких таблиц также необходимо совпадение настроек: `rabbitmq_num_consumers`, `rabbitmq_num_queues`.
3.   чтобы повторно использовать созданные c `durable` настройкой очереди, так как они не удаляются автоматически (но могут быть удалены с помощью любого RabbitMQ CLI).
E
Evgeniia Sudarikova 已提交
117

A
Alexey Milovidov 已提交
118
Для улучшения производительности полученные сообщения группируются в блоки размера [max_insert_block_size](../../../operations/settings/settings.md#settings-max_insert_block_size). Если блок не удалось сформировать за [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms) миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
E
Evgeniia Sudarikova 已提交
119 120 121 122 123 124

Если параметры`rabbitmq_num_consumers` и/или `rabbitmq_num_queues` заданы вместе с параметром `rabbitmq_exchange_type`:

-   плагин `rabbitmq-consistent-hash-exchange` должен быть включен.
-   свойство `message_id` должно быть определено (уникальное для каждого сообщения/пакета).

K
kssenii 已提交
125 126 127
При запросах `INSERT` отправляемым сообщениям добавляются метаданные: `messageID` и флаг `republished` - доступны через заголовки сообщений (headers).
Для запросов чтения и вставки не должна использоваться одна и та же таблица.

E
Evgeniia Sudarikova 已提交
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
Пример:

``` sql
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_exchange_type = 'headers',
                            rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5;

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree();

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;
```
K
kssenii 已提交
149 150 151 152 153 154 155

## Virtual Columns {#virtual-columns}

-   `_exchange_name` - имя точки обмена RabbitMQ.
-   `_channel_id` - идентификатор канала `ChannelID`, на котором было получено сообщение.
-   `_delivery_tag` - значение `DeliveryTag` полученного сообщения. Уникально в рамках одного канала.
-   `_redelivered` - флаг `redelivered`. (Не равно нулю, если есть возможность, что сообщение было получено более, чем одним каналом.)
K
kssenii 已提交
156 157
-   `_message_id` - значение поля `messageID` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.
-   `_timestamp` - значение поля `timestamp` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.