提交 626eb53b 编写于 作者: K kssenii

Fix multiple bindings for single queue & rm hardcoded strings

上级 9350472e
......@@ -14,6 +14,17 @@ namespace DB
{
namespace Exchange
{
/// Note that default here means default by implementation and not by rabbitmq settings
static const String DEFAULT = "default";
static const String FANOUT = "fanout";
static const String DIRECT = "direct";
static const String TOPIC = "topic";
static const String HASH = "consistent_hash";
}
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
RabbitMQHandler & eventHandler_,
......@@ -44,7 +55,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
messages.clear();
current = messages.begin();
exchange_type_set = exchange_type != "default" ? true : false;
exchange_type_set = exchange_type != Exchange::DEFAULT ? true : false;
/* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
* By default there is one queue per consumer.
......@@ -72,14 +83,14 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
/* If exchange_type is not set - then direct-exchange is used - this type of exchange is the fastest (also due to different
* binding algorithm this default behaviuor is much faster). It is also used in INSERT query.
*/
String producer_exchange = exchange_type_set ? exchange_name + "_default" : exchange_name;
String producer_exchange = exchange_type_set ? exchange_name + "_" + Exchange::DEFAULT : exchange_name;
consumer_channel->declareExchange(producer_exchange, AMQP::fanout).onError([&](const char * message)
{
internal_exchange_declared = false;
LOG_ERROR(log, "Failed to declare exchange: {}", message);
});
internal_exchange_name = producer_exchange + "_direct";
internal_exchange_name = producer_exchange + "_" + Exchange::DIRECT;
consumer_channel->declareExchange(internal_exchange_name, AMQP::direct).onError([&](const char * message)
{
internal_exchange_declared = false;
......@@ -99,11 +110,11 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
/// For special purposes to use the flexibility of routing provided by rabbitmq - choosing exchange types is supported.
AMQP::ExchangeType type;
if (exchange_type == "fanout") type = AMQP::ExchangeType::fanout;
else if (exchange_type == "direct") type = AMQP::ExchangeType::direct;
else if (exchange_type == "topic") type = AMQP::ExchangeType::topic;
else if (exchange_type == "consistent_hash") type = AMQP::ExchangeType::consistent_hash;
else return;
if (exchange_type == Exchange::FANOUT) type = AMQP::ExchangeType::fanout;
else if (exchange_type == Exchange::DIRECT) type = AMQP::ExchangeType::direct;
else if (exchange_type == Exchange::TOPIC) type = AMQP::ExchangeType::topic;
else if (exchange_type == Exchange::HASH) type = AMQP::ExchangeType::consistent_hash;
else return;
/* Declare exchange of the specified type and bind it to hash-exchange, which will evenly distribute messages
* between all consumers. (This enables better scaling as without hash-echange - the only oprion to avoid getting the same
......@@ -115,12 +126,12 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
});
hash_exchange = true;
/// No need for declaring hash-exchange if there is only one consumer with one queue and exchange type is not hash
if (!bind_by_id && exchange_type != "consistent_hash")
if (!bind_by_id && exchange_type != Exchange::HASH)
return;
hash_exchange = true;
AMQP::Table exchange_arguments;
exchange_arguments["hash-property"] = "message_id";
......@@ -153,6 +164,10 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
internal_exchange_declared = true;
}
/* Internal exchange is a default exchange (by implementstion, not by rabbitmq settings) and is used for INSERT query
* and if exchange_type is not set - there is no local exchange. If it is set - then local exchange is a distributor
* exchange, which is bound to the exchange specified by the client.
*/
bool internal_bindings_created = false, internal_bindings_error = false;
bool local_bindings_created = false, local_bindings_error = false;
......@@ -188,7 +203,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
.onError([&](const char * message)
{
internal_bindings_error = true;
LOG_ERROR(log, "Failed to bind to key {}, the reason is: {}", binding_key, message);
LOG_ERROR(log, "Failed to bind to key {}. Reason: {}", binding_key, message);
});
/// Must be done here and not in readPrefix() because library might fail to handle async subscription on the same connection
......@@ -209,15 +224,16 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
.onError([&](const char * message)
{
local_bindings_error = true;
LOG_ERROR(log, "Failed to create queue binding: {}", message);
LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", binding_key, message);
});
}
else
{
/// means there is only one queue with one consumer - no even distribution needed - no hash-exchange
/// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange
for (auto & routing_key : routing_keys)
{
consumer_channel->bindQueue(local_exchange_name, queue_name_, routing_key)
/// Binding directly to exchange, specified by the client
consumer_channel->bindQueue(exchange_name, queue_name_, routing_key)
.onSuccess([&]
{
local_bindings_created = true;
......@@ -225,7 +241,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
.onError([&](const char * message)
{
local_bindings_error = true;
LOG_ERROR(log, "Failed to create queue binding: {}", message);
LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", routing_key, message);
});
}
}
......
......@@ -213,7 +213,7 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(&connection);
auto table_id = getStorageID();
String table_name = table_id.getNameForLogs();
String table_name = table_id.getNameForLogs();
return std::make_shared<ReadBufferFromRabbitMQConsumer>(consumer_channel, eventHandler, exchange_name, routing_keys,
next_channel_id, log, row_delimiter, bind_by_id, num_queues, exchange_type, table_name, stream_cancelled);
......
......@@ -927,7 +927,6 @@ def test_rabbitmq_sharding_between_channels_insert(rabbitmq_cluster):
while True:
result = instance.query('SELECT count() FROM test.view_sharding')
time.sleep(1)
print result
if int(result) == messages_num * threads_num:
break
......@@ -1288,9 +1287,17 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster):
@pytest.mark.timeout(420)
def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.bindings;
DROP TABLE IF EXISTS test.bindings_mv;
CREATE TABLE test.bindings (key UInt64, value UInt64)
DROP TABLE IF EXISTS test.destination;
CREATE TABLE test.destination(key UInt64, value UInt64,
_consumed_by LowCardinality(String))
ENGINE = MergeTree()
ORDER BY key;
''')
instance.query('''
DROP TABLE IF EXISTS test.bindings_1;
DROP TABLE IF EXISTS test.bindings_1_mv;
CREATE TABLE test.bindings_1 (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_num_consumers = 5,
......@@ -1300,13 +1307,25 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view_bindings (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.bindings_mv TO test.view_bindings AS
SELECT * FROM test.bindings;
CREATE MATERIALIZED VIEW test.bindings_1_mv TO test.destination AS
SELECT * FROM test.bindings_1;
''')
# in case num_consumers and num_queues are not set - multiple bindings are implemented differently, so test them too
instance.query('''
DROP TABLE IF EXISTS test.bindings_2;
DROP TABLE IF EXISTS test.bindings_2_mv;
CREATE TABLE test.bindings_2 (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'multiple_bindings_testing',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.bindings_2_mv TO test.destination AS
SELECT * FROM test.bindings_2;
''')
i = [0]
messages_num = 500
......@@ -1318,7 +1337,7 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
# init connection here because otherwise python rabbitmq client might fail
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange='hash_exchange_testing', exchange_type='x-consistent-hash')
channel.exchange_declare(exchange='multiple_bindings_testing', exchange_type='direct')
messages = []
for _ in range(messages_num):
......@@ -1343,16 +1362,15 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
thread.start()
while True:
result = instance.query('SELECT count() FROM test.view_bindings')
result = instance.query('SELECT count() FROM test.destination')
time.sleep(1)
print result
if int(result) == messages_num * threads_num * 5:
if int(result) == messages_num * threads_num * 5 * 2:
break
for thread in threads:
thread.join()
assert int(result) == messages_num * threads_num * 5, 'ClickHouse lost some messages: {}'.format(result)
assert int(result) == messages_num * threads_num * 5 * 2, 'ClickHouse lost some messages: {}'.format(result)
if __name__ == '__main__':
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册