diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index 0c70acaf1e3d101c532262885bee9ca9fec7b7c7..1a20699d23ae15a99051b0cf89b11728d7f31557 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -124,23 +124,26 @@ Block RabbitMQBlockInputStream::readImpl() auto new_rows = read_rabbitmq_message(); - auto exchange_name = storage.getExchange(); - auto consumer_tag = buffer->getConsumerTag(); - auto delivery_tag = buffer->getDeliveryTag(); - auto redelivered = buffer->getRedelivered(); - - for (size_t i = 0; i < new_rows; ++i) + if (new_rows) { - virtual_columns[0]->insert(exchange_name); - virtual_columns[1]->insert(consumer_tag); - virtual_columns[2]->insert(delivery_tag); - virtual_columns[3]->insert(redelivered); - } + auto exchange_name = storage.getExchange(); + auto consumer_tag = buffer->getConsumerTag(); + auto delivery_tag = buffer->getDeliveryTag(); + auto redelivered = buffer->getRedelivered(); - if (delivery_tag > last_inserted_delivery_tag) - last_inserted_delivery_tag = delivery_tag; + buffer->updateNextDeliveryTag(delivery_tag); + + for (size_t i = 0; i < new_rows; ++i) + { + virtual_columns[0]->insert(exchange_name); + virtual_columns[1]->insert(consumer_tag); + virtual_columns[2]->insert(delivery_tag); + virtual_columns[3]->insert(redelivered); + } + + total_rows = total_rows + new_rows; + } - total_rows = total_rows + new_rows; buffer->allowNext(); if (!new_rows || !checkTimeLimit()) @@ -167,7 +170,7 @@ void RabbitMQBlockInputStream::readSuffixImpl() if (!buffer) return; - buffer->ackMessages(last_inserted_delivery_tag); + buffer->ackMessages(); } } diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h index 09cda6ff94f585c397b59af3159faf86b069588c..f4405ce44df1211a7a50d16459af41e5a802242a 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h @@ -39,7 +39,6 @@ private: const Block virtual_header; ConsumerBufferPtr buffer; - UInt64 last_inserted_delivery_tag; }; } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index b1e63005126b6a45802e9ca8d25eed7e82d7d877..27bb7c12d3d5ca744f1db28ef201f5b82d293b04 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -14,7 +14,7 @@ namespace DB { -static const auto QUEUE_SIZE = 50000; /// Equals capacity of a single rabbitmq queue +static const auto QUEUE_SIZE = 50000; ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, @@ -57,10 +57,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer() { - if (ack.load() && max_tag && consumer_channel) - consumer_channel->ack(max_tag, AMQP::multiple); consumer_channel->close(); - received.clear(); BufferBase::set(nullptr, 0, 0); } @@ -68,7 +65,7 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer() void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) { - bool bindings_created = false, bindings_error = false; + std::atomic bindings_created = false, bindings_error = false; auto success_callback = [&](const std::string & queue_name_, int msgcount, int /* consumercount */) { @@ -220,13 +217,6 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) message_received += row_delimiter; received.push({deliveryTag, message_received, redelivered}); - - std::lock_guard lock(wait_ack); - if (ack.exchange(false) && prev_tag && prev_tag <= max_tag && consumer_channel) - { - consumer_channel->ack(prev_tag, AMQP::multiple); /// Will ack all up to last tag staring from last acked. - LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag); - } } }) .onError([&](const char * message) @@ -239,7 +229,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) void ReadBufferFromRabbitMQConsumer::checkSubscription() { - if (count_subscribed == num_queues) + if (count_subscribed == num_queues || !consumer_channel->usable()) return; wait_subscribed = num_queues; @@ -264,13 +254,14 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription() } -void ReadBufferFromRabbitMQConsumer::ackMessages(UInt64 last_inserted_delivery_tag) +void ReadBufferFromRabbitMQConsumer::ackMessages() { - if (last_inserted_delivery_tag > prev_tag) + UInt64 delivery_tag = last_inserted_delivery_tag; + if (delivery_tag && delivery_tag > prev_tag) { - std::lock_guard lock(wait_ack); /// See onReceived() callback. - prev_tag = last_inserted_delivery_tag; - ack.store(true); + prev_tag = delivery_tag; + consumer_channel->ack(prev_tag, AMQP::multiple); /// Will ack all up to last tag staring from last acked. + LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag); } } @@ -291,7 +282,6 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl() auto * new_position = const_cast(current.message.data()); BufferBase::set(new_position, current.message.size(), 0); allowed = false; - max_tag = current.delivery_tag; return true; } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 6d2deb0be03cdd859e7ae9438983eb09049c14bd..4854858c9b9fe2ba3b92eb2782cba28d710b871b 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -50,7 +50,8 @@ public: void allowNext() { allowed = true; } // Allow to read next message. void checkSubscription(); - void ackMessages(UInt64 last_inserted_delivery_tag); + void updateNextDeliveryTag(UInt64 delivery_tag) { last_inserted_delivery_tag = delivery_tag; } + void ackMessages(); auto getConsumerTag() const { return consumer_tag; } auto getDeliveryTag() const { return current.delivery_tag; } @@ -80,18 +81,16 @@ private: String consumer_tag; ConcurrentBoundedQueue received; - UInt64 prev_tag = 0, max_tag = 0; + UInt64 last_inserted_delivery_tag = 0, prev_tag = 0; MessageData current; std::vector queues; std::unordered_map subscribed_queue; - std::atomic ack = false; - std::mutex wait_ack; bool nextImpl() override; void initQueueBindings(const size_t queue_id); void subscribe(const String & queue_name); void iterateEventLoop(); - }; + } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index daa17719654d408176e1455523928dd67b46f77b..6a842a69550b55e2f2851d36eabccbe676d2d5a2 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -284,9 +284,6 @@ void StorageRabbitMQ::unbindExchange() { std::call_once(flag, [&]() { - if (exchange_removed.load()) - return; - setup_channel->removeExchange(bridge_exchange) .onSuccess([&]() { diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 990f70e0d64456ea62e14364149cd434a5e500e4..1b48232aa52f7d20dcbb36f3a05995baee0f9543 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -150,6 +150,7 @@ void WriteBufferToRabbitMQProducer::countRow() void WriteBufferToRabbitMQProducer::writingFunc() { String payload; + current = 0; auto returned_callback = [&](const AMQP::Message & message, int16_t /* code */, const std::string & /* description */) { diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index abf0a20d18ff4e0f1d415793c6a7caca27252c42..bc4585fb6f21301c87c7d9d25e2b4d913c333b4e 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -120,7 +120,6 @@ def test_rabbitmq_select_from_new_syntax_table(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_routing_key_list = 'new', rabbitmq_exchange_name = 'new', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; @@ -136,13 +135,13 @@ def test_rabbitmq_select_from_new_syntax_table(rabbitmq_cluster): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='new', routing_key='new', body=message) + channel.basic_publish(exchange='new', routing_key='', body=message) messages = [] for i in range(25, 50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='new', routing_key='new', body=message) + channel.basic_publish(exchange='new', routing_key='', body=message) connection.close() @@ -191,7 +190,6 @@ def test_rabbitmq_select_empty(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_routing_key_list = 'empty', rabbitmq_format = 'TSV', rabbitmq_row_delimiter = '\\n'; ''') @@ -205,8 +203,7 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_routing_key_list = 'json', - rabbitmq_exchange_name = 'delim1', + rabbitmq_exchange_name = 'json', rabbitmq_format = 'JSONEachRow' ''') @@ -221,14 +218,14 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster): all_messages = [messages] for message in all_messages: - channel.basic_publish(exchange='delim1', routing_key='json', body=message) + channel.basic_publish(exchange='json', routing_key='', body=message) messages = '' for i in range(25, 50): messages += json.dumps({'key': i, 'value': i}) + '\n' all_messages = [messages] for message in all_messages: - channel.basic_publish(exchange='delim1', routing_key='json', body=message) + channel.basic_publish(exchange='json', routing_key='', body=message) result = '' while True: @@ -246,8 +243,7 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_routing_key_list = 'csv', - rabbitmq_exchange_name = 'delim2', + rabbitmq_exchange_name = 'csv', rabbitmq_format = 'CSV', rabbitmq_row_delimiter = '\\n'; ''') @@ -262,7 +258,7 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster): messages.append('{i}, {i}'.format(i=i)) for message in messages: - channel.basic_publish(exchange='delim2', routing_key='csv', body=message) + channel.basic_publish(exchange='csv', routing_key='', body=message) result = '' while True: @@ -281,8 +277,7 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_routing_key_list = 'tsv', - rabbitmq_exchange_name = 'delim3', + rabbitmq_exchange_name = 'tsv', rabbitmq_format = 'TSV', rabbitmq_row_delimiter = '\\n'; ''') @@ -297,7 +292,7 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): messages.append('{i}\t{i}'.format(i=i)) for message in messages: - channel.basic_publish(exchange='delim3', routing_key='tsv', body=message) + channel.basic_publish(exchange='tsv', routing_key='', body=message) result = '' while True: @@ -318,7 +313,6 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'mv', - rabbitmq_routing_key_list = 'mv', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE TABLE test.view (key UInt64, value UInt64) @@ -337,7 +331,7 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster): for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='mv', routing_key='mv', body=message) + channel.basic_publish(exchange='mv', routing_key='', body=message) while True: result = instance.query('SELECT * FROM test.view') @@ -362,7 +356,6 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'mvsq', - rabbitmq_routing_key_list = 'mvsq', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE TABLE test.view (key UInt64, value UInt64) @@ -381,7 +374,7 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster): for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='mvsq', routing_key='mvsq', body=message) + channel.basic_publish(exchange='mvsq', routing_key='', body=message) while True: result = instance.query('SELECT * FROM test.view') @@ -408,7 +401,6 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'mmv', - rabbitmq_routing_key_list = 'mmv', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE TABLE test.view1 (key UInt64, value UInt64) @@ -432,7 +424,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='mmv', routing_key='mmv', body=message) + channel.basic_publish(exchange='mmv', routing_key='', body=message) while True: result1 = instance.query('SELECT * FROM test.view1') @@ -470,7 +462,6 @@ def test_rabbitmq_big_message(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'big', - rabbitmq_routing_key_list = 'big', rabbitmq_format = 'JSONEachRow'; CREATE TABLE test.view (key UInt64, value String) ENGINE = MergeTree @@ -480,7 +471,7 @@ def test_rabbitmq_big_message(rabbitmq_cluster): ''') for message in messages: - channel.basic_publish(exchange='big', routing_key='big', body=message) + channel.basic_publish(exchange='big', routing_key='', body=message) while True: result = instance.query('SELECT count() FROM test.view') @@ -580,7 +571,8 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'combo', - rabbitmq_num_consumers = 4, + rabbitmq_num_consumers = 2, + rabbitmq_num_queues = 2, rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; ''') @@ -614,12 +606,11 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster): for _ in range(messages_num): messages.append(json.dumps({'key': i[0], 'value': i[0]})) i[0] += 1 - key = str(randrange(1, NUM_CONSUMERS)) current = 0 for message in messages: current += 1 mes_id = str(current) - channel.basic_publish(exchange='combo', routing_key=key, + channel.basic_publish(exchange='combo', routing_key='', properties=pika.BasicProperties(message_id=mes_id), body=message) connection.close() @@ -911,8 +902,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): def test_rabbitmq_direct_exchange(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.destination; - CREATE TABLE test.destination(key UInt64, value UInt64, - _consumed_by LowCardinality(String)) + CREATE TABLE test.destination(key UInt64, value UInt64) ENGINE = MergeTree() ORDER BY key SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3; @@ -927,14 +917,15 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster): CREATE TABLE test.direct_exchange_{0} (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_num_consumers = 5, + rabbitmq_num_consumers = 2, + rabbitmq_num_queues = 2, rabbitmq_exchange_name = 'direct_exchange_testing', rabbitmq_exchange_type = 'direct', rabbitmq_routing_key_list = 'direct_{0}', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE MATERIALIZED VIEW test.direct_exchange_{0}_mv TO test.destination AS - SELECT key, value, '{0}' as _consumed_by FROM test.direct_exchange_{0}; + SELECT key, value FROM test.direct_exchange_{0}; '''.format(consumer_id)) i = [0] @@ -985,8 +976,7 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster): def test_rabbitmq_fanout_exchange(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.destination; - CREATE TABLE test.destination(key UInt64, value UInt64, - _consumed_by LowCardinality(String)) + CREATE TABLE test.destination(key UInt64, value UInt64) ENGINE = MergeTree() ORDER BY key; ''') @@ -1000,14 +990,15 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster): CREATE TABLE test.fanout_exchange_{0} (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_num_consumers = 5, + rabbitmq_num_consumers = 2, + rabbitmq_num_queues = 2, rabbitmq_routing_key_list = 'key_{0}', rabbitmq_exchange_name = 'fanout_exchange_testing', rabbitmq_exchange_type = 'fanout', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE MATERIALIZED VIEW test.fanout_exchange_{0}_mv TO test.destination AS - SELECT key, value, '{0}' as _consumed_by FROM test.fanout_exchange_{0}; + SELECT key, value FROM test.fanout_exchange_{0}; '''.format(consumer_id)) i = [0] @@ -1055,8 +1046,7 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster): def test_rabbitmq_topic_exchange(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.destination; - CREATE TABLE test.destination(key UInt64, value UInt64, - _consumed_by LowCardinality(String)) + CREATE TABLE test.destination(key UInt64, value UInt64) ENGINE = MergeTree() ORDER BY key; ''') @@ -1070,14 +1060,15 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster): CREATE TABLE test.topic_exchange_{0} (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_num_consumers = 5, + rabbitmq_num_consumers = 2, + rabbitmq_num_queues = 2, rabbitmq_exchange_name = 'topic_exchange_testing', rabbitmq_exchange_type = 'topic', rabbitmq_routing_key_list = '*.{0}', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE MATERIALIZED VIEW test.topic_exchange_{0}_mv TO test.destination AS - SELECT key, value, '{0}' as _consumed_by FROM test.topic_exchange_{0}; + SELECT key, value FROM test.topic_exchange_{0}; '''.format(consumer_id)) for consumer_id in range(num_tables): @@ -1088,14 +1079,15 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster): CREATE TABLE test.topic_exchange_{0} (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_num_consumers = 4, + rabbitmq_num_consumers = 2, + rabbitmq_num_queues = 2, rabbitmq_exchange_name = 'topic_exchange_testing', rabbitmq_exchange_type = 'topic', rabbitmq_routing_key_list = '*.logs', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE MATERIALIZED VIEW test.topic_exchange_{0}_mv TO test.destination AS - SELECT key, value, '{0}' as _consumed_by FROM test.topic_exchange_{0}; + SELECT key, value FROM test.topic_exchange_{0}; '''.format(num_tables + consumer_id)) i = [0] @@ -1166,7 +1158,8 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster): CREATE TABLE test.{0} (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_num_consumers = 10, + rabbitmq_num_consumers = 4, + rabbitmq_num_queues = 2, rabbitmq_exchange_type = 'consistent_hash', rabbitmq_exchange_name = 'hash_exchange_testing', rabbitmq_format = 'JSONEachRow', @@ -1229,7 +1222,7 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster): thread.join() assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result) - assert int(result2) >= 30 + assert int(result2) == 4 * num_tables @@ -1237,34 +1230,15 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster): def test_rabbitmq_multiple_bindings(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.destination; - CREATE TABLE test.destination(key UInt64, value UInt64, - _consumed_by LowCardinality(String)) + CREATE TABLE test.destination(key UInt64, value UInt64) ENGINE = MergeTree() ORDER BY key; ''') instance.query(''' - DROP TABLE IF EXISTS test.bindings_1; - DROP TABLE IF EXISTS test.bindings_1_mv; - CREATE TABLE test.bindings_1 (key UInt64, value UInt64) - ENGINE = RabbitMQ - SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_num_consumers = 5, - rabbitmq_num_queues = 2, - rabbitmq_exchange_name = 'multiple_bindings_testing', - rabbitmq_exchange_type = 'direct', - rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5', - rabbitmq_format = 'JSONEachRow', - rabbitmq_row_delimiter = '\\n'; - CREATE MATERIALIZED VIEW test.bindings_1_mv TO test.destination AS - SELECT * FROM test.bindings_1; - ''') - - # in case num_consumers and num_queues are not set - multiple bindings are implemented differently, so test them too - instance.query(''' - DROP TABLE IF EXISTS test.bindings_2; - DROP TABLE IF EXISTS test.bindings_2_mv; - CREATE TABLE test.bindings_2 (key UInt64, value UInt64) + DROP TABLE IF EXISTS test.bindings; + DROP TABLE IF EXISTS test.bindings_mv; + CREATE TABLE test.bindings (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'multiple_bindings_testing', @@ -1272,8 +1246,8 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster): rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; - CREATE MATERIALIZED VIEW test.bindings_2_mv TO test.destination AS - SELECT * FROM test.bindings_2; + CREATE MATERIALIZED VIEW test.bindings_mv TO test.destination AS + SELECT * FROM test.bindings; ''') i = [0] @@ -1295,12 +1269,8 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster): keys = ['key1', 'key2', 'key3', 'key4', 'key5'] for key in keys: - current = 0 for message in messages: - current += 1 - mes_id = str(current) - channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key, - properties=pika.BasicProperties(message_id=mes_id), body=message) + channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key, body=message) connection.close() @@ -1316,32 +1286,31 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster): while True: result = instance.query('SELECT count() FROM test.destination') time.sleep(1) - if int(result) == messages_num * threads_num * 5 * 2: + if int(result) == messages_num * threads_num * 5: break for thread in threads: thread.join() instance.query(''' - DROP TABLE IF EXISTS test.bindings_1; - DROP TABLE IF EXISTS test.bindings_2; + DROP TABLE IF EXISTS test.bindings; + DROP TABLE IF EXISTS test.bindings_mv; DROP TABLE IF EXISTS test.destination; ''') - assert int(result) == messages_num * threads_num * 5 * 2, 'ClickHouse lost some messages: {}'.format(result) + assert int(result) == messages_num * threads_num * 5, 'ClickHouse lost some messages: {}'.format(result) @pytest.mark.timeout(420) def test_rabbitmq_headers_exchange(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.destination; - CREATE TABLE test.destination(key UInt64, value UInt64, - _consumed_by LowCardinality(String)) + CREATE TABLE test.destination(key UInt64, value UInt64) ENGINE = MergeTree() ORDER BY key; ''') - num_tables_to_receive = 3 + num_tables_to_receive = 2 for consumer_id in range(num_tables_to_receive): print("Setting up table {}".format(consumer_id)) instance.query(''' @@ -1350,14 +1319,14 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster): CREATE TABLE test.headers_exchange_{0} (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_num_consumers = 4, + rabbitmq_num_consumers = 2, rabbitmq_exchange_name = 'headers_exchange_testing', rabbitmq_exchange_type = 'headers', rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2020', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE MATERIALIZED VIEW test.headers_exchange_{0}_mv TO test.destination AS - SELECT key, value, '{0}' as _consumed_by FROM test.headers_exchange_{0}; + SELECT key, value FROM test.headers_exchange_{0}; '''.format(consumer_id)) num_tables_to_ignore = 2 @@ -1375,7 +1344,7 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster): rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE MATERIALIZED VIEW test.headers_exchange_{0}_mv TO test.destination AS - SELECT key, value, '{0}' as _consumed_by FROM test.headers_exchange_{0}; + SELECT key, value FROM test.headers_exchange_{0}; '''.format(consumer_id + num_tables_to_receive)) i = [0] @@ -1683,7 +1652,7 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster): connection.close() threads = [] - threads_num = 10 + threads_num = 20 for _ in range(threads_num): threads.append(threading.Thread(target=produce)) for thread in threads: