Unverified commit 37b904b8, authored by Vitaly Baranov, committed by GitHub

Merge pull request #4808 from vitlibar/kafka-with-protobuf-format

Kafka with protobuf format
......@@ -797,14 +797,33 @@ private:
written_progress_chars = 0;
written_first_block = false;
connection->forceConnected();
{
/// Temporarily apply query settings to context.
std::optional<Settings> old_settings;
SCOPE_EXIT({ if (old_settings) context.setSettings(*old_settings); });
auto apply_query_settings = [&](const IAST & settings_ast)
{
if (!old_settings)
old_settings.emplace(context.getSettingsRef());
for (const auto & change : settings_ast.as<ASTSetQuery>()->changes)
context.setSetting(change.name, change.value);
};
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && insert->settings_ast)
apply_query_settings(*insert->settings_ast);
/// FIXME: try to prettify this cast using `as<>()`
const auto * with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
if (with_output && with_output->settings_ast)
apply_query_settings(*with_output->settings_ast);
/// An INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately.
const auto * insert_query = parsed_query->as<ASTInsertQuery>();
if (insert_query && !insert_query->select)
processInsertQuery();
else
processOrdinaryQuery();
connection->forceConnected();
/// An INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately.
if (insert && !insert->select)
processInsertQuery();
else
processOrdinaryQuery();
}
/// Do not change context (current DB, settings) in case of an exception.
if (!got_exception)
......@@ -964,8 +983,6 @@ private:
{
if (!insert->format.empty())
current_format = insert->format;
if (insert->settings_ast)
InterpreterSetQuery(insert->settings_ast, context).executeForCurrentContext();
}
BlockInputStreamPtr block_input = context.getInputFormat(
......@@ -1248,10 +1265,6 @@ private:
const auto & id = query_with_output->format->as<ASTIdentifier &>();
current_format = id.name;
}
if (query_with_output->settings_ast)
{
InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
}
}
if (has_vertical_output_suffix)
......
......@@ -533,12 +533,6 @@ void SettingString::write(WriteBuffer & buf) const
}
void SettingChar::checkStringIsACharacter(const String & x) const
{
if (x.size() != 1)
throw Exception("A setting's value string has to be an exactly one character long", ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH);
}
String SettingChar::toString() const
{
return String(1, value);
......@@ -552,9 +546,10 @@ void SettingChar::set(char x)
void SettingChar::set(const String & x)
{
checkStringIsACharacter(x);
value = x[0];
changed = true;
if (x.size() > 1)
throw Exception("A setting's value string has to be exactly one character long", ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH);
char c = (x.size() == 1) ? x[0] : '\0';
set(c);
}
void SettingChar::set(const Field & x)
......@@ -565,10 +560,9 @@ void SettingChar::set(const Field & x)
void SettingChar::set(ReadBuffer & buf)
{
String x;
readBinary(x, buf);
checkStringIsACharacter(x);
set(x);
String s;
readBinary(s, buf);
set(s);
}
void SettingChar::write(WriteBuffer & buf) const
......
......@@ -335,9 +335,6 @@ struct SettingString
struct SettingChar
{
private:
void checkStringIsACharacter(const String & x) const;
public:
char value;
bool changed = false;
......
......@@ -305,7 +305,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
[](ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<CapnProtoRowInputStream>(buf, sample, FormatSchemaInfo(context, "capnp")),
std::make_shared<CapnProtoRowInputStream>(buf, sample, FormatSchemaInfo(context, "CapnProto")),
sample,
max_block_size,
settings);
......
......@@ -11,20 +11,29 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schema_file_extension, bool schema_required)
namespace
{
String format_schema = context.getSettingsRef().format_schema.toString();
if (format_schema.empty())
String getFormatSchemaDefaultFileExtension(const String & format)
{
if (schema_required)
{
throw Exception(
"Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format"
+ (schema_file_extension.empty() ? "" : ", e.g. 'schema." + schema_file_extension + ":Message'"),
ErrorCodes::BAD_ARGUMENTS);
}
return;
if (format == "Protobuf")
return "proto";
else if (format == "CapnProto")
return "capnp";
else
return "";
}
}
FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & format)
{
String format_schema = context.getSettingsRef().format_schema.toString();
if (format_schema.empty())
throw Exception(
"The format " + format + " requires a schema. The 'format_schema' setting should be set", ErrorCodes::BAD_ARGUMENTS);
String default_file_extension = getFormatSchemaDefaultFileExtension(format);
size_t colon_pos = format_schema.find(':');
Poco::Path path;
......@@ -33,12 +42,11 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem
{
throw Exception(
"Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format"
+ (schema_file_extension.empty() ? "" : ", e.g. 'schema." + schema_file_extension + ":Message'") + ". Got '" + format_schema
+ (default_file_extension.empty() ? "" : ", e.g. 'schema." + default_file_extension + ":Message'") + ". Got '" + format_schema
+ "'",
ErrorCodes::BAD_ARGUMENTS);
}
is_null = false;
message_name = format_schema.substr(colon_pos + 1);
auto default_schema_directory = [&context]()
......@@ -51,8 +59,8 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem
return context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
};
if (path.getExtension().empty() && !schema_file_extension.empty())
path.setExtension(schema_file_extension);
if (path.getExtension().empty() && !default_file_extension.empty())
path.setExtension(default_file_extension);
if (path.isAbsolute())
{
......
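The FormatSchemaInfo constructor now takes the format name ("Protobuf" or "CapnProto") instead of a raw file extension, derives the default schema-file extension from it, and always requires the 'format_schema' setting. A rough sketch of the intended parsing, in Python for illustration only (parse_format_schema is a hypothetical helper, not the actual C++ code, and the exact error checks in the elided hunk may differ):

import os

def parse_format_schema(format_schema, format_name):
    # 'format_schema' is expected in the 'schema_file:message_name' form,
    # e.g. 'kafka.proto:KeyValuePair' for the Protobuf format.
    default_ext = {"Protobuf": "proto", "CapnProto": "capnp"}.get(format_name, "")
    if not format_schema:
        raise ValueError("The format " + format_name + " requires a schema. "
                         "The 'format_schema' setting should be set")
    schema_file, sep, message_name = format_schema.partition(":")
    if not sep or not schema_file or not message_name:
        raise ValueError("'format_schema' must look like 'schema_file:message_name'")
    # If the schema file has no extension, the default one for the format is appended.
    if not os.path.splitext(schema_file)[1] and default_ext:
        schema_file += "." + default_ext
    return schema_file, message_name

# parse_format_schema("kafka:KeyValuePair", "Protobuf") -> ("kafka.proto", "KeyValuePair")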
......@@ -10,10 +10,7 @@ class Context;
class FormatSchemaInfo
{
public:
FormatSchemaInfo() = default;
FormatSchemaInfo(const Context & context, const String & schema_file_extension = String(), bool schema_required = true);
bool isNull() const { return is_null; }
FormatSchemaInfo(const Context & context, const String & format);
/// Returns path to the schema file.
const String & schemaPath() const { return schema_path; }
......@@ -26,7 +23,6 @@ public:
const String & messageName() const { return message_name; }
private:
bool is_null = true;
String schema_path;
String schema_directory;
String message_name;
......
......@@ -75,7 +75,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "proto")),
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "Protobuf")),
sample, max_block_size, settings);
});
}
......
......@@ -38,7 +38,7 @@ void registerOutputFormatProtobuf(FormatFactory & factory)
"Protobuf", [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<ProtobufRowOutputStream>(buf, header, FormatSchemaInfo(context, "proto")), header);
std::make_shared<ProtobufRowOutputStream>(buf, header, FormatSchemaInfo(context, "Protobuf")), header);
});
}
......
......@@ -36,7 +36,7 @@ protected:
return false;
BufferBase::set(buffer->position(), buffer->available(), 0);
put_delimiter = true;
put_delimiter = (delimiter != 0);
}
return true;
......
......@@ -15,14 +15,17 @@ services:
image: confluentinc/cp-kafka:4.1.0
hostname: kafka1
ports:
- "9092:9092"
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
KAFKA_ZOOKEEPER_CONNECT: "kafka_zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://kafka1:19092
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "kafka_zookeeper:2181"
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- kafka_zookeeper
security_opt:
- label:disable
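With the listener split above, the broker advertises INSIDE://localhost:9092 for clients on the test host (port 9092 is published) and OUTSIDE://kafka1:19092 for other containers on the compose network; that is why the integration test below produces to localhost:9092 while the CREATE TABLE statements point at kafka1:19092. A minimal host-side producer sketch under those assumptions (the topic name and payload are arbitrary):

from kafka import KafkaProducer

# Host-side clients go through the published INSIDE listener (localhost:9092);
# containers such as the ClickHouse instance use OUTSIDE (kafka1:19092).
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("example_topic", value=b'{"key": 0, "value": 0}\n')
producer.flush()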
......@@ -25,7 +25,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes -
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal
RUN pip install pytest docker-compose==1.22.0 docker dicttoxml kazoo PyMySQL psycopg2 pymongo tzlocal kafka-python protobuf
ENV DOCKER_CHANNEL stable
ENV DOCKER_VERSION 17.09.1-ce
......
syntax = "proto3";
message KeyValuePair {
uint64 key = 1;
string value = 2;
}
\ No newline at end of file
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/kafka.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='clickhouse_path/format_schemas/kafka.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n*clickhouse_path/format_schemas/kafka.proto\"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_KEYVALUEPAIR = _descriptor.Descriptor(
name='KeyValuePair',
full_name='KeyValuePair',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='KeyValuePair.key', index=0,
number=1, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='value', full_name='KeyValuePair.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=46,
serialized_end=88,
)
DESCRIPTOR.message_types_by_name['KeyValuePair'] = _KEYVALUEPAIR
KeyValuePair = _reflection.GeneratedProtocolMessageType('KeyValuePair', (_message.Message,), dict(
DESCRIPTOR = _KEYVALUEPAIR,
__module__ = 'clickhouse_path.format_schemas.kafka_pb2'
# @@protoc_insertion_point(class_scope:KeyValuePair)
))
_sym_db.RegisterMessage(KeyValuePair)
# @@protoc_insertion_point(module_scope)
......@@ -7,6 +7,17 @@ from helpers.test_tools import TSV
import json
import subprocess
from kafka import KafkaProducer
from google.protobuf.internal.encoder import _VarintBytes
"""
protoc --version
libprotoc 3.0.0
# to create kafka_pb2.py
protoc --python_out=. kafka.proto
"""
import kafka_pb2
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
......@@ -17,7 +28,8 @@ import subprocess
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml'],
with_kafka=True)
with_kafka=True,
clickhouse_path_dir='clickhouse_path')
kafka_id = ''
......@@ -30,7 +42,7 @@ def check_kafka_is_available():
kafka_id,
'/usr/bin/kafka-broker-api-versions',
'--bootstrap-server',
'PLAINTEXT://localhost:9092'),
'INSIDE://localhost:9092'),
stdout=subprocess.PIPE)
p.communicate()
return p.returncode == 0
......@@ -56,7 +68,7 @@ def kafka_produce(topic, messages):
kafka_id,
'/usr/bin/kafka-console-producer',
'--broker-list',
'localhost:9092',
'INSIDE://localhost:9092',
'--topic',
topic,
'--sync',
......@@ -65,7 +77,21 @@ def kafka_produce(topic, messages):
stdin=subprocess.PIPE)
p.communicate(messages)
p.stdin.close()
print("Produced {} messages".format(len(messages.splitlines())))
print("Produced {} messages for topic {}".format(len(messages.splitlines()), topic))
def kafka_produce_protobuf_messages(topic, start_index, num_messages):
data = ''
for i in range(start_index, start_index + num_messages):
msg = kafka_pb2.KeyValuePair()
msg.key = i
msg.value = str(i)
serialized_msg = msg.SerializeToString()
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send(topic=topic, value=data)
producer.flush()
print("Produced {} messages for topic {}".format(num_messages, topic))
# Since everything is async and shaky when receiving messages from Kafka,
......@@ -110,7 +136,7 @@ def kafka_setup_teardown():
def test_kafka_settings_old_syntax(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka('kafka1:9092', 'old', 'old', 'JSONEachRow', '\\n');
ENGINE = Kafka('kafka1:19092', 'old', 'old', 'JSONEachRow', '\\n');
''')
# Don't insert malformed messages since old settings syntax
......@@ -133,7 +159,7 @@ def test_kafka_settings_new_syntax(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'new',
kafka_group_name = 'new',
kafka_format = 'JSONEachRow',
......@@ -168,7 +194,7 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_group_name = 'csv',
kafka_format = 'CSV',
......@@ -193,7 +219,7 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'tsv',
kafka_group_name = 'tsv',
kafka_format = 'TSV',
......@@ -213,6 +239,30 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_check_result(result, True)
def test_kafka_protobuf(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'pb',
kafka_group_name = 'pb',
kafka_format = 'Protobuf',
kafka_schema = 'kafka.proto:KeyValuePair';
''')
kafka_produce_protobuf_messages('pb', 0, 20)
kafka_produce_protobuf_messages('pb', 20, 1)
kafka_produce_protobuf_messages('pb', 21, 29)
result = ''
for i in range(50):
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
kafka_check_result(result, True)
def test_kafka_materialized_view(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
......@@ -220,7 +270,7 @@ def test_kafka_materialized_view(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow',
......