提交 e1ef558a 编写于 作者: K kssenii

Fixes

上级 6682c62a
......@@ -17,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
......@@ -49,7 +50,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, row_delimiter(row_delimiter_)
, queue_size(queue_size_)
, stopped(stopped_)
, received(queue_size)
, received(queue_size * num_queues)
{
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
bindQueue(queue_id);
......@@ -101,7 +102,7 @@ void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
which were declared with the same names. ERROR reason: "
+ std::string(message), ErrorCodes::LOGICAL_ERROR);
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
AMQP::Table queue_settings;
......@@ -220,8 +221,6 @@ void ReadBufferFromRabbitMQConsumer::setupChannel()
consumer_channel->onError([&](const char * message)
{
/// If here, then fatal error occured on the channel and it is not usable anymore, need to close it
consumer_channel->close();
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
channel_error.store(true);
......
......@@ -84,7 +84,8 @@ StorageRabbitMQ::StorageRabbitMQ(
, persistent(rabbitmq_settings->rabbitmq_persistent.value)
, hash_exchange(num_consumers > 1 || num_queues > 1)
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
, parsed_address(parseAddress(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_host_port.value), 5672))
, address(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_host_port.value))
, parsed_address(parseAddress(address, 5672))
, login_password(std::make_pair(
global_context.getConfigRef().getString("rabbitmq.username"),
global_context.getConfigRef().getString("rabbitmq.password")))
......@@ -101,7 +102,7 @@ StorageRabbitMQ::StorageRabbitMQ(
if (!connection->closed())
connection->close(true);
throw Exception("Cannot connect to RabbitMQ", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
throw Exception("Cannot connect to RabbitMQ " + address, ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
StorageInMemoryMetadata storage_metadata;
......@@ -232,16 +233,15 @@ void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task,
if (stop_loop)
event_handler->updateLoopState(Loop::STOP);
if (task_mutex.try_lock())
std::unique_lock<std::mutex> lk(task_mutex, std::defer_lock);
if (lk.try_lock())
{
task->deactivate();
task_mutex.unlock();
lk.unlock();
}
else if (wait)
else if (wait) /// Wait only if deactivating from shutdown
{
/// Wait only if deactivating from shutdown
std::lock_guard lock(task_mutex);
lk.lock();
task->deactivate();
}
}
......@@ -272,7 +272,7 @@ void StorageRabbitMQ::initExchange()
* specified its own settings, which differ from this implementation.
*/
throw Exception("Unable to declare exchange (1). Make sure specified exchange is not already declared. Error: "
+ std::string(message), ErrorCodes::LOGICAL_ERROR);
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
/// Bridge exchange is needed to easily disconnect consumer queues and also simplifies queue bindings
......@@ -303,7 +303,7 @@ void StorageRabbitMQ::initExchange()
* to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
* is bad.
*/
throw Exception("Unable to declare exchange (3). Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unable to declare exchange (3). Reason: " + std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
......@@ -397,7 +397,7 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
if (!connection->closed())
connection->close(true);
LOG_TRACE(log, "Trying to restore consumer connection");
LOG_TRACE(log, "Trying to restore connection to " + address);
}
connection = std::make_shared<AMQP::TcpConnection>(event_handler.get(),
......@@ -475,7 +475,7 @@ Pipe StorageRabbitMQ::read(
auto block_size = getMaxBlockSize();
bool update_channels = false;
if (!connection->usable())
if (!event_handler->connectionRunning())
{
if (event_handler->loopRunning())
deactivateTask(looping_task, false, true);
......@@ -498,7 +498,7 @@ Pipe StorageRabbitMQ::read(
* close connection, but checking anyway (in second condition of if statement). This must be done here (and also in streamToViews())
* and not in readPrefix as it requires to stop heartbeats and looping tasks to avoid race conditions inside the library
*/
if ((update_channels || rabbit_stream->needChannelUpdate()) && connection->usable())
if ((update_channels || rabbit_stream->needChannelUpdate()) && event_handler->connectionRunning())
{
if (event_handler->loopRunning())
{
......@@ -713,9 +713,9 @@ bool StorageRabbitMQ::streamToViews()
auto column_names = block_io.out->getHeader().getNames();
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
/* Need to use event_handler->connectionRunning() because connection might have failed and to start error callbacks need to start
* the loop, so it is important not to use connection->usable() method here. And need to use connection->usable() method in cases
* when loop is deactivated and connection check is needed.
/* event_handler->connectionRunning() does not guarantee that connnection is not closed in case loop was not running before, but
* need to anyway start the loop to activate error callbacks and update connection state, because even checking with
* connection->usable() will not give correct answer before callbacks are activated.
*/
if (!event_handler->loopRunning() && event_handler->connectionRunning())
looping_task->activateAndSchedule();
......@@ -795,7 +795,10 @@ bool StorageRabbitMQ::streamToViews()
*/
if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
{
if (connection->usable())
/// Iterate loop to activate error callbacks if they happened
event_handler->iterateLoop();
if (event_handler->connectionRunning())
{
/* Almost any error with channel will lead to connection closure, but if so happens that channel errored and
* connection is not closed - also need to restore channels
......
......@@ -92,6 +92,7 @@ private:
bool hash_exchange;
Poco::Logger * log;
String address;
std::pair<String, UInt16> parsed_address;
std::pair<String, String> login_password;
......
......@@ -67,7 +67,8 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
if (!connection->closed())
connection->close(true);
throw Exception("Cannot connect to RabbitMQ", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
throw Exception("Cannot connect to RabbitMQ host: " + parsed_address.first + ", port: " + std::to_string(parsed_address.second),
ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册