提交 fd9b4168 编写于 作者: K kssenii

Fix and simplify code

上级 88ece429
#include <common/logger_useful.h>
#include <Common/Exception.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
namespace DB
{
static const auto Lock_timeout = 50;
namespace ErrorCodes
{
extern const int CANNOT_CONNECT_RABBITMQ;
}
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop and handler).
......@@ -23,15 +27,14 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
if (!connection->usable() || !connection->ready())
{
LOG_ERROR(log, "Connection lost completely");
throw Exception("Connection error", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
stop();
}
void RabbitMQHandler::startLoop()
void RabbitMQHandler::startBackgroundLoop()
{
/// stop_loop variable is updated in a separate thread
while (!stop_loop)
{
uv_run(loop, UV_RUN_NOWAIT);
......@@ -39,24 +42,13 @@ void RabbitMQHandler::startLoop()
}
void RabbitMQHandler::startConsumerLoop(std::atomic<bool> & loop_started)
{
std::lock_guard lock(mutex_before_event_loop);
uv_run(loop, UV_RUN_NOWAIT);
}
void RabbitMQHandler::startProducerLoop()
{
uv_run(loop, UV_RUN_NOWAIT);
}
void RabbitMQHandler::stop()
void RabbitMQHandler::startLoop()
{
//std::lock_guard lock(mutex_before_loop_stop);
//uv_stop(loop);
stop_loop = true;
if (starting_loop.try_lock())
{
uv_run(loop, UV_RUN_NOWAIT);
starting_loop.unlock();
}
}
}
......@@ -16,22 +16,18 @@ class RabbitMQHandler : public AMQP::LibUvHandler
public:
RabbitMQHandler(uv_loop_t * evbase_, Poco::Logger * log_);
void onError(AMQP::TcpConnection * connection, const char * message) override;
void startConsumerLoop(std::atomic<bool> & loop_started);
void startProducerLoop();
void stop();
void stop() { stop_loop.store(true); }
void startBackgroundLoop();
void startLoop();
private:
uv_loop_t * loop;
Poco::Logger * log;
std::atomic<bool> stop_loop = false, running_loop = false;
std::atomic<bool> stop_loop = false;
std::timed_mutex starting_loop;
std::mutex mutex_before_event_loop;
std::mutex mutex_before_loop_stop;
};
}
......@@ -30,7 +30,7 @@ namespace ExchangeType
static const String HEADERS = "headers";
}
static const auto QUEUE_SIZE = 50000; /// Equals capacity of single rabbitmq queue
static const auto QUEUE_SIZE = 50000; /// Equals capacity of a single rabbitmq queue
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
......@@ -60,7 +60,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT)
, local_hash_exchange(local_exchange + "_" + ExchangeType::HASH)
, stopped(stopped_)
, messages(QUEUE_SIZE)
, messages(QUEUE_SIZE * num_queues)
{
exchange_type_set = exchange_type != ExchangeType::DEFAULT;
......@@ -125,7 +125,7 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
/* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which
* will evenly distribute messages between all consumers. (This enables better scaling as without hash-exchange - the only
* option to avoid getting the same messages more than once - is having only one consumer with one queue, which is not good.)
* option to avoid getting the same messages more than once - is having only one consumer with one queue)
*/
consumer_channel->declareExchange(exchange_name, type).onError([&](const char * message)
{
......@@ -243,7 +243,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
});
/* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because
* if moved there, it must(!) be wrapped inside a channel->onReady callback or any other, otherwise
* if moved there, it must(!) be wrapped inside a channel->onSuccess callback or any other, otherwise
* consumer might fail to subscribe and no resubscription will help.
*/
subscribe(queues.back());
......@@ -327,7 +327,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
*/
while (!default_bindings_created && !default_bindings_error || (exchange_type_set && !bindings_created && !bindings_error))
{
startEventLoop(loop_started);
startEventLoop();
}
}
......@@ -378,7 +378,7 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
/// These variables are updated in a separate thread.
while (count_subscribed != wait_subscribed && !consumer_error)
{
startEventLoop(loop_started);
startEventLoop();
}
LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed);
......@@ -395,15 +395,9 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
}
void ReadBufferFromRabbitMQConsumer::stopEventLoop()
void ReadBufferFromRabbitMQConsumer::startEventLoop()
{
event_handler->stop();
}
void ReadBufferFromRabbitMQConsumer::startEventLoop(std::atomic<bool> & loop_started)
{
event_handler->startConsumerLoop(loop_started);
event_handler->startLoop();
}
......
......@@ -44,8 +44,6 @@ public:
auto getExchange() const { return exchange_name; }
private:
using Messages = std::vector<String>;
ChannelPtr consumer_channel;
HandlerPtr event_handler;
......@@ -70,7 +68,7 @@ private:
bool local_exchange_declared = false, local_hash_exchange_declared = false;
bool exchange_type_set = false, hash_exchange = false;
std::atomic<bool> loop_started = false, consumer_error = false;
std::atomic<bool> consumer_error = false;
std::atomic<size_t> count_subscribed = 0, wait_subscribed;
ConcurrentBoundedQueue<String> messages;
......@@ -78,21 +76,12 @@ private:
std::vector<String> queues;
std::unordered_map<String, bool> subscribed_queue;
/* Note: as all consumers share the same connection => they also share the same
* event loop, which can be started by any consumer and the loop is blocking only to the thread that
* started it, and the loop executes ALL active callbacks on the connection => in case num_consumers > 1,
* at most two threads will be present: main thread and the one that executes callbacks (1 thread if
* main thread is the one that started the loop).
*/
std::mutex mutex;
bool nextImpl() override;
void initExchange();
void initQueueBindings(const size_t queue_id);
void subscribe(const String & queue_name);
void startEventLoop(std::atomic<bool> & loop_started);
void stopEventLoop();
void startEventLoop();
};
}
......@@ -138,7 +138,7 @@ void StorageRabbitMQ::heartbeatFunc()
void StorageRabbitMQ::loopingFunc()
{
LOG_DEBUG(log, "Starting event looping iterations");
event_handler->startLoop();
event_handler->startBackgroundLoop();
}
......
......@@ -18,7 +18,7 @@ namespace ErrorCodes
extern const int CANNOT_CONNECT_RABBITMQ;
}
static const auto QUEUE_SIZE = 100000;
static const auto QUEUE_SIZE = 50000;
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 1000;
static const auto LOOP_WAIT = 10;
......@@ -47,7 +47,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
, delim(delimiter)
, max_rows(rows_per_message)
, chunk_size(chunk_size_)
, payloads(QUEUE_SIZE)
, payloads(QUEUE_SIZE * num_queues)
{
loop = std::make_unique<uv_loop_t>();
......@@ -127,6 +127,7 @@ void WriteBufferToRabbitMQProducer::countRow()
void WriteBufferToRabbitMQProducer::writingFunc()
{
String payload;
while (!stop_loop || !payloads.empty())
{
while (!payloads.empty())
......@@ -142,10 +143,7 @@ void WriteBufferToRabbitMQProducer::writingFunc()
{
producer_channel->publish(exchange_name, routing_key, payload);
}
++message_counter;
}
startEventLoop();
}
}
......@@ -183,7 +181,7 @@ void WriteBufferToRabbitMQProducer::finilizeProducer()
if (use_transactional_channel)
{
std::atomic<bool> answer_received = false;
std::atomic<bool> answer_received = false, wait_rollback = false;
producer_channel->commitTransaction()
.onSuccess([&]()
{
......@@ -193,12 +191,22 @@ void WriteBufferToRabbitMQProducer::finilizeProducer()
.onError([&](const char * message)
{
answer_received = true;
LOG_TRACE(log, "None of messages were publishd: {}", message);
/// Probably should do something here
wait_rollback = true;
LOG_TRACE(log, "Publishing not successful: {}", message);
producer_channel->rollbackTransaction()
.onSuccess([&]()
{
wait_rollback = false;
})
.onError([&](const char * message)
{
LOG_ERROR(log, "Failed to rollback transaction: {}", message);
wait_rollback = false;
});
});
size_t count_retries = 0;
while (!answer_received && ++count_retries != RETRIES_MAX)
while ((!answer_received || wait_rollback) && ++count_retries != RETRIES_MAX)
{
startEventLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_WAIT));
......@@ -217,7 +225,7 @@ void WriteBufferToRabbitMQProducer::nextImpl()
void WriteBufferToRabbitMQProducer::startEventLoop()
{
event_handler->startProducerLoop();
event_handler->startLoop();
}
}
......@@ -14,8 +14,7 @@
namespace DB
{
using ProducerPtr = std::shared_ptr<AMQP::TcpChannel>;
using Messages = std::vector<String>;
using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
class WriteBufferToRabbitMQProducer : public WriteBuffer
{
......@@ -60,7 +59,7 @@ private:
std::unique_ptr<uv_loop_t> loop;
std::unique_ptr<RabbitMQHandler> event_handler;
std::unique_ptr<AMQP::TcpConnection> connection;
ProducerPtr producer_channel;
ChannelPtr producer_channel;
ConcurrentBoundedQueue<String> payloads;
size_t next_queue = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册