Commit 154d7e32 authored by Alexander Marshalov, committed by alexey-milovidov

Added SETTINGS clause for Kafka storage engine

Parent 0c233d66
......@@ -53,6 +53,7 @@ add_headers_and_sources(dbms src/Interpreters/ClusterProxy)
add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/Kafka)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Formats)
......
#include <Common/config.h>
#if USE_RDKAFKA
#include <Storages/Kafka/KafkaSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
for (const ASTSetQuery::Change & setting : storage_def.settings->changes)
{
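// APPLY_FOR_KAFKA_SETTINGS expands SET once per known setting, producing an
// else-if chain; the dummy `if (false) {}` below anchors the first `else`.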
#define SET(TYPE, NAME, DEFAULT, DESCRIPTION) \
else if (setting.name == #NAME) NAME.set(setting.value);
if (false) {}
APPLY_FOR_KAFKA_SETTINGS(SET)
else
throw Exception(
"Unknown setting " + setting.name + " for storage " + storage_def.engine->name,
ErrorCodes::BAD_ARGUMENTS);
#undef SET
}
}
else
{
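// No SETTINGS clause in the query: attach an empty (non-standalone) settings
// AST so the storage definition always carries a settings node.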
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
}
}
#endif
#pragma once
#include <Common/config.h>
#if USE_RDKAFKA
#include <Poco/Util/AbstractConfiguration.h>
#include <Core/Defines.h>
#include <Core/Types.h>
#include <Interpreters/SettingsCommon.h>
namespace DB
{
class ASTStorage;
/** Settings for the Kafka engine.
* Could be loaded from a CREATE TABLE query (SETTINGS clause).
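 *
 * Example (illustrative table definition):
 *   CREATE TABLE queue (key UInt64, value String) ENGINE = Kafka
 *   SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'topic',
 *            kafka_group_name = 'group1', kafka_format = 'JSONEachRow';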
*/
struct KafkaSettings
{
#define APPLY_FOR_KAFKA_SETTINGS(M) \
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
M(SettingString, kafka_format, "", "Message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.")
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};
APPLY_FOR_KAFKA_SETTINGS(DECLARE)
#undef DECLARE
public:
void loadFromQuery(ASTStorage & storage_def);
};
}
#endif
......@@ -23,7 +23,9 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/StorageKafka.h> // Y_IGNORE
#include <Storages/StorageFactory.h>
#include <IO/ReadBuffer.h>
#include <common/logger_useful.h>
......@@ -566,93 +568,200 @@ void registerStorageKafka(StorageFactory & factory)
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
bool has_settings = args.storage_def->settings;
KafkaSettings kafka_settings;
if (has_settings)
{
kafka_settings.loadFromQuery(*args.storage_def);
}
/** The engine arguments are as follows:
* - Kafka broker list
* - List of topics
* - Group ID (may be a constant expression with a string result)
* - Message format (string)
* - Row delimiter
* - Schema (optional, if the format supports it)
* - Number of consumers
*/
// Check arguments and settings
#define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME) \
/* One of the four required arguments is not specified */ \
if (args_count < ARG_NUM && ARG_NUM <= 4 && \
!kafka_settings.PAR_NAME.changed) \
{ \
throw Exception( \
"Required parameter '" #PAR_NAME "' " \
"for storage Kafka not specified", \
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
} \
/* The same argument is given in two places */ \
if (has_settings && \
kafka_settings.PAR_NAME.changed && \
args_count >= ARG_NUM) \
{ \
throw Exception( \
"The argument №" #ARG_NUM " of storage Kafka " \
"and the parameter '" #PAR_NAME "' " \
"in SETTINGS cannot be specified at the same time", \
ErrorCodes::BAD_ARGUMENTS); \
}
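// For example (hypothetical DDL): ENGINE = Kafka('localhost:9092') SETTINGS kafka_broker_list = '...'
// is rejected as a duplicate, while ENGINE = Kafka SETTINGS kafka_format = 'JSONEachRow' alone
// still fails because kafka_broker_list is required.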
CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list)
CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list)
CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name)
CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
#undef CHECK_KAFKA_STORAGE_ARGUMENT
// Get and check broker list
String brokers;
if (args_count >= 1)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[0].get());
if (ast && ast->value.getType() == Field::Types::String)
{
brokers = safeGet<String>(ast->value);
}
else
{
throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_broker_list.changed)
{
brokers = kafka_settings.kafka_broker_list.value;
}
// Get and check topic list
String topic_list;
if (args_count >= 2)
{
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
topic_list = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
}
else if (kafka_settings.kafka_topic_list.changed)
{
topic_list = kafka_settings.kafka_topic_list.value;
}
Names topics;
boost::split(topics, topic_list, [](char c){ return c == ','; });
for (String & topic : topics)
{
boost::trim(topic);
}
// Get and check group name
String group;
if (args_count >= 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
group = static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>();
}
else if (kafka_settings.kafka_group_name.changed)
{
group = kafka_settings.kafka_group_name.value;
}
// Get and check message format name
String format;
if (args_count >= 4)
{
engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[3].get());
if (ast && ast->value.getType() == Field::Types::String)
{
format = safeGet<String>(ast->value);
}
else
{
throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_format.changed)
{
format = kafka_settings.kafka_format.value;
}
// Parse row delimiter (optional)
char row_delimiter = '\0';
if (args_count >= 5)
{
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[4].get());
String arg;
if (ast && ast->value.getType() == Field::Types::String)
{
arg = safeGet<String>(ast->value);
}
else
{
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
}
if (arg.size() > 1)
{
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
}
else if (arg.size() == 0)
{
row_delimiter = '\0';
}
else
{
row_delimiter = arg[0];
}
}
else if (kafka_settings.kafka_row_delimiter.changed)
{
row_delimiter = kafka_settings.kafka_row_delimiter.value;
}
// Parse format schema if supported (optional)
String schema;
if (args_count >= 6)
{
engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[5].get());
if (ast && ast->value.getType() == Field::Types::String)
{
schema = safeGet<String>(ast->value);
}
else
{
throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_schema.changed)
{
schema = kafka_settings.kafka_schema.value;
}
// Parse number of consumers (optional)
UInt64 num_consumers = 1;
if (args_count >= 7)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[6].get());
if (ast && ast->value.getType() == Field::Types::UInt64)
{
num_consumers = safeGet<UInt64>(ast->value);
}
else
{
throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_num_consumers.changed)
{
num_consumers = kafka_settings.kafka_num_consumers.value;
}
return StorageKafka::create(
args.table_name, args.database_name, args.context, args.columns,
......
......@@ -87,11 +87,19 @@ StoragePtr StorageFactory::get(
name = engine_def.name;
if (storage_def->settings && !endsWith(name, "MergeTree") && name != "Kafka")
{
throw Exception(
"Engine " + name + " doesn't support SETTINGS clause. "
"Currently only the MergeTree family of engines and Kafka engine supports it",
ErrorCodes::BAD_ARGUMENTS);
}
if ((storage_def->partition_by || storage_def->order_by || storage_def->sample_by)
&& !endsWith(name, "MergeTree"))
{
throw Exception(
"Engine " + name + " doesn't support PARTITION BY, ORDER BY, SAMPLE BY or SETTINGS clauses. "
"Engine " + name + " doesn't support PARTITION BY, ORDER BY or SAMPLE BY clauses. "
"Currently only the MergeTree family of engines supports them", ErrorCodes::BAD_ARGUMENTS);
}
......
import os.path as p
import time
import datetime
import pytest
from helpers.cluster import ClickHouseCluster
......@@ -10,9 +9,11 @@ import json
import subprocess
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml'],
with_kafka=True)
@pytest.fixture(scope="module")
def started_cluster():
......@@ -25,23 +26,36 @@ def started_cluster():
finally:
cluster.shutdown()
def kafka_is_available(started_cluster):
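# Probe the broker from inside the Kafka container; exit code 0 means it responded.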
p = subprocess.Popen(('docker',
'exec',
'-i',
started_cluster.kafka_docker_id,
'/usr/bin/kafka-broker-api-versions',
'--bootstrap-server',
'PLAINTEXT://localhost:9092'),
stdout=subprocess.PIPE)
p.communicate()[0]
return p.returncode == 0
def kafka_produce(started_cluster, topic, messages):
p = subprocess.Popen(('docker',
'exec',
'-i',
started_cluster.kafka_docker_id,
'/usr/bin/kafka-console-producer',
'--broker-list',
'localhost:9092',
'--topic',
topic),
stdin=subprocess.PIPE)
p.communicate(messages)  # communicate() writes the messages and closes stdin
def kafka_check_json_numbers(instance):
retries = 0
while True:
if kafka_is_available(started_cluster):
......@@ -58,10 +72,38 @@ CREATE TABLE test.kafka (key UInt64, value UInt64)
kafka_produce(started_cluster, 'json', messages)
time.sleep(3)
result = instance.query('SELECT * FROM test.kafka;')
file = p.join(p.dirname(__file__), 'test_kafka_json.reference')
with open(file) as reference:
assert TSV(result) == TSV(reference)
def test_kafka_json(started_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka('kafka1:9092', 'json', 'json',
'JSONEachRow', '\\n');
''')
kafka_check_json_numbers(instance)
instance.query('DROP TABLE test.kafka')
def test_kafka_json_settings(started_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
''')
kafka_check_json_numbers(instance)
instance.query('DROP TABLE test.kafka')
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")
......
......@@ -8,20 +8,41 @@ Kafka lets you:
- Organize fault-tolerant storage.
- Process streams as they become available.
Old format:
```
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
```
New format:
```
Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_schema = '',
kafka_num_consumers = 2
```
Required parameters:
- `kafka_broker_list` – A comma-separated list of brokers (`localhost:9092`).
- `kafka_topic_list` – A list of Kafka topics (`my_topic`).
- `kafka_group_name` – A group of Kafka consumers (`group1`). Consumer offsets are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` – Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the "Formats" section.
Optional parameters:
- `kafka_row_delimiter` – The delimiter character that ends a message (row).
- `kafka_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
Examples:
```sql
CREATE TABLE queue (
......@@ -31,6 +52,24 @@ Example:
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
```
The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.
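For instance, a minimal sketch (table and group names are illustrative): two tables reading the same topic under different `kafka_group_name` values each receive every message independently:

```sql
CREATE TABLE queue_a (key UInt64, value UInt64)
    ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group_a',
                            kafka_format = 'JSONEachRow';

CREATE TABLE queue_b (key UInt64, value UInt64)
    ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group_b',
                            kafka_format = 'JSONEachRow';
```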
......@@ -59,7 +98,7 @@ Example:
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
......
# Kafka
The engine works with [Apache Kafka](http://kafka.apache.org/).
Kafka lets you:
......@@ -8,20 +8,40 @@ Kafka lets you:
- Organize fault-tolerant storage.
- Process streams as they become available.
Old format:
```
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
```
New format:
```
Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_schema = '',
kafka_num_consumers = 2
```
Required parameters:
- `kafka_broker_list` – A comma-separated list of brokers (`localhost:9092`).
- `kafka_topic_list` – A list of Kafka topics (`my_topic`).
- `kafka_group_name` – A group of Kafka consumers (`group1`). Consumer offsets are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` – Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the "Formats" section.
Optional parameters:
- `kafka_row_delimiter` – The delimiter character that ends a message (row).
- `kafka_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
Examples:
```sql
CREATE TABLE queue (
......@@ -31,6 +51,24 @@ Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
```
The delivered messages are tracked automatically, so each message in a group is only counted once. If you need to get the data twice, create a copy of the table with a different group name.
......@@ -59,7 +97,7 @@ Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
......