提交 f0f61116 编写于 作者: K kssenii

Move exchange init, add bridge-exchange

上级 ac448db4
......@@ -124,7 +124,7 @@ Block RabbitMQBlockInputStream::readImpl()
auto new_rows = read_rabbitmq_message();
auto exchange_name = buffer->getExchange();
auto exchange_name = storage.getExchange();
auto consumer_tag = buffer->getConsumerTag();
auto delivery_tag = buffer->getDeliveryTag();
auto redelivered = buffer->getRedelivered();
......
......@@ -33,6 +33,8 @@ Block RabbitMQBlockOutputStream::getHeader() const
void RabbitMQBlockOutputStream::writePrefix()
{
if (storage.checkBridge())
storage.unbindExchange();
buffer = storage.createWriteBuffer();
if (!buffer)
throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
......
......@@ -31,9 +31,11 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
void RabbitMQHandler::startLoop()
{
std::lock_guard lock(startup_mutex);
loop_started.store(true);
/// stop_loop variable is updated in a separate thread
while (!stop_loop.load())
uv_run(loop, UV_RUN_NOWAIT);
loop_started.store(false);
}
void RabbitMQHandler::iterateLoop()
......
......@@ -21,12 +21,13 @@ public:
void stop() { stop_loop.store(true); }
void startLoop();
void iterateLoop();
bool checkLoop() const { return loop_started.load(); }
private:
uv_loop_t * loop;
Poco::Logger * log;
std::atomic<bool> stop_loop = false;
std::atomic<bool> stop_loop = false, loop_started = false;
std::mutex startup_mutex;
};
......
......@@ -14,15 +14,11 @@
namespace DB
{
namespace ExchangeType
{
static const String HASH_SUF = "_hash";
}
static const auto QUEUE_SIZE = 50000; /// Equals capacity of a single rabbitmq queue
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
const AMQP::ExchangeType & exchange_type_,
......@@ -36,6 +32,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer_channel(std::move(consumer_channel_))
, setup_channel(setup_channel_)
, event_handler(event_handler_)
, exchange_name(exchange_name_)
, exchange_type(exchange_type_)
......@@ -43,21 +40,14 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, channel_id(channel_id_)
, hash_exchange(hash_exchange_)
, num_queues(num_queues_)
, local_exchange(local_exchange_)
, local_hash_exchange(local_exchange + ExchangeType::HASH_SUF)
, log(log_)
, row_delimiter(row_delimiter_)
, stopped(stopped_)
, local_exchange(local_exchange_)
, received(QUEUE_SIZE * num_queues)
{
/* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
* By default there is one queue per consumer.
*/
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
{
/// Queue bingings must be declared before any publishing => it must be done here and not in readPrefix()
initQueueBindings(queue_id);
}
}
......@@ -70,125 +60,34 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
}
void ReadBufferFromRabbitMQConsumer::initExchange()
{
/* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which
* will evenly distribute messages between all consumers.
*/
consumer_channel->declareExchange(exchange_name, exchange_type).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare client's {} exchange. Reason: {}", exchange_type, message);
});
/// No need for declaring hash-exchange if there is only one consumer with one queue or exchange type is already hash
if (!hash_exchange || exchange_type == AMQP::ExchangeType::consistent_hash)
return;
{
/* By default hash exchange distributes messages based on a hash value of a routing key, which must be a string integer. But
* in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys
* of other types: headers, patterns and string-keys. This means that hash property must be changed.
*/
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
/// Declare exchange for sharding.
consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
});
}
/// Then bind client's exchange to sharding exchange (by keys, specified by the client):
if (exchange_type == AMQP::ExchangeType::headers)
{
AMQP::Table binding_arguments;
std::vector<String> matching;
for (const auto & header : routing_keys)
{
boost::split(matching, header, [](char c){ return c == '='; });
binding_arguments[matching[0]] = matching[1];
matching.clear();
}
/// Routing key can be arbitrary here.
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0], binding_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
});
}
else if (exchange_type == AMQP::ExchangeType::fanout)
{
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0]).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
});
}
else
{
for (const auto & routing_key : routing_keys)
{
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_key).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
});
}
}
}
void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
{
/// These variables might be updated later from a separate thread in onError callbacks.
if (!local_exchange_declared || (hash_exchange && !local_hash_exchange_declared))
{
initExchange();
local_exchange_declared = true;
local_hash_exchange_declared = true;
}
bool bindings_created = false, bindings_error = false;
consumer_channel->declareQueue(AMQP::exclusive)
setup_channel->declareQueue(AMQP::exclusive)
.onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */)
{
queues.emplace_back(queue_name_);
LOG_DEBUG(log, "Queue " + queue_name_ + " is declared");
subscribed_queue[queue_name_] = false;
/* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because
* if moved there, it must(!) be wrapped inside a channel->onSuccess callback or any other, otherwise
* consumer might fail to subscribe and no resubscription will help.
*/
subscribe(queues.back());
if (hash_exchange)
{
String binding_key;
if (queues.size() == 1)
{
binding_key = std::to_string(channel_id);
}
else
{
binding_key = std::to_string(channel_id + queue_id);
}
/* If exchange_type == hash, then bind directly to this client's exchange (because there is no need for a distributor
* exchange as it is already hash-exchange), otherwise hash-exchange is a local distributor exchange.
*/
String current_hash_exchange = exchange_type == AMQP::ExchangeType::consistent_hash ? exchange_name : local_hash_exchange;
String current_hash_exchange = exchange_type == AMQP::ExchangeType::consistent_hash ? exchange_name : local_exchange;
/// If hash-exchange is used for messages distribution, then the binding key is ignored - can be arbitrary.
consumer_channel->bindQueue(current_hash_exchange, queue_name_, binding_key)
setup_channel->bindQueue(current_hash_exchange, queue_name_, binding_key)
.onSuccess([&]
{
bindings_created = true;
......@@ -201,7 +100,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
}
else if (exchange_type == AMQP::ExchangeType::fanout)
{
consumer_channel->bindQueue(exchange_name, queue_name_, routing_keys[0])
setup_channel->bindQueue(exchange_name, queue_name_, routing_keys[0])
.onSuccess([&]
{
bindings_created = true;
......@@ -225,7 +124,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
matching.clear();
}
consumer_channel->bindQueue(exchange_name, queue_name_, routing_keys[0], binding_arguments)
setup_channel->bindQueue(exchange_name, queue_name_, routing_keys[0], binding_arguments)
.onSuccess([&]
{
bindings_created = true;
......@@ -242,7 +141,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
for (const auto & routing_key : routing_keys)
{
/// Binding directly to exchange, specified by the client.
consumer_channel->bindQueue(exchange_name, queue_name_, routing_key)
setup_channel->bindQueue(exchange_name, queue_name_, routing_key)
.onSuccess([&]
{
bindings_created = true;
......@@ -261,10 +160,6 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message);
});
/* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created.
* It is important at this moment to make sure that queue bindings are created before any publishing can happen because
* otherwise messages will be routed nowhere.
*/
while (!bindings_created && !bindings_error)
{
iterateEventLoop();
......
......@@ -24,6 +24,7 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
public:
ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
const AMQP::ExchangeType & exchange_type_,
......@@ -48,13 +49,13 @@ public:
void allowNext() { allowed = true; } // Allow to read next message.
void checkSubscription();
auto getExchange() const { return exchange_name; }
auto getConsumerTag() const { return consumer_tag; }
auto getDeliveryTag() const { return current.delivery_tag; }
auto getRedelivered() const { return current.redelivered; }
private:
ChannelPtr consumer_channel;
ChannelPtr setup_channel;
HandlerPtr event_handler;
const String exchange_name;
......@@ -64,18 +65,12 @@ private:
const bool hash_exchange;
const size_t num_queues;
const String local_exchange;
const String local_default_exchange;
const String local_hash_exchange;
Poco::Logger * log;
char row_delimiter;
bool allowed = true;
const std::atomic<bool> & stopped;
String default_local_exchange;
bool local_exchange_declared = false, local_hash_exchange_declared = false;
const String local_exchange;
std::atomic<bool> consumer_error = false;
std::atomic<size_t> count_subscribed = 0, wait_subscribed;
......@@ -87,7 +82,7 @@ private:
bool nextImpl() override;
void initExchange();
void connectAlternateExchange();
void initQueueBindings(const size_t queue_id);
void subscribe(const String & queue_name);
void iterateEventLoop();
......
......@@ -118,8 +118,7 @@ StorageRabbitMQ::StorageRabbitMQ(
hash_exchange = num_consumers > 1 || num_queues > 1;
exchange_type_set = exchange_type_ != ExchangeType::DEFAULT;
if (exchange_type_set)
if (exchange_type_ != ExchangeType::DEFAULT)
{
if (exchange_type_ == ExchangeType::FANOUT) exchange_type = AMQP::ExchangeType::fanout;
else if (exchange_type_ == ExchangeType::DIRECT) exchange_type = AMQP::ExchangeType::direct;
......@@ -133,11 +132,23 @@ StorageRabbitMQ::StorageRabbitMQ(
exchange_type = AMQP::ExchangeType::fanout;
}
if (exchange_type == AMQP::ExchangeType::headers)
{
std::vector<String> matching;
for (const auto & header : routing_keys)
{
boost::split(matching, header, [](char c){ return c == '='; });
bind_headers[matching[0]] = matching[1];
matching.clear();
}
}
auto table_id = getStorageID();
String table_name = table_id.table_name;
/// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
local_exchange_name = exchange_name + "_" + table_name;
local_exchange = exchange_name + "_" + table_name;
bridge_exchange = local_exchange + "_bridge";
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
......@@ -163,6 +174,133 @@ void StorageRabbitMQ::loopingFunc()
}
void StorageRabbitMQ::initExchange()
{
/* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which
* will evenly distribute messages between all consumers.
*/
setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable)
.onError([&](const char * message)
{
throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. Error: "
+ std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
/// Bridge exchange is needed to easily disconnect consumer queues.
setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete)
.onError([&](const char * message)
{
throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
if (!hash_exchange)
{
consumer_exchange = bridge_exchange;
return;
}
/// Declare exchange for sharding.
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
setup_channel->declareExchange(local_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments)
.onError([&](const char * message)
{
throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
setup_channel->bindExchange(bridge_exchange, local_exchange, routing_keys[0])
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
consumer_exchange = local_exchange;
}
void StorageRabbitMQ::bindExchange()
{
std::atomic<bool> binding_created = false;
/// Bridge exchange connects client's exchange with consumers' queues.
if (exchange_type == AMQP::ExchangeType::headers)
{
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
.onSuccess([&]()
{
binding_created = true;
})
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
}
else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
{
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0])
.onSuccess([&]()
{
binding_created = true;
})
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
}
else
{
for (const auto & routing_key : routing_keys)
{
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
.onSuccess([&]()
{
binding_created = true;
})
.onError([&](const char * message)
{
throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
}
}
while (!binding_created)
{
event_handler->iterateLoop();
}
}
void StorageRabbitMQ::unbindExchange()
{
if (bridge.try_lock())
{
if (exchange_removed.load())
return;
setup_channel->removeExchange(bridge_exchange)
.onSuccess([&]()
{
exchange_removed.store(true);
})
.onError([&](const char * message)
{
throw Exception("Unable to remove exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ);
});
while (!exchange_removed)
{
event_handler->iterateLoop();
}
event_handler->stop();
looping_task->deactivate();
bridge.unlock();
}
}
Pipes StorageRabbitMQ::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
......@@ -207,6 +345,10 @@ BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadat
void StorageRabbitMQ::startup()
{
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
initExchange();
bindExchange();
for (size_t i = 0; i < num_consumers; ++i)
{
try
......@@ -288,9 +430,9 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
consumer_channel, event_handler, exchange_name, exchange_type, routing_keys,
consumer_channel, setup_channel, event_handler, consumer_exchange, exchange_type, routing_keys,
next_channel_id, log, row_delimiter, hash_exchange, num_queues,
local_exchange_name, stream_cancelled);
local_exchange, stream_cancelled);
}
......
......@@ -54,6 +54,9 @@ public:
const String & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override;
const String getExchange() const { return exchange_name; }
bool checkBridge() const { return !exchange_removed.load(); }
void unbindExchange();
protected:
StorageRabbitMQ(
......@@ -77,7 +80,6 @@ private:
Names routing_keys;
const String exchange_name;
AMQP::ExchangeType exchange_type;
String local_exchange_name;
const String format_name;
char row_delimiter;
......@@ -99,10 +101,13 @@ private:
std::mutex mutex;
std::vector<ConsumerBufferPtr> buffers; /// available buffers for RabbitMQ consumers
bool exchange_type_set = false;
String local_exchange, bridge_exchange, consumer_exchange;
std::mutex bridge;
AMQP::Table bind_headers;
size_t next_channel_id = 1; /// Must >= 1 because it is used as a binding key, which has to be > 0
bool update_channel_id = false;
std::atomic<bool> loop_started = false;
std::atomic<bool> loop_started = false, exchange_removed = false;
ChannelPtr setup_channel;
BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder heartbeat_task;
......@@ -115,6 +120,8 @@ private:
void threadFunc();
void heartbeatFunc();
void loopingFunc();
void initExchange();
void bindExchange();
void pingConnection() { connection->heartbeat(); }
bool streamToViews();
......
......@@ -171,15 +171,14 @@ void WriteBufferToRabbitMQProducer::initExchange()
{
std::atomic<bool> exchange_declared = false, exchange_error = false;
producer_channel->declareExchange(exchange_name, exchange_type)
producer_channel->declareExchange(exchange_name, exchange_type, AMQP::durable + AMQP::passive)
.onSuccess([&]()
{
exchange_declared = true;
})
.onError([&](const char * message)
.onError([&](const char * /* message */)
{
exchange_error = true;
LOG_ERROR(log, "Exchange error: {}", message);
});
/// These variables are updated in a separate thread.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册