Commit 24b032b3 authored by kssenii

Allow multiple consumers for the same queues

Parent add698a8
......@@ -79,7 +79,10 @@ void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
if (msgcount)
LOG_TRACE(log, "Queue {} is non-empty. Non-consumed messages will also be delivered", queue_name);
/// Binding key must be a string integer in case of hash exchange (here it is either hash or fanout).
/* Here we bind either to the sharding exchange (consistent-hash) or to the bridge exchange (fanout). All bindings to routing keys
* are done between the client's exchange and the local bridge exchange. The binding key must be a string integer in the case of a
* hash exchange; for a fanout exchange it can be arbitrary.
*/
setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id))
.onSuccess([&]
{
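Note: the binding-key rule in the comment above can be demonstrated directly with pika against a standalone broker. This is a minimal sketch with made-up exchange and queue names, and the consistent-hash exchange type assumes the rabbitmq_consistent_hash_exchange plugin is enabled:

```python
import pika

# Assumes a local RabbitMQ broker with the consistent-hash exchange plugin enabled.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# For a consistent-hash exchange the binding key is a string integer: it is the
# relative weight of the queue, so it must parse as a positive number.
channel.exchange_declare(exchange='demo_hash', exchange_type='x-consistent-hash', durable=True)
channel.queue_declare(queue='demo_hash_q0', durable=True)
channel.queue_bind(queue='demo_hash_q0', exchange='demo_hash', routing_key='1')

# For a fanout exchange the binding key is ignored, so any string works.
channel.exchange_declare(exchange='demo_fanout', exchange_type='fanout', durable=True)
channel.queue_declare(queue='demo_fanout_q0', durable=True)
channel.queue_bind(queue='demo_fanout_q0', exchange='demo_fanout', routing_key='anything')

connection.close()
```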
......@@ -104,15 +107,11 @@ void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
}
if (!queue_base.empty())
{
const String queue_name = !hash_exchange ? queue_base : queue_base + "_" + std::to_string(channel_id) + "_" + std::to_string(queue_id);
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
}
else
{
setup_channel->declareQueue(AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
}
/* The first option not only simplifies the queue name, but also makes it possible to resume reading from one specific queue when
* its name is specified in the queue_base setting.
*/
const String queue_name = !hash_exchange ? queue_base : queue_base + "_" + std::to_string(channel_id) + "_" + std::to_string(queue_id);
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
while (!bindings_created && !bindings_error)
{
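The difference between a server-generated and an explicitly named queue, which the comment above relies on, looks like this in pika (a minimal sketch with hypothetical names, not the storage code itself):

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Empty name: the broker generates a unique one (e.g. 'amq.gen-...'), which is
# different on every declaration, so there is nothing stable to resume from.
generated = channel.queue_declare(queue='', durable=True).method.queue
print(generated)

# Explicit, deterministic name: re-declaring it after a restart is idempotent
# and attaches the consumer to the same queue, so unread messages are kept.
channel.queue_declare(queue='queue_base_1_0', durable=True)

connection.close()
```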
......@@ -128,8 +127,9 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
consumer_channel->consume(queue_name)
.onSuccess([&](const std::string & consumer)
{
consumer_tag = consumer;
LOG_TRACE(log, "Consumer {} (consumer tag: {}) is subscribed to queue {}", channel_id, consumer, queue_name);
if (consumer_tag.empty())
consumer_tag = consumer;
LOG_TRACE(log, "Consumer {} is subscribed to queue {}, consumer tag {}", channel_id, queue_name, consumer);
})
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
{
......
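Keeping only the first consumer tag matters because a queue may now be consumed by several subscriptions. A minimal pika sketch of two consumers on one queue (hypothetical names): each basic_consume call returns its own consumer tag, and the broker distributes deliveries between them round-robin.

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='shared_q', durable=True)

def on_message(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Two subscriptions to the same queue: the broker assigns a distinct consumer
# tag to each and spreads messages between them round-robin.
tag1 = channel.basic_consume(queue='shared_q', on_message_callback=on_message)
tag2 = channel.basic_consume(queue='shared_q', on_message_callback=on_message)
print(tag1, tag2)  # the two tags are always distinct

channel.start_consuming()
```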
......@@ -143,9 +143,28 @@ StorageRabbitMQ::StorageRabbitMQ(
auto table_id = getStorageID();
String table_name = table_id.table_name;
/// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
local_exchange = exchange_name + "_" + table_name;
bridge_exchange = local_exchange + "_bridge";
if (queue_base.empty())
{
/// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
sharding_exchange = exchange_name + "_" + table_name;
/* By default, if no queue name is specified in the queue declaration, the library generates one. It is better to specify a unique
* name for each table so that its queues can be reused when the table is recreated. This means that queue names stay the same for
* a given table unless the queue_base setting is specified (which also allows consumers to be registered to specific queues).
* This value serves as the base for the names of the queues declared later (everything is keyed on names).
*/
queue_base = "queue_" + table_name;
}
else
{
/* If different tables are used to register multiple consumers to the same queues (so queues are shared between tables) and a
* sharding exchange is also needed (because there is more than one shared queue), then those tables must share the sharding
* exchange as well.
*/
sharding_exchange = exchange_name + queue_base;
}
bridge_exchange = sharding_exchange + "_bridge";
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
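The reuse-on-recreation behaviour described in the comment above comes from RabbitMQ itself, not from ClickHouse: a durable, non-auto-delete queue with a fixed name outlives its clients. A minimal pika sketch of the effect (hypothetical names):

```python
import pika

params = pika.ConnectionParameters('localhost')

# First "table": declare a fixed-name durable queue and leave a message in it.
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='queue_mytable', durable=True)
channel.basic_publish(exchange='', routing_key='queue_mytable', body='pending')
connection.close()

# Recreated "table": re-declaring the same name is idempotent, and the
# unconsumed message is still there waiting for the new consumer.
connection = pika.BlockingConnection(params)
channel = connection.channel()
result = channel.queue_declare(queue='queue_mytable', durable=True)
print(result.method.message_count)  # 1
connection.close()
```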
......@@ -203,19 +222,19 @@ void StorageRabbitMQ::initExchange()
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
setup_channel->declareExchange(local_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
setup_channel->declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
.onError([&](const char * message)
{
throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
setup_channel->bindExchange(bridge_exchange, local_exchange, routing_keys[0])
setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
consumer_exchange = local_exchange;
consumer_exchange = sharding_exchange;
}
......
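The hash-property=message_id argument makes the exchange shard on the message_id property rather than the routing key, which is why the tests below set message_id when publishing. A minimal pika sketch of the same sharding setup (hypothetical names; assumes the rabbitmq_consistent_hash_exchange plugin):

```python
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Shard on the message_id property instead of the routing key.
channel.exchange_declare(exchange='shard_demo', exchange_type='x-consistent-hash',
                         durable=True, arguments={'hash-property': 'message_id'})
for q in ('shard_demo_q0', 'shard_demo_q1'):
    channel.queue_declare(queue=q, durable=True)
    channel.queue_bind(queue=q, exchange='shard_demo', routing_key='1')  # weight 1

# Distinct message_id values spread the messages across the two queues.
for n in range(100):
    channel.basic_publish(exchange='shard_demo', routing_key='',
                          properties=pika.BasicProperties(message_id=str(n)),
                          body=json.dumps({'key': n}))
connection.close()
```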
......@@ -95,7 +95,7 @@ private:
bool hash_exchange;
size_t num_queues;
const bool use_transactional_channel;
const String queue_base;
String queue_base;
const String deadletter_exchange;
const bool persistent;
......@@ -111,7 +111,7 @@ private:
std::mutex mutex;
std::vector<ConsumerBufferPtr> buffers; /// available buffers for RabbitMQ consumers
String local_exchange, bridge_exchange, consumer_exchange;
String sharding_exchange, bridge_exchange, consumer_exchange;
std::once_flag flag;
size_t next_channel_id = 1; /// Must be >= 1 because it is used as a binding key, which has to be > 0
bool update_channel_id = false;
......
......@@ -512,8 +512,6 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
SELECT *, _consumer_tag AS consumer_tag FROM test.rabbitmq;
''')
time.sleep(1)
i = [0]
messages_num = 10000
......@@ -1546,7 +1544,7 @@ def test_rabbitmq_queue_resume_1(rabbitmq_cluster):
''')
i = [0]
messages_num = 5000
messages_num = 1000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
......@@ -1635,7 +1633,7 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
''')
i = [0]
messages_num = 5000
messages_num = 10000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
......@@ -1689,8 +1687,6 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
if int(result1) > collected:
break
result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view")
instance.query('''
DROP TABLE IF EXISTS test.rabbitmq_queue_resume;
DROP TABLE IF EXISTS test.consumer;
......@@ -1698,7 +1694,6 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
''')
assert int(result1) > collected, 'ClickHouse lost some messages: {}'.format(result1)
assert int(result2) == 2
@pytest.mark.timeout(420)
......@@ -1778,8 +1773,6 @@ def test_rabbitmq_consumer_acknowledgements(rabbitmq_cluster):
if int(result1) >= messages_num * threads_num:
break
#result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view")
instance.query('''
DROP TABLE IF EXISTS test.rabbitmq_consumer_acks;
DROP TABLE IF EXISTS test.consumer;
......@@ -1790,6 +1783,91 @@ def test_rabbitmq_consumer_acknowledgements(rabbitmq_cluster):
assert int(result1) >= messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
@pytest.mark.timeout(420)
def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.destination;
CREATE TABLE test.destination(key UInt64, value UInt64, consumer_tag String)
ENGINE = MergeTree()
ORDER BY key;
''')
num_tables = 4
for table_id in range(num_tables):
print("Setting up table {}".format(table_id))
instance.query('''
DROP TABLE IF EXISTS test.many_consumers_{0};
DROP TABLE IF EXISTS test.many_consumers_{0}_mv;
CREATE TABLE test.many_consumers_{0} (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'many_consumers',
rabbitmq_num_queues = 2,
rabbitmq_num_consumers = 2,
rabbitmq_queue_base = 'many_consumers',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.many_consumers_{0}_mv TO test.destination AS
SELECT key, value, _consumer_tag as consumer_tag FROM test.many_consumers_{0};
'''.format(table_id))
i = [0]
messages_num = 1000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
def produce():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
current = 0
for message in messages:
current += 1
mes_id = str(current)
channel.basic_publish(exchange='many_consumers', routing_key='',
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
threads = []
threads_num = 20
for _ in range(threads_num):
threads.append(threading.Thread(target=produce))
for thread in threads:
time.sleep(random.uniform(0, 1))
thread.start()
result1 = ''
while True:
result1 = instance.query('SELECT count() FROM test.destination')
time.sleep(1)
if int(result1) == messages_num * threads_num:
break
result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.destination")
for thread in threads:
thread.join()
for consumer_id in range(num_tables):
instance.query('''
DROP TABLE IF EXISTS test.many_consumers_{0};
DROP TABLE IF EXISTS test.many_consumers_{0}_mv;
'''.format(consumer_id))
instance.query('''
DROP TABLE IF EXISTS test.destination;
''')
assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
# 4 tables, 2 consumers for each table => 8 consumer tags
assert int(result2) == 8
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")
......