From 24b032b3786f350a77f32871e6f36c6a81ca13ce Mon Sep 17 00:00:00 2001
From: kssenii
Date: Thu, 6 Aug 2020 13:33:46 +0000
Subject: [PATCH] Allow multiple consumers for same queues

---
 .../ReadBufferFromRabbitMQConsumer.cpp        | 24 ++---
 src/Storages/RabbitMQ/StorageRabbitMQ.cpp     | 31 ++++--
 src/Storages/RabbitMQ/StorageRabbitMQ.h       |  4 +-
 .../integration/test_storage_rabbitmq/test.py | 96 +++++++++++++++++--
 4 files changed, 126 insertions(+), 29 deletions(-)

diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp
index cb3ef43d4d..47c15df3bd 100644
--- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp
+++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp
@@ -79,7 +79,10 @@ void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
         if (msgcount)
             LOG_TRACE(log, "Queue {} is non-empty. Non-consumed messages will also be delivered", queue_name);
 
-        /// Binding key must be a string integer in case of hash exchange (here it is either hash or fanout).
+        /* Here we bind either to the sharding exchange (consistent-hash) or to the bridge exchange (fanout). All routing-key
+         * bindings are made between the client's exchange and the local bridge exchange. The binding key must be a string
+         * integer in case of a hash exchange; for a fanout exchange it can be arbitrary.
+         */
         setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id))
         .onSuccess([&]
         {
@@ -104,15 +107,11 @@
         queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
     }
 
-    if (!queue_base.empty())
-    {
-        const String queue_name = !hash_exchange ? queue_base : queue_base + "_" + std::to_string(channel_id) + "_" + std::to_string(queue_id);
-        setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
-    }
-    else
-    {
-        setup_channel->declareQueue(AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
-    }
+    /* Deriving the queue name from queue_base (the first branch of the ternary below) not only simplifies the naming, but also
+     * makes it possible to resume reading from one specific queue when its name is given in the queue_base setting.
+     */
+    const String queue_name = !hash_exchange ? queue_base : queue_base + "_" + std::to_string(channel_id) + "_" + std::to_string(queue_id);
+    setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
 
     while (!bindings_created && !bindings_error)
     {
@@ -128,8 +127,9 @@
     consumer_channel->consume(queue_name)
     .onSuccess([&](const std::string & consumer)
     {
-        consumer_tag = consumer;
-        LOG_TRACE(log, "Consumer {} (consumer tag: {}) is subscribed to queue {}", channel_id, consumer, queue_name);
+        if (consumer_tag.empty())
+            consumer_tag = consumer;
+        LOG_TRACE(log, "Consumer {} is subscribed to queue {}, consumer tag {}", channel_id, queue_name, consumer);
     })
     .onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
     {
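For context, the declare-and-bind sequence in bindQueue() above maps to plain AMQP operations. A minimal pika sketch under the same naming scheme; the exchange name 'example_sharding', the connection settings, and all concrete values are illustrative assumptions, not taken from the patch:

    import pika

    # Connection settings mirror the integration tests further below.
    credentials = pika.PlainCredentials('root', 'clickhouse')
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
    channel = connection.channel()

    queue_base, channel_id, queue_id, hash_exchange = 'queue_example', 1, 0, True

    # x-consistent-hash requires the rabbitmq_consistent_hash_exchange plugin.
    channel.exchange_declare(exchange='example_sharding', exchange_type='x-consistent-hash',
                             durable=True, arguments={'hash-property': 'message_id'})

    # Queue name: queue_base as-is, or queue_base_<channel_id>_<queue_id> when sharding.
    queue_name = queue_base if not hash_exchange else '{}_{}_{}'.format(queue_base, channel_id, queue_id)
    channel.queue_declare(queue=queue_name, durable=True)

    # For a consistent-hash exchange the binding key must be a string integer (it acts as a weight);
    # for a fanout exchange it would be ignored.
    channel.queue_bind(exchange='example_sharding', queue=queue_name, routing_key=str(channel_id))
    connection.close()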
diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
index 1e6e22c7c6..80f66c6be0 100644
--- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
+++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
@@ -143,9 +143,28 @@ StorageRabbitMQ::StorageRabbitMQ(
     auto table_id = getStorageID();
     String table_name = table_id.table_name;
 
-    /// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
-    local_exchange = exchange_name + "_" + table_name;
-    bridge_exchange = local_exchange + "_bridge";
+    if (queue_base.empty())
+    {
+        /// Make sure that the local exchange name is unique for each table and differs from the client's exchange name
+        sharding_exchange = exchange_name + "_" + table_name;
+
+        /* By default, if no name is given in the queue declaration, the library generates one. It is better to make the name
+         * unique per table so that the queues can be reused when the table is recreated. Queues therefore stay the same for a
+         * given table unless the queue_base setting is specified (which allows registering consumers to specific queues).
+         * This value becomes the base for the names of the queues declared later (as everything is based on names).
+         */
+        queue_base = "queue_" + table_name;
+    }
+    else
+    {
+        /* If different tables are used to register multiple consumers to the same queues (so the queues are shared between
+         * tables) and a sharding exchange is needed at the same time (when there are multiple shared queues), then those
+         * tables also have to share the sharding exchange.
+         */
+        sharding_exchange = exchange_name + queue_base;
+    }
+
+    bridge_exchange = sharding_exchange + "_bridge";
 
     /// One looping task for all consumers as they share the same connection == the same handler == the same event loop
     looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
@@ -203,19 +222,19 @@
     AMQP::Table binding_arguments;
     binding_arguments["hash-property"] = "message_id";
 
-    setup_channel->declareExchange(local_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
+    setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
     .onError([&](const char * message)
     {
         throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
     });
 
-    setup_channel->bindExchange(bridge_exchange, local_exchange, routing_keys[0])
+    setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
     .onError([&](const char * message)
     {
         throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
     });
 
-    consumer_exchange = local_exchange;
+    consumer_exchange = sharding_exchange;
 }
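The naming rules above can be condensed into a small Python sketch (a paraphrase of the constructor logic for illustration; the function name and sample values are mine, not from the patch):

    def rabbitmq_exchange_names(exchange_name, table_name, queue_base=''):
        """Paraphrase of the naming logic in the StorageRabbitMQ constructor above."""
        if not queue_base:
            # No queue_base: names are unique per table, so queues survive table re-creation.
            sharding_exchange = exchange_name + '_' + table_name
            queue_base = 'queue_' + table_name
        else:
            # Shared queue_base: tables that share queues also share the sharding exchange.
            sharding_exchange = exchange_name + queue_base
        bridge_exchange = sharding_exchange + '_bridge'
        return sharding_exchange, bridge_exchange, queue_base

    # Two tables with the same queue_base resolve to the same exchanges and queue names:
    assert rabbitmq_exchange_names('ex', 't1', 'shared') == rabbitmq_exchange_names('ex', 't2', 'shared')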
Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); }); - consumer_exchange = local_exchange; + consumer_exchange = sharding_exchange; } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 31e045ddb8..5aa030d821 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -95,7 +95,7 @@ private: bool hash_exchange; size_t num_queues; const bool use_transactional_channel; - const String queue_base; + String queue_base; const String deadletter_exchange; const bool persistent; @@ -111,7 +111,7 @@ private: std::mutex mutex; std::vector buffers; /// available buffers for RabbitMQ consumers - String local_exchange, bridge_exchange, consumer_exchange; + String sharding_exchange, bridge_exchange, consumer_exchange; std::once_flag flag; size_t next_channel_id = 1; /// Must >= 1 because it is used as a binding key, which has to be > 0 bool update_channel_id = false; diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index e45afa4742..be45298b52 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -512,8 +512,6 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster): SELECT *, _consumer_tag AS consumer_tag FROM test.rabbitmq; ''') - time.sleep(1) - i = [0] messages_num = 10000 @@ -1546,7 +1544,7 @@ def test_rabbitmq_queue_resume_1(rabbitmq_cluster): ''') i = [0] - messages_num = 5000 + messages_num = 1000 credentials = pika.PlainCredentials('root', 'clickhouse') parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) @@ -1635,7 +1633,7 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster): ''') i = [0] - messages_num = 5000 + messages_num = 10000 credentials = pika.PlainCredentials('root', 'clickhouse') parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) @@ -1689,8 +1687,6 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster): if int(result1) > collected: break - result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view") - instance.query(''' DROP TABLE IF EXISTS test.rabbitmq_queue_resume; DROP TABLE IF EXISTS test.consumer; @@ -1698,7 +1694,6 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster): ''') assert int(result1) > collected, 'ClickHouse lost some messages: {}'.format(result) - assert int(result2) == 2 @pytest.mark.timeout(420) @@ -1778,8 +1773,6 @@ def test_rabbitmq_consumer_acknowledgements(rabbitmq_cluster): if int(result1) >= messages_num * threads_num: break - #result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view") - instance.query(''' DROP TABLE IF EXISTS test.rabbitmq_consumer_acks; DROP TABLE IF EXISTS test.consumer; @@ -1790,6 +1783,91 @@ def test_rabbitmq_consumer_acknowledgements(rabbitmq_cluster): assert int(result1) >= messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result) +@pytest.mark.timeout(420) +def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.destination; + CREATE TABLE test.destination(key UInt64, value UInt64, consumer_tag String) + ENGINE = MergeTree() + ORDER BY key; + ''') + + num_tables = 4 + for table_id in range(num_tables): + print("Setting up table {}".format(table_id)) + instance.query(''' + DROP TABLE IF EXISTS test.many_consumers_{0}; + DROP TABLE IF EXISTS test.many_consumers_{0}_mv; + CREATE TABLE test.many_consumers_{0} (key UInt64, 
diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py
index e45afa4742..be45298b52 100644
--- a/tests/integration/test_storage_rabbitmq/test.py
+++ b/tests/integration/test_storage_rabbitmq/test.py
@@ -512,8 +512,6 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
             SELECT *, _consumer_tag AS consumer_tag FROM test.rabbitmq;
     ''')
 
-    time.sleep(1)
-
     i = [0]
     messages_num = 10000
 
@@ -1546,7 +1544,7 @@ def test_rabbitmq_queue_resume_1(rabbitmq_cluster):
     ''')
 
     i = [0]
-    messages_num = 5000
+    messages_num = 1000
 
     credentials = pika.PlainCredentials('root', 'clickhouse')
     parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
@@ -1635,7 +1633,7 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
     ''')
 
     i = [0]
-    messages_num = 5000
+    messages_num = 10000
 
     credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
@@ -1689,8 +1687,6 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
         if int(result1) > collected:
             break
 
-    result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view")
-
     instance.query('''
         DROP TABLE IF EXISTS test.rabbitmq_queue_resume;
         DROP TABLE IF EXISTS test.consumer;
@@ -1698,7 +1694,6 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
         DROP TABLE IF EXISTS test.view;
     ''')
 
     assert int(result1) > collected, 'ClickHouse lost some messages: {}'.format(result1)
-    assert int(result2) == 2
 
 @pytest.mark.timeout(420)
@@ -1778,8 +1773,6 @@ def test_rabbitmq_consumer_acknowledgements(rabbitmq_cluster):
         if int(result1) >= messages_num * threads_num:
             break
 
-    #result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view")
-
     instance.query('''
         DROP TABLE IF EXISTS test.rabbitmq_consumer_acks;
         DROP TABLE IF EXISTS test.consumer;
         DROP TABLE IF EXISTS test.view;
     ''')
@@ -1790,6 +1783,91 @@ def test_rabbitmq_consumer_acknowledgements(rabbitmq_cluster):
     assert int(result1) >= messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
 
 
+@pytest.mark.timeout(420)
+def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
+    instance.query('''
+        DROP TABLE IF EXISTS test.destination;
+        CREATE TABLE test.destination(key UInt64, value UInt64, consumer_tag String)
+        ENGINE = MergeTree()
+        ORDER BY key;
+    ''')
+
+    num_tables = 4
+    for table_id in range(num_tables):
+        print("Setting up table {}".format(table_id))
+        instance.query('''
+            DROP TABLE IF EXISTS test.many_consumers_{0};
+            DROP TABLE IF EXISTS test.many_consumers_{0}_mv;
+            CREATE TABLE test.many_consumers_{0} (key UInt64, value UInt64)
+                ENGINE = RabbitMQ
+                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
+                         rabbitmq_exchange_name = 'many_consumers',
+                         rabbitmq_num_queues = 2,
+                         rabbitmq_num_consumers = 2,
+                         rabbitmq_queue_base = 'many_consumers',
+                         rabbitmq_format = 'JSONEachRow',
+                         rabbitmq_row_delimiter = '\\n';
+            CREATE MATERIALIZED VIEW test.many_consumers_{0}_mv TO test.destination AS
+            SELECT key, value, _consumer_tag AS consumer_tag FROM test.many_consumers_{0};
+        '''.format(table_id))
+
+    i = [0]
+    messages_num = 1000
+
+    credentials = pika.PlainCredentials('root', 'clickhouse')
+    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
+
+    def produce():
+        connection = pika.BlockingConnection(parameters)
+        channel = connection.channel()
+
+        messages = []
+        for _ in range(messages_num):
+            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
+            i[0] += 1
+        current = 0
+        for message in messages:
+            current += 1
+            mes_id = str(current)
+            channel.basic_publish(exchange='many_consumers', routing_key='',
+                                  properties=pika.BasicProperties(message_id=mes_id), body=message)
+        connection.close()
+
+    threads = []
+    threads_num = 20
+
+    for _ in range(threads_num):
+        threads.append(threading.Thread(target=produce))
+    for thread in threads:
+        time.sleep(random.uniform(0, 1))
+        thread.start()
+
+    result1 = ''
+    while True:
+        result1 = instance.query('SELECT count() FROM test.destination')
+        time.sleep(1)
+        if int(result1) == messages_num * threads_num:
+            break
+
+    result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.destination")
+
+    for thread in threads:
+        thread.join()
+
+    for table_id in range(num_tables):
+        instance.query('''
+            DROP TABLE IF EXISTS test.many_consumers_{0};
+            DROP TABLE IF EXISTS test.many_consumers_{0}_mv;
+        '''.format(table_id))
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.destination;
+    ''')
+
+    assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
+    # 4 tables, 2 consumers for each table => 8 consumer tags
+    assert int(result2) == 8
+
+
 if __name__ == '__main__':
     cluster.start()
     raw_input("Cluster created, press any key to destroy...")
--
GitLab
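Usage note (not part of the patch): with rabbitmq_queue_base it becomes possible to attach several tables to the same set of queues, so messages are split between tables rather than duplicated. A sketch in the style of the integration tests above, assuming the tests' `instance` fixture; the table, exchange, and queue names are illustrative:

    # Both tables register consumers on the same queues (base name 'shared'),
    # so each message from 'shared_exchange' is consumed by exactly one of them.
    for table in ('reader_a', 'reader_b'):
        instance.query('''
            CREATE TABLE test.{0} (key UInt64, value UInt64)
                ENGINE = RabbitMQ
                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                         rabbitmq_exchange_name = 'shared_exchange',
                         rabbitmq_num_consumers = 2,
                         rabbitmq_queue_base = 'shared',
                         rabbitmq_format = 'JSONEachRow';
        '''.format(table))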