提交 1e7eb494 编写于 作者: K kssenii

Add one more message property, support format_schema

上级 c6fdeb6c
......@@ -22,8 +22,10 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream(
, column_names(columns)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered"}, storage.getVirtuals(), storage.getStorageID()))
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id"}, storage.getVirtuals(), storage.getStorageID()))
{
if (!storage.getSchemaName().empty())
context.setSetting("format_schema", storage.getSchemaName());
}
......@@ -131,6 +133,7 @@ Block RabbitMQBlockInputStream::readImpl()
auto channel_id = buffer->getChannelID();
auto delivery_tag = buffer->getDeliveryTag();
auto redelivered = buffer->getRedelivered();
auto message_id = buffer->getMessageID();
buffer->updateAckTracker({delivery_tag, channel_id});
......@@ -140,6 +143,7 @@ Block RabbitMQBlockInputStream::readImpl()
virtual_columns[1]->insert(channel_id);
virtual_columns[2]->insert(delivery_tag);
virtual_columns[3]->insert(redelivered);
virtual_columns[4]->insert(message_id);
}
total_rows = total_rows + new_rows;
......
......@@ -14,6 +14,7 @@ namespace DB
M(String, rabbitmq_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \
M(String, rabbitmq_format, "", "The message format.", 0) \
M(Char, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \
M(String, rabbitmq_schema, "", "Schema identifier (used by schema-based formats) for RabbitMQ engine", 0) \
M(String, rabbitmq_exchange_type, "default", "The exchange type.", 0) \
M(UInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \
M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
......
......@@ -14,6 +14,11 @@
namespace DB
{
/// Error codes referenced by this translation unit (definitions live in ErrorCodes.cpp).
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Capacity of the internal received-messages queue per consumer buffer.
static const auto QUEUE_SIZE = 50000;
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
......@@ -51,7 +56,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
consumer_channel->onReady([&]()
{
channel_id = channel_base + "_" + std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++);
channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
LOG_TRACE(log, "Channel {} is created", channel_id);
consumer_channel->onError([&](const char * message)
......@@ -142,7 +147,10 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
if (row_delimiter != '\0')
message_received += row_delimiter;
received.push({message_received, redelivered, AckTracker(delivery_tag, channel_id)});
if (message.hasMessageID())
received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)});
else
received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)});
}
})
.onError([&](const char * message)
......@@ -195,7 +203,11 @@ void ReadBufferFromRabbitMQConsumer::restoreChannel(ChannelPtr new_channel)
consumer_channel = std::move(new_channel);
consumer_channel->onReady([&]()
{
channel_id = channel_base + "_" + std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++);
/* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer,
* i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that
* channel_id is unique for each table.
*/
channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
LOG_TRACE(log, "Channel {} is created", channel_id);
consumer_channel->onError([&](const char * message)
......
......@@ -51,6 +51,7 @@ public:
/// One consumed RabbitMQ message plus the metadata needed to populate the
/// table's virtual columns and to acknowledge the delivery later.
struct MessageData
{
String message;     /// Message payload (with row delimiter appended if one is configured).
String message_id;  /// Broker message id; empty string when the message carried none.
bool redelivered;   /// True if the broker marked this delivery as redelivered.
AckTracker track;   /// (delivery_tag, channel_id) pair used for acknowledgement.
};
......@@ -65,6 +66,7 @@ public:
/// Accessors exposing the currently-read message's metadata; these back the
/// _channel_id, _delivery_tag, _redelivered and _message_id virtual columns.
auto getChannelID() const { return current.track.channel_id; }
auto getDeliveryTag() const { return current.track.delivery_tag; }
auto getRedelivered() const { return current.redelivered; }
auto getMessageID() const { return current.message_id; }
private:
bool nextImpl() override;
......
......@@ -69,6 +69,7 @@ StorageRabbitMQ::StorageRabbitMQ(
const String & exchange_name_,
const String & format_name_,
char row_delimiter_,
const String & schema_name_,
const String & exchange_type_,
size_t num_consumers_,
size_t num_queues_,
......@@ -83,6 +84,7 @@ StorageRabbitMQ::StorageRabbitMQ(
, exchange_name(exchange_name_)
, format_name(global_context.getMacros()->expand(format_name_))
, row_delimiter(row_delimiter_)
, schema_name(global_context.getMacros()->expand(schema_name_))
, num_consumers(num_consumers_)
, num_queues(num_queues_)
, use_transactional_channel(use_transactional_channel_)
......@@ -785,13 +787,29 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
}
String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value;
String schema = rabbitmq_settings.rabbitmq_schema.value;
if (args_count >= 6)
{
engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);
const auto * ast = engine_args[5]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
schema = safeGet<String>(ast->value);
}
else
{
throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS);
}
}
String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value;
if (args_count >= 7)
{
engine_args[6] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[6], args.local_context);
const auto * ast = engine_args[6]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
exchange_type = safeGet<String>(ast->value);
}
......@@ -802,9 +820,9 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
if (args_count >= 7)
if (args_count >= 8)
{
const auto * ast = engine_args[6]->as<ASTLiteral>();
const auto * ast = engine_args[7]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
num_consumers = safeGet<UInt64>(ast->value);
......@@ -816,9 +834,9 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
UInt64 num_queues = rabbitmq_settings.rabbitmq_num_queues;
if (args_count >= 8)
if (args_count >= 9)
{
const auto * ast = engine_args[7]->as<ASTLiteral>();
const auto * ast = engine_args[8]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
num_consumers = safeGet<UInt64>(ast->value);
......@@ -830,9 +848,9 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
bool use_transactional_channel = static_cast<bool>(rabbitmq_settings.rabbitmq_transactional_channel);
if (args_count >= 9)
if (args_count >= 10)
{
const auto * ast = engine_args[8]->as<ASTLiteral>();
const auto * ast = engine_args[9]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
use_transactional_channel = static_cast<bool>(safeGet<UInt64>(ast->value));
......@@ -844,11 +862,11 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
String queue_base = rabbitmq_settings.rabbitmq_queue_base.value;
if (args_count >= 10)
if (args_count >= 11)
{
engine_args[9] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[9], args.local_context);
engine_args[10] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[10], args.local_context);
const auto * ast = engine_args[9]->as<ASTLiteral>();
const auto * ast = engine_args[10]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
queue_base = safeGet<String>(ast->value);
......@@ -856,11 +874,11 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
String deadletter_exchange = rabbitmq_settings.rabbitmq_deadletter_exchange.value;
if (args_count >= 11)
if (args_count >= 12)
{
engine_args[10] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[10], args.local_context);
engine_args[11] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[11], args.local_context);
const auto * ast = engine_args[10]->as<ASTLiteral>();
const auto * ast = engine_args[11]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
deadletter_exchange = safeGet<String>(ast->value);
......@@ -868,9 +886,9 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
bool persistent = static_cast<bool>(rabbitmq_settings.rabbitmq_persistent_mode);
if (args_count >= 12)
if (args_count >= 13)
{
const auto * ast = engine_args[11]->as<ASTLiteral>();
const auto * ast = engine_args[12]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
{
persistent = static_cast<bool>(safeGet<UInt64>(ast->value));
......@@ -883,7 +901,7 @@ void registerStorageRabbitMQ(StorageFactory & factory)
return StorageRabbitMQ::create(
args.table_id, args.context, args.columns,
host_port, routing_keys, exchange, format, row_delimiter, exchange_type, num_consumers,
host_port, routing_keys, exchange, format, row_delimiter, schema, exchange_type, num_consumers,
num_queues, use_transactional_channel, queue_base, deadletter_exchange, persistent);
};
......@@ -898,7 +916,8 @@ NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{"_exchange_name", std::make_shared<DataTypeString>()},
{"_channel_id", std::make_shared<DataTypeString>()},
{"_delivery_tag", std::make_shared<DataTypeUInt64>()},
{"_redelivered", std::make_shared<DataTypeUInt8>()}
{"_redelivered", std::make_shared<DataTypeUInt8>()},
{"_message_id", std::make_shared<DataTypeString>()}
};
}
......
......@@ -55,6 +55,7 @@ public:
const String & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override;
const auto & getSchemaName() const { return schema_name; }
const String getExchange() const { return exchange_name; }
bool checkBridge() const { return !exchange_removed.load(); }
......@@ -74,6 +75,7 @@ protected:
const String & exchange_name_,
const String & format_name_,
char row_delimiter_,
const String & schema_name_,
const String & exchange_type_,
size_t num_consumers_,
size_t num_queues_,
......@@ -92,6 +94,7 @@ private:
const String format_name;
char row_delimiter;
const String schema_name;
size_t num_consumers;
size_t num_created_consumers = 0;
bool hash_exchange;
......
......@@ -186,7 +186,7 @@ void WriteBufferToRabbitMQProducer::setupChannel()
producer_channel->onReady([&]()
{
channel_id = channel_base + "_" + channel_id_base + std::to_string(channel_id_counter++);
channel_id = channel_id_base + std::to_string(channel_id_counter++) + "_" + channel_base;
LOG_DEBUG(log, "Producer's channel {} is ready", channel_id);
if (use_txn)
......
syntax = "proto3";
// Schema for the RabbitMQ engine integration test; referenced via
// rabbitmq_schema = 'rabbitmq.proto:KeyValuePair' and mirroring the
// test table's (key UInt64, value String) columns.
message KeyValuePair {
uint64 key = 1;
string value = 2;
}
\ No newline at end of file
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/rabbitmq.proto
# NOTE(review): protoc-generated bindings for the test's KeyValuePair schema.
# Regenerate with protoc if rabbitmq.proto changes; never hand-edit this file.
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='clickhouse_path/format_schemas/rabbitmq.proto',
package='',
syntax='proto3',
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n-clickhouse_path/format_schemas/rabbitmq.proto\"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3'
)
# Descriptor for the KeyValuePair message (fields: key uint64 = 1, value string = 2).
_KEYVALUEPAIR = _descriptor.Descriptor(
name='KeyValuePair',
full_name='KeyValuePair',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='KeyValuePair.key', index=0,
number=1, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='value', full_name='KeyValuePair.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=49,
serialized_end=91,
)
DESCRIPTOR.message_types_by_name['KeyValuePair'] = _KEYVALUEPAIR
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
# Concrete message class exposed to tests as rabbitmq_pb2.KeyValuePair.
KeyValuePair = _reflection.GeneratedProtocolMessageType('KeyValuePair', (_message.Message,), {
'DESCRIPTOR' : _KEYVALUEPAIR,
'__module__' : 'clickhouse_path.format_schemas.rabbitmq_pb2'
# @@protoc_insertion_point(class_scope:KeyValuePair)
})
_sym_db.RegisterMessage(KeyValuePair)
# @@protoc_insertion_point(module_scope)
......@@ -16,13 +16,19 @@ from helpers.network import PartitionManager
import json
import subprocess
import avro.schema
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers.MessageSerializer import MessageSerializer
from google.protobuf.internal.encoder import _VarintBytes
import rabbitmq_pb2
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
config_dir='configs',
main_configs=['configs/rabbitmq.xml','configs/log_conf.xml'],
with_rabbitmq=True)
with_rabbitmq=True,
clickhouse_path_dir='clickhouse_path')
rabbitmq_id = ''
......@@ -316,6 +322,57 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
def test_rabbitmq_protobuf(rabbitmq_cluster):
    """Verify Protobuf consumption through the rabbitmq_schema setting.

    Publishes 50 KeyValuePair records split into three AMQP messages of
    different sizes (20, 1 and 29 records) and polls the table until all
    rows have been read back.
    """
    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value String)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_exchange_name = 'pb',
                     rabbitmq_format = 'Protobuf',
                     rabbitmq_schema = 'rabbitmq.proto:KeyValuePair';
        ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    def publish_batch(keys):
        # One AMQP message carrying several varint-length-delimited
        # KeyValuePair records, as the ClickHouse Protobuf format expects.
        data = ''
        for i in keys:
            msg = rabbitmq_pb2.KeyValuePair()
            msg.key = i
            msg.value = str(i)
            serialized_msg = msg.SerializeToString()
            data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
        channel.basic_publish(exchange='pb', routing_key='', body=data)

    # Batches of different sizes exercise both multi-record and
    # single-record payloads.
    publish_batch(range(0, 20))
    publish_batch(range(20, 21))
    publish_batch(range(21, 50))

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.rabbitmq')
        if rabbitmq_check_result(result):
            break

    connection.close()
    rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
def test_rabbitmq_materialized_view(rabbitmq_cluster):
instance.query('''
......@@ -451,6 +508,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
DROP TABLE test.view2;
''')
connection.close()
rabbitmq_check_result(result1, True)
rabbitmq_check_result(result2, True)
......@@ -1440,7 +1498,7 @@ def test_rabbitmq_virtual_columns(rabbitmq_cluster):
connection.close()
result = instance.query('''
SELECT key, value, _exchange_name, SUBSTRING(_channel_id, 34, 3), _delivery_tag, _redelivered
SELECT key, value, _exchange_name, SUBSTRING(_channel_id, 1, 3), _delivery_tag, _redelivered
FROM test.view ORDER BY key
''')
......@@ -1505,7 +1563,7 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
connection.close()
result = instance.query("SELECT key, value, exchange_name, SUBSTRING(channel_id, 34, 3), delivery_tag, redelivered FROM test.view ORDER BY delivery_tag")
result = instance.query("SELECT key, value, exchange_name, SUBSTRING(channel_id, 1, 3), delivery_tag, redelivered FROM test.view ORDER BY delivery_tag")
expected = '''\
0 0 virtuals_mv 1_0 1 0
1 1 virtuals_mv 1_0 2 0
......@@ -1769,7 +1827,7 @@ def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
@pytest.mark.timeout(420)
def test_rabbitmq_consumer_restore_failed_connection_without_losses(rabbitmq_cluster):
def test_rabbitmq_consumer_restore_failed_connection_without_losses_1(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.consumer_reconnect (key UInt64, value UInt64)
ENGINE = RabbitMQ
......@@ -1901,71 +1959,72 @@ def test_rabbitmq_producer_restore_failed_connection_without_losses(rabbitmq_clu
@pytest.mark.timeout(420)
def test_rabbitmq_virtual_columns_2(rabbitmq_cluster):
def test_rabbitmq_consumer_restore_failed_connection_without_losses_2(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.destination;
CREATE TABLE test.destination(key UInt64, value UInt64,
exchange_name String, channel_id String, delivery_tag UInt64, redelivered UInt8) ENGINE = MergeTree()
ORDER BY key;
CREATE TABLE test.consumer_reconnect (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'consumer_reconnect',
rabbitmq_num_consumers = 10,
rabbitmq_num_queues = 2,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
table_num = 3
for table_id in range(table_num):
print("Setting up table {}".format(table_id))
instance.query('''
DROP TABLE IF EXISTS test.virtuals_{0};
DROP TABLE IF EXISTS test.virtuals_{0}_mv;
CREATE TABLE test.virtuals_{0} (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'virtuals_2',
rabbitmq_num_queues = 2,
rabbitmq_num_consumers = 2,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.many_consumers_{0}_mv TO test.destination AS
SELECT *, _exchange_name as exchange_name, _channel_id as channel_id, _delivery_tag as delivery_tag, _redelivered as redelivered
FROM test.virtuals_{0};
'''.format(table_id))
i = 0
messages_num = 150000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
message_num = 10
i = 0
messages = []
for _ in range(message_num):
for _ in range(messages_num):
messages.append(json.dumps({'key': i, 'value': i}))
i += 1
for i in range(messages_num):
channel.basic_publish(exchange='consumer_reconnect', routing_key='', body=messages[i],
properties=pika.BasicProperties(delivery_mode = 2, message_id=str(i)))
connection.close()
for i in range(message_num):
channel.basic_publish(exchange='virtuals_2', routing_key='', body=messages[i],
properties=pika.BasicProperties(delivery_mode=2, message_id=str(i)))
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.consumer_reconnect;
''')
while int(instance.query('SELECT count() FROM test.view')) == 0:
time.sleep(0.1)
kill_rabbitmq();
time.sleep(8);
revive_rabbitmq();
while int(instance.query('SELECT count() FROM test.view')) == 0:
time.sleep(0.1)
#kill_rabbitmq();
#time.sleep(2);
#revive_rabbitmq();
kill_rabbitmq();
time.sleep(2);
revive_rabbitmq();
while True:
result = instance.query('SELECT count(DISTINCT concat([channel_id], [toString(delivery_tag)])) FROM test.destination')
print instance.query('''
SELECT DISTINCT concat([channel_id], [toString(delivery_tag)])
FROM (SELECT channel_id AS id, delivery_tag AS tag FROM test.destination GROUP BY id ORDER BY tag)''')
result = instance.query('SELECT count(DISTINCT key) FROM test.view')
time.sleep(1)
if int(result) == message_num * table_num:
if int(result) == messages_num:
break
connection.close()
instance.query('''
DROP TABLE IF EXISTS test.consumer;
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.rabbitmq_virtuals_mv
DROP TABLE IF EXISTS test.consumer_reconnect;
''')
assert int(result) == message_num * table_num
assert int(result) == messages_num, 'ClickHouse lost some messages: {}'.format(result)
if __name__ == '__main__':
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册