From 154d7e32283bdaa08940d1e9832448b736be1253 Mon Sep 17 00:00:00 2001
From: Alexander Marshalov <_@marshalov.org>
Date: Thu, 2 Aug 2018 00:23:50 +0700
Subject: [PATCH] Added SETTINGS clause for Kafka storage engine

---
 dbms/CMakeLists.txt                           |   1 +
 dbms/src/Storages/Kafka/KafkaSettings.cpp     |  44 +++++
 dbms/src/Storages/Kafka/KafkaSettings.h       |  43 +++++
 .../src/Storages/{ => Kafka}/StorageKafka.cpp | 179 ++++++++++++++----
 dbms/src/Storages/{ => Kafka}/StorageKafka.h  |   0
 dbms/src/Storages/StorageFactory.cpp          |  12 +-
 .../integration/test_storage_kafka/test.py    |  68 +++++--
 docs/en/operations/table_engines/kafka.md     |  59 +++++-
 docs/ru/operations/table_engines/kafka.md     |  60 ++++--
 9 files changed, 395 insertions(+), 71 deletions(-)
 create mode 100644 dbms/src/Storages/Kafka/KafkaSettings.cpp
 create mode 100644 dbms/src/Storages/Kafka/KafkaSettings.h
 rename dbms/src/Storages/{ => Kafka}/StorageKafka.cpp (78%)
 rename dbms/src/Storages/{ => Kafka}/StorageKafka.h (100%)

diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index ba01476746..7c5bdb329c 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -53,6 +53,7 @@ add_headers_and_sources(dbms src/Interpreters/ClusterProxy)
 add_headers_and_sources(dbms src/Columns)
 add_headers_and_sources(dbms src/Storages)
 add_headers_and_sources(dbms src/Storages/Distributed)
+add_headers_and_sources(dbms src/Storages/Kafka)
 add_headers_and_sources(dbms src/Storages/MergeTree)
 add_headers_and_sources(dbms src/Client)
 add_headers_and_sources(dbms src/Formats)

diff --git a/dbms/src/Storages/Kafka/KafkaSettings.cpp b/dbms/src/Storages/Kafka/KafkaSettings.cpp
new file mode 100644
index 0000000000..be6c3b11b0
--- /dev/null
+++ b/dbms/src/Storages/Kafka/KafkaSettings.cpp
@@ -0,0 +1,44 @@
+#include
+#if USE_RDKAFKA
+
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int BAD_ARGUMENTS;
+}
+
+void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
+{
+    if (storage_def.settings)
+    {
+        for (const ASTSetQuery::Change & setting : storage_def.settings->changes)
+        {
+#define SET(TYPE, NAME, DEFAULT, DESCRIPTION) \
+            else if (setting.name == #NAME) NAME.set(setting.value);
+
+            if (false) {}
+            APPLY_FOR_KAFKA_SETTINGS(SET)
+            else
+                throw Exception(
+                    "Unknown setting " + setting.name + " for storage " + storage_def.engine->name,
+                    ErrorCodes::BAD_ARGUMENTS);
+#undef SET
+        }
+    }
+    else
+    {
+        auto settings_ast = std::make_shared<ASTSetQuery>();
+        settings_ast->is_standalone = false;
+        storage_def.set(storage_def.settings, settings_ast);
+    }
+}
+
+}
+#endif
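The parser above leans on an X-macro: `APPLY_FOR_KAFKA_SETTINGS(SET)` stamps out one `else if (setting.name == ...)` branch per setting, the leading `if (false) {}` exists only so that every generated branch can start with `else if`, and the trailing `else` rejects unknown names. A minimal, self-contained sketch of the same dispatch shape (the `DemoSettings` names and plain `std::string` fields are illustrative, not the ClickHouse types):

```cpp
#include <iostream>
#include <stdexcept>
#include <string>

// One macro invocation per setting; illustrative stand-in for APPLY_FOR_KAFKA_SETTINGS.
#define APPLY_FOR_DEMO_SETTINGS(M) \
    M(kafka_broker_list) \
    M(kafka_format)

struct DemoSettings
{
    // DECLARE stamps out one field per setting.
#define DECLARE(NAME) std::string NAME;
    APPLY_FOR_DEMO_SETTINGS(DECLARE)
#undef DECLARE

    void set(const std::string & name, const std::string & value)
    {
#define SET(NAME) \
        else if (name == #NAME) NAME = value;

        // The empty if () is just an anchor so every generated branch can be an "else if".
        if (false) {}
        APPLY_FOR_DEMO_SETTINGS(SET)
        else
            throw std::runtime_error("Unknown setting " + name);
#undef SET
    }
};

int main()
{
    DemoSettings settings;
    settings.set("kafka_broker_list", "localhost:9092");
    settings.set("kafka_format", "JSONEachRow");
    std::cout << settings.kafka_broker_list << " / " << settings.kafka_format << "\n";

    try
    {
        settings.set("kafka_bogus", "1");  // unknown name -> exception, like loadFromQuery
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << "\n";
    }
}
```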
diff --git a/dbms/src/Storages/Kafka/KafkaSettings.h b/dbms/src/Storages/Kafka/KafkaSettings.h
new file mode 100644
index 0000000000..bd7a5cc0bb
--- /dev/null
+++ b/dbms/src/Storages/Kafka/KafkaSettings.h
@@ -0,0 +1,43 @@
+#pragma once
+#include
+#if USE_RDKAFKA
+
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+class ASTStorage;
+
+/** Settings for the Kafka engine.
+ * Could be loaded from a CREATE TABLE query (SETTINGS clause).
+ */
+struct KafkaSettings
+{
+
+#define APPLY_FOR_KAFKA_SETTINGS(M) \
+    M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
+    M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
+    M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
+    M(SettingString, kafka_format, "", "Message format for Kafka engine.") \
+    M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
+    M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
+    M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.")
+
+#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
+    TYPE NAME {DEFAULT};
+
+    APPLY_FOR_KAFKA_SETTINGS(DECLARE)
+
+#undef DECLARE
+
+public:
+    void loadFromQuery(ASTStorage & storage_def);
+};
+
+}
+#endif
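Each `M(SettingString, kafka_broker_list, "", ...)` entry expands through `DECLARE` into a typed field with a default, e.g. `SettingString kafka_broker_list {""};`. What registerStorageKafka relies on later is that such a field carries both a `value` and a `changed` flag that flips once the query assigns it. A rough stand-in for that behaviour (a hypothetical `DemoSetting`, not ClickHouse's real `SettingString`/`SettingUInt64`):

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical miniature of a ClickHouse-style setting: a value initialised to
// its default, plus a "changed" flag recording whether the user assigned it.
template <typename T>
struct DemoSetting
{
    T value;
    bool changed = false;

    explicit DemoSetting(T default_value) : value(std::move(default_value)) {}

    void set(T new_value)
    {
        value = std::move(new_value);
        changed = true;
    }
};

int main()
{
    DemoSetting<std::string> kafka_format{""};
    DemoSetting<uint64_t> kafka_num_consumers{1};

    assert(!kafka_format.changed);           // nothing supplied in SETTINGS yet
    kafka_format.set("JSONEachRow");         // what loadFromQuery effectively does
    assert(kafka_format.changed && kafka_format.value == "JSONEachRow");
    assert(kafka_num_consumers.value == 1);  // untouched settings keep their default
    return 0;
}
```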
diff --git a/dbms/src/Storages/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp
similarity index 78%
rename from dbms/src/Storages/StorageKafka.cpp
rename to dbms/src/Storages/Kafka/StorageKafka.cpp
index 43ed4e3b63..d43996e65b 100644
--- a/dbms/src/Storages/StorageKafka.cpp
+++ b/dbms/src/Storages/Kafka/StorageKafka.cpp
@@ -23,7 +23,9 @@
 #include
 #include
 #include
-#include // Y_IGNORE
+#include
+#include
+#include // Y_IGNORE
 #include
 #include
 #include
@@ -566,93 +568,200 @@ void registerStorageKafka(StorageFactory & factory)
     factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
     {
         ASTs & engine_args = args.engine_args;
+        size_t args_count = engine_args.size();
+        bool has_settings = args.storage_def->settings;
+
+        KafkaSettings kafka_settings;
+        if (has_settings)
+        {
+            kafka_settings.loadFromQuery(*args.storage_def);
+        }
 
         /** Arguments of engine is following:
           * - Kafka broker list
           * - List of topics
          * - Group ID (may be a constant expression with a string result)
           * - Message format (string)
+          * - Row delimiter
           * - Schema (optional, if the format supports it)
+          * - Number of consumers
           */
 
-        if (engine_args.size() < 3 || engine_args.size() > 7)
-            throw Exception(
-                "Storage Kafka requires 3-7 parameters"
-                " - Kafka broker list, list of topics to consume, consumer group ID, message format, row delimiter, schema, number of consumers",
-                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+        // Check arguments and settings
+        #define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME) \
+            /* One of the four required arguments is not specified */ \
+            if (args_count < ARG_NUM && ARG_NUM <= 4 && \
+                !kafka_settings.PAR_NAME.changed) \
+            { \
+                throw Exception( \
+                    "Required parameter '" #PAR_NAME "' " \
+                    "for storage Kafka not specified", \
+                    ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
+            } \
+            /* The same argument is given in two places */ \
+            if (has_settings && \
+                kafka_settings.PAR_NAME.changed && \
+                args_count >= ARG_NUM) \
+            { \
+                throw Exception( \
+                    "The argument №" #ARG_NUM " of storage Kafka " \
+                    "and the parameter '" #PAR_NAME "' " \
+                    "in SETTINGS cannot be specified at the same time", \
+                    ErrorCodes::BAD_ARGUMENTS); \
+            }
+
+        CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list)
+        CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list)
+        CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name)
+        CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format)
+        CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter)
+        CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
+        CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
+        #undef CHECK_KAFKA_STORAGE_ARGUMENT
 
+        // Get and check broker list
         String brokers;
-        auto ast = typeid_cast<const ASTLiteral *>(engine_args[0].get());
-        if (ast && ast->value.getType() == Field::Types::String)
-            brokers = safeGet<String>(ast->value);
-        else
-            throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
+        if (args_count >= 1)
+        {
+            auto ast = typeid_cast<const ASTLiteral *>(engine_args[0].get());
+            if (ast && ast->value.getType() == Field::Types::String)
+            {
+                brokers = safeGet<String>(ast->value);
+            }
+            else
+            {
+                throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
+            }
+        }
+        else if (kafka_settings.kafka_broker_list.changed)
+        {
+            brokers = kafka_settings.kafka_broker_list.value;
+        }
+
+        // Get and check topic list
+        String topic_list;
+        if (args_count >= 2)
+        {
+            engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
+            topic_list = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
+        }
+        else if (kafka_settings.kafka_topic_list.changed)
+        {
+            topic_list = kafka_settings.kafka_topic_list.value;
+        }
+        Names topics;
+        boost::split(topics, topic_list , [](char c){ return c == ','; });
+        for (String & topic : topics)
+        {
+            boost::trim(topic);
+        }
+
+        // Get and check group name
+        String group;
+        if (args_count >= 3)
+        {
+            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
+            group = static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>();
+        }
+        else if (kafka_settings.kafka_group_name.changed)
+        {
+            group = kafka_settings.kafka_group_name.value;
+        }
 
-        engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
-        engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
-        engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
+        // Get and check message format name
+        String format;
+        if (args_count >= 4)
+        {
+            engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
+
+            auto ast = typeid_cast<const ASTLiteral *>(engine_args[3].get());
+            if (ast && ast->value.getType() == Field::Types::String)
+            {
+                format = safeGet<String>(ast->value);
+            }
+            else
+            {
+                throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
+            }
+        }
+        else if (kafka_settings.kafka_format.changed)
+        {
+            format = kafka_settings.kafka_format.value;
+        }
 
         // Parse row delimiter (optional)
         char row_delimiter = '\0';
-        if (engine_args.size() >= 5)
+        if (args_count >= 5)
         {
             engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
 
             auto ast = typeid_cast<const ASTLiteral *>(engine_args[4].get());
             String arg;
             if (ast && ast->value.getType() == Field::Types::String)
+            {
                 arg = safeGet<String>(ast->value);
+            }
             else
+            {
                 throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
+            }
             if (arg.size() > 1)
+            {
                 throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
+            }
             else if (arg.size() == 0)
+            {
                 row_delimiter = '\0';
+            }
             else
+            {
                 row_delimiter = arg[0];
+            }
+        }
+        else if (kafka_settings.kafka_row_delimiter.changed)
+        {
+            row_delimiter = kafka_settings.kafka_row_delimiter.value;
         }
 
         // Parse format schema if supported (optional)
         String schema;
-        if (engine_args.size() >= 6)
+        if (args_count >= 6)
         {
             engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
 
             auto ast = typeid_cast<const ASTLiteral *>(engine_args[5].get());
             if (ast && ast->value.getType() == Field::Types::String)
+            {
                 schema = safeGet<String>(ast->value);
+            }
             else
+            {
                 throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS);
+            }
+        }
+        else if (kafka_settings.kafka_schema.changed)
+        {
+            schema = kafka_settings.kafka_schema.value;
         }
 
         // Parse number of consumers (optional)
         UInt64 num_consumers = 1;
-        if (engine_args.size() >= 7)
+        if (args_count >= 7)
         {
             auto ast = typeid_cast<const ASTLiteral *>(engine_args[6].get());
             if (ast && ast->value.getType() == Field::Types::UInt64)
+            {
                 num_consumers = safeGet<UInt64>(ast->value);
+            }
             else
+            {
                 throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
+            }
+        }
+        else if (kafka_settings.kafka_num_consumers.changed)
+        {
+            num_consumers = kafka_settings.kafka_num_consumers.value;
         }
-
-        // Parse topic list
-        Names topics;
-        String topic_arg = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
-        boost::split(topics, topic_arg , [](char c){ return c == ','; });
-        for(String & topic : topics)
-            boost::trim(topic);
-
-        // Parse consumer group
-        String group = static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>();
-
-        // Parse format from string
-        String format;
-        ast = typeid_cast<const ASTLiteral *>(engine_args[3].get());
-        if (ast && ast->value.getType() == Field::Types::String)
-            format = safeGet<String>(ast->value);
-        else
-            throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
 
         return StorageKafka::create(
             args.table_name, args.database_name, args.context, args.columns,
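`CHECK_KAFKA_STORAGE_ARGUMENT` encodes two rules: the first four engine parameters must arrive from somewhere (positionally or via SETTINGS), and no parameter may be given in both places at once. A standalone sketch of those two checks, with hypothetical names (`check_kafka_argument`, `ParamState`) standing in for the macro and the ClickHouse types:

```cpp
#include <iostream>
#include <stdexcept>
#include <string>

// Per-parameter state the checks need: its name and whether the matching
// SETTINGS entry was supplied (a stand-in for the Setting*::changed flag).
struct ParamState
{
    std::string name;
    bool set_in_settings;
};

void check_kafka_argument(size_t args_count, bool has_settings,
                          size_t arg_num, const ParamState & param)
{
    // Rule 1: parameters 1..4 are required, so one of the two sources must provide them.
    if (args_count < arg_num && arg_num <= 4 && !param.set_in_settings)
        throw std::runtime_error(
            "Required parameter '" + param.name + "' for storage Kafka not specified");

    // Rule 2: a parameter may not be given both positionally and in SETTINGS.
    if (has_settings && param.set_in_settings && args_count >= arg_num)
        throw std::runtime_error(
            "Parameter '" + param.name + "' given both as an engine argument and in SETTINGS");
}

int main()
{
    // Roughly: ENGINE = Kafka('broker', 'topic') SETTINGS kafka_group_name = 'g1', kafka_format = 'JSONEachRow'
    size_t args_count = 2;
    bool has_settings = true;

    check_kafka_argument(args_count, has_settings, 3, {"kafka_group_name", true});  // ok, comes from SETTINGS
    check_kafka_argument(args_count, has_settings, 4, {"kafka_format", true});      // ok, comes from SETTINGS

    try
    {
        // kafka_broker_list is positional argument 1 and would also be in SETTINGS: rejected.
        check_kafka_argument(args_count, has_settings, 1, {"kafka_broker_list", true});
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << "\n";
    }
}
```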
diff --git a/dbms/src/Storages/StorageKafka.h b/dbms/src/Storages/Kafka/StorageKafka.h
similarity index 100%
rename from dbms/src/Storages/StorageKafka.h
rename to dbms/src/Storages/Kafka/StorageKafka.h

diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp
index d56e7ee4d8..9ceb59abbc 100644
--- a/dbms/src/Storages/StorageFactory.cpp
+++ b/dbms/src/Storages/StorageFactory.cpp
@@ -87,11 +87,19 @@ StoragePtr StorageFactory::get(
 
         name = engine_def.name;
 
-        if ((storage_def->partition_by || storage_def->order_by || storage_def->sample_by || storage_def->settings)
+        if (storage_def->settings && !endsWith(name, "MergeTree") && name != "Kafka")
+        {
+            throw Exception(
+                "Engine " + name + " doesn't support SETTINGS clause. "
+                "Currently only the MergeTree family of engines and the Kafka engine support it",
+                ErrorCodes::BAD_ARGUMENTS);
+        }
+
+        if ((storage_def->partition_by || storage_def->order_by || storage_def->sample_by)
             && !endsWith(name, "MergeTree"))
         {
             throw Exception(
-                "Engine " + name + " doesn't support PARTITION BY, ORDER BY, SAMPLE BY or SETTINGS clauses. "
+                "Engine " + name + " doesn't support PARTITION BY, ORDER BY or SAMPLE BY clauses. "
                 "Currently only the MergeTree family of engines supports them",
                 ErrorCodes::BAD_ARGUMENTS);
         }
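StorageFactory now gates the SETTINGS clause on the engine name before any engine-specific parsing runs: MergeTree-family engines and Kafka pass, everything else gets the new error. A small sketch of that gate (the helper names below are illustrative; ClickHouse has its own `endsWith`):

```cpp
#include <iostream>
#include <string>

// Illustrative helper, not the ClickHouse implementation.
static bool endsWith(const std::string & s, const std::string & suffix)
{
    return s.size() >= suffix.size()
        && s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// Mirrors the new check in StorageFactory::get: only MergeTree-family engines
// and Kafka may carry a SETTINGS clause in CREATE TABLE.
static bool engine_supports_settings(const std::string & name)
{
    return endsWith(name, "MergeTree") || name == "Kafka";
}

int main()
{
    for (const std::string name : {"ReplacingMergeTree", "Kafka", "Memory", "Log"})
        std::cout << name << ": "
                  << (engine_supports_settings(name) ? "SETTINGS allowed" : "SETTINGS rejected") << "\n";
}
```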
" "Currently only the MergeTree family of engines supports them", ErrorCodes::BAD_ARGUMENTS); } diff --git a/dbms/tests/integration/test_storage_kafka/test.py b/dbms/tests/integration/test_storage_kafka/test.py index 85d6f090d5..41076ac78c 100644 --- a/dbms/tests/integration/test_storage_kafka/test.py +++ b/dbms/tests/integration/test_storage_kafka/test.py @@ -1,6 +1,5 @@ import os.path as p import time -import datetime import pytest from helpers.cluster import ClickHouseCluster @@ -10,9 +9,11 @@ import json import subprocess - cluster = ClickHouseCluster(__file__) -instance = cluster.add_instance('instance', main_configs=['configs/kafka.xml'], with_kafka = True) +instance = cluster.add_instance('instance', + main_configs=['configs/kafka.xml'], + with_kafka=True) + @pytest.fixture(scope="module") def started_cluster(): @@ -25,23 +26,36 @@ def started_cluster(): finally: cluster.shutdown() + def kafka_is_available(started_cluster): - p = subprocess.Popen(('docker', 'exec', '-i', started_cluster.kafka_docker_id, '/usr/bin/kafka-broker-api-versions', '--bootstrap-server', 'PLAINTEXT://localhost:9092'), stdout=subprocess.PIPE) - streamdata = p.communicate()[0] + p = subprocess.Popen(('docker', + 'exec', + '-i', + started_cluster.kafka_docker_id, + '/usr/bin/kafka-broker-api-versions', + '--bootstrap-server', + 'PLAINTEXT://localhost:9092'), + stdout=subprocess.PIPE) + p.communicate()[0] return p.returncode == 0 + def kafka_produce(started_cluster, topic, messages): - p = subprocess.Popen(('docker', 'exec', '-i', started_cluster.kafka_docker_id, '/usr/bin/kafka-console-producer', '--broker-list', 'localhost:9092', '--topic', topic), stdin=subprocess.PIPE) + p = subprocess.Popen(('docker', + 'exec', + '-i', + started_cluster.kafka_docker_id, + '/usr/bin/kafka-console-producer', + '--broker-list', + 'localhost:9092', + '--topic', + topic), + stdin=subprocess.PIPE) p.communicate(messages) p.stdin.close() -def test_kafka_json(started_cluster): - instance.query(''' -DROP TABLE IF EXISTS test.kafka; -CREATE TABLE test.kafka (key UInt64, value UInt64) - ENGINE = Kafka('kafka1:9092', 'json', 'json', 'JSONEachRow', '\\n'); -''') +def kafka_check_json_numbers(instance): retries = 0 while True: if kafka_is_available(started_cluster): @@ -58,10 +72,38 @@ CREATE TABLE test.kafka (key UInt64, value UInt64) kafka_produce(started_cluster, 'json', messages) time.sleep(3) result = instance.query('SELECT * FROM test.kafka;') - with open(p.join(p.dirname(__file__), 'test_kafka_json.reference')) as reference: + file = p.join(p.dirname(__file__), 'test_kafka_json.reference') + with open(file) as reference: assert TSV(result) == TSV(reference) + + +def test_kafka_json(started_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.kafka; + CREATE TABLE test.kafka (key UInt64, value UInt64) + ENGINE = Kafka('kafka1:9092', 'json', 'json', + 'JSONEachRow', '\\n'); + ''') + kafka_check_json_numbers(instance) instance.query('DROP TABLE test.kafka') + +def test_kafka_json_settings(started_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.kafka; + CREATE TABLE test.kafka (key UInt64, value UInt64) + ENGINE = Kafka + SETTINGS + kafka_broker_list = 'kafka1:9092', + kafka_topic_list = 'json' + kafka_group_name = 'json' + kafka_format = 'JSONEachRow' + kafka_row_delimiter = '\\n'; + ''') + kafka_check_json_numbers(instance) + instance.query('DROP TABLE test.kafka') + + if __name__ == '__main__': cluster.start() raw_input("Cluster created, press any key to destroy...") diff --git 
a/docs/en/operations/table_engines/kafka.md b/docs/en/operations/table_engines/kafka.md index 31616e77d2..f04c234dcd 100644 --- a/docs/en/operations/table_engines/kafka.md +++ b/docs/en/operations/table_engines/kafka.md @@ -8,20 +8,41 @@ Kafka lets you: - Organize fault-tolerant storage. - Process streams as they become available. + +Old format: + ``` -Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers]) +Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format + [, kafka_row_delimiter, kafka_schema, kafka_num_consumers]) ``` -Parameters: +New format: -- `broker_list` – A comma-separated list of brokers (`localhost:9092`). -- `topic_list` – A list of Kafka topics (`my_topic`). -- `group_name` – A group of Kafka consumers (`group1`). Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere. -- `--format` – Message format. Uses the same notation as the SQL ` FORMAT` function, such as ` JSONEachRow`. For more information, see the "Formats" section. -- `schema` – An optional parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object. -- `num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition. +``` +Kafka SETTINGS + kafka_broker_list = 'localhost:9092', + kafka_topic_list = 'topic1,topic2', + kafka_group_name = 'group1', + kafka_format = 'JSONEachRow', + kafka_row_delimiter = '\n' + kafka_schema = '', + kafka_num_consumers = 2 +``` -Example: +Required parameters: + +- `kafka_broker_list` – A comma-separated list of brokers (`localhost:9092`). +- `kafka_topic_list` – A list of Kafka topics (`my_topic`). +- `kafka_group_name` – A group of Kafka consumers (`group1`). Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere. +- `kafka_format` – Message format. Uses the same notation as the SQL ` FORMAT` function, such as ` JSONEachRow`. For more information, see the "Formats" section. + +Optional parameters: + +- `kafka_row_delimiter` - Character-delimiter of records (rows), which ends the message. +- `kafka_schema` – An optional parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object. +- `kafka_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition. 
+
+Examples:
 
 ```sql
   CREATE TABLE queue (
     timestamp UInt64,
     level String,
     message String
   ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
 
   SELECT * FROM queue LIMIT 5;
+
+  CREATE TABLE queue2 (
+    timestamp UInt64,
+    level String,
+    message String
+  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
+                            kafka_topic_list = 'topic',
+                            kafka_group_name = 'group1',
+                            kafka_format = 'JSONEachRow',
+                            kafka_num_consumers = 4;
+
+  CREATE TABLE queue2 (
+    timestamp UInt64,
+    level String,
+    message String
+  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
+              SETTINGS kafka_format = 'JSONEachRow',
+                       kafka_num_consumers = 4;
 ```
 
 The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.
 
@@ -59,7 +98,7 @@
     level String,
     total UInt64
   ) ENGINE = SummingMergeTree(day, (day, level), 8192);
-  
+
   CREATE MATERIALIZED VIEW consumer TO daily
     AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
     FROM queue GROUP BY day, level;

diff --git a/docs/ru/operations/table_engines/kafka.md b/docs/ru/operations/table_engines/kafka.md
index f368fae386..a0f370df79 100644
--- a/docs/ru/operations/table_engines/kafka.md
+++ b/docs/ru/operations/table_engines/kafka.md
@@ -1,6 +1,6 @@
 # Kafka
 
-Движок работает с [Apache Kafka](http://kafka.apache.org/). 
+Движок работает с [Apache Kafka](http://kafka.apache.org/).
 
 Kafka позволяет:
 
@@ -8,20 +8,40 @@ Kafka позволяет:
 - Организовать отказо-устойчивое хранилище.
 - Обрабатывать потоки по мере их появления.
 
+Старый формат:
+
 ```
-Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
+Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
+      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
 ```
 
-Параметры:
+Новый формат:
 
-- `broker_list` - Перечень брокеров, разделенный запятыми (`localhost:9092`).
-- `topic_list` - Перечень необходимых топиков Kafka (`my_topic`).
-- `group_name` - Группа потребителя Kafka (`group1`). Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
-- `format` - Формат сообщений. Имеет те же обозначения, что выдает SQL-выражение `FORMAT`, например, `JSONEachRow`. Подробнее смотрите в разделе "Форматы".
-- `schema` - Опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap'n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
-- `num_consumers` - Количество потребителей (consumer) на таблицу. По умолчанию `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.
+```
+Kafka SETTINGS
+    kafka_broker_list = 'localhost:9092',
+    kafka_topic_list = 'topic1,topic2',
+    kafka_group_name = 'group1',
+    kafka_format = 'JSONEachRow',
+    kafka_row_delimiter = '\n',
+    kafka_schema = '',
+    kafka_num_consumers = 2
+```
 
-Пример:
+Обязательные параметры:
+
+- `kafka_broker_list` - Перечень брокеров, разделенный запятыми (`localhost:9092`).
+- `kafka_topic_list` - Перечень необходимых топиков Kafka (`my_topic`).
+- `kafka_group_name` - Группа потребителя Kafka (`group1`). Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
+- `kafka_format` - Формат сообщений. Имеет те же обозначения, что выдает SQL-выражение `FORMAT`, например, `JSONEachRow`. Подробнее смотрите в разделе "Форматы".
+
+Опциональные параметры:
+
+- `kafka_row_delimiter` - Символ-разделитель записей (строк), которым завершается сообщение.
+- `kafka_schema` - Опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap'n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
+- `kafka_num_consumers` - Количество потребителей (consumer) на таблицу. По умолчанию `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.
+
+Примеры:
 
 ```sql
   CREATE TABLE queue (
     timestamp UInt64,
     level String,
     message String
   ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
 
   SELECT * FROM queue LIMIT 5;
+
+  CREATE TABLE queue2 (
+    timestamp UInt64,
+    level String,
+    message String
+  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
+                            kafka_topic_list = 'topic',
+                            kafka_group_name = 'group1',
+                            kafka_format = 'JSONEachRow',
+                            kafka_num_consumers = 4;
+
+  CREATE TABLE queue2 (
+    timestamp UInt64,
+    level String,
+    message String
+  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
+              SETTINGS kafka_format = 'JSONEachRow',
+                       kafka_num_consumers = 4;
 ```
 
 Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.
 
@@ -59,7 +97,7 @@
     level String,
     total UInt64
   ) ENGINE = SummingMergeTree(day, (day, level), 8192);
-  
+
   CREATE MATERIALIZED VIEW consumer TO daily
     AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
     FROM queue GROUP BY day, level;
--
GitLab