未验证 提交 b5d25596 编写于 作者: A alesapin 提交者: GitHub

Merge pull request #17709 from kssenii/better-startup

rabbitmq: fix startup with no connection
......@@ -61,15 +61,10 @@ void RabbitMQBlockInputStream::readPrefixImpl()
bool RabbitMQBlockInputStream::needChannelUpdate()
{
if (!buffer || !buffer->isChannelUpdateAllowed())
if (!buffer)
return false;
if (buffer->isChannelError())
return true;
ChannelPtr channel = buffer->getChannel();
return !channel || !channel->usable();
return buffer->needChannelUpdate();
}
......@@ -80,8 +75,8 @@ void RabbitMQBlockInputStream::updateChannel()
buffer->updateAckTracker();
storage.updateChannel(buffer->getChannel());
buffer->setupChannel();
if (storage.updateChannel(buffer->getChannel()))
buffer->setupChannel();
}
......
......@@ -24,6 +24,7 @@ public:
String getName() const override { return storage.getName(); }
Block getHeader() const override;
ConsumerBufferPtr getBuffer() { return buffer; }
void readPrefixImpl() override;
Block readImpl() override;
......
......@@ -35,7 +35,8 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, stopped(stopped_)
, received(queue_size_)
{
setupChannel();
if (consumer_channel)
setupChannel();
}
......@@ -147,6 +148,15 @@ void ReadBufferFromRabbitMQConsumer::setupChannel()
}
bool ReadBufferFromRabbitMQConsumer::needChannelUpdate()
{
if (wait_subscription)
return false;
return channel_error || !consumer_channel || !consumer_channel->usable();
}
void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
{
event_handler->iterateLoop();
......
......@@ -53,14 +53,14 @@ public:
AckTracker track;
};
bool isConsumerStopped() { return stopped; }
bool isChannelError() { return channel_error; }
/// Do not allow to update channel if current channel is not properly set up and subscribed
bool isChannelUpdateAllowed() { return !wait_subscription; }
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
bool needChannelUpdate();
void updateQueues(std::vector<String> & queues_) { queues = queues_; }
size_t queuesCount() { return queues.size(); }
bool isConsumerStopped() { return stopped; }
bool ackMessages();
void updateAckTracker(AckTracker record = AckTracker());
......
......@@ -47,8 +47,8 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_CONNECT_RABBITMQ;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int CANNOT_CONNECT_RABBITMQ;
extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
......@@ -100,14 +100,7 @@ StorageRabbitMQ::StorageRabbitMQ(
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);
if (!restoreConnection(false))
{
if (!connection->closed())
connection->close(true);
throw Exception("Cannot connect to RabbitMQ " + address, ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
restoreConnection(false);
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
......@@ -124,6 +117,9 @@ StorageRabbitMQ::StorageRabbitMQ(
streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
streaming_task->deactivate();
connection_task = global_context.getSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
connection_task->deactivate();
if (queue_base.empty())
{
/* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to
......@@ -213,6 +209,15 @@ void StorageRabbitMQ::loopingFunc()
}
void StorageRabbitMQ::connectionFunc()
{
if (restoreConnection(true))
initRabbitMQ();
else
connection_task->scheduleAfter(RESCHEDULE_MS);
}
/* Need to deactivate this way because otherwise might get a deadlock when first deactivate streaming task in shutdown and then
* inside streaming task try to deactivate any other task
*/
......@@ -243,6 +248,23 @@ size_t StorageRabbitMQ::getMaxBlockSize() const
}
void StorageRabbitMQ::initRabbitMQ()
{
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
initExchange();
bindExchange();
for (const auto i : ext::range(0, num_queues))
bindQueue(i + 1);
LOG_TRACE(log, "RabbitMQ setup completed");
rabbit_is_ready = true;
setup_channel->close();
}
void StorageRabbitMQ::initExchange()
{
/* Binding scheme is the following: client's exchange -> key bindings by routing key list -> bridge exchange (fanout) ->
......@@ -293,7 +315,8 @@ void StorageRabbitMQ::initExchange()
* is bad.
*/
throw Exception(
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE, "Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
"Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
});
setup_channel->bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
......@@ -333,9 +356,7 @@ void StorageRabbitMQ::bindExchange()
throw Exception(
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
exchange_name,
bridge_exchange,
std::string(message));
exchange_name, bridge_exchange, std::string(message));
});
}
else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
......@@ -347,9 +368,7 @@ void StorageRabbitMQ::bindExchange()
throw Exception(
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
exchange_name,
bridge_exchange,
std::string(message));
exchange_name, bridge_exchange, std::string(message));
});
}
else
......@@ -368,9 +387,7 @@ void StorageRabbitMQ::bindExchange()
throw Exception(
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
exchange_name,
bridge_exchange,
std::string(message));
exchange_name, bridge_exchange, std::string(message));
});
}
}
......@@ -478,9 +495,16 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
}
void StorageRabbitMQ::updateChannel(ChannelPtr & channel)
bool StorageRabbitMQ::updateChannel(ChannelPtr & channel)
{
channel = std::make_shared<AMQP::TcpChannel>(connection.get());
if (event_handler->connectionRunning())
{
channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return true;
}
channel = nullptr;
return false;
}
......@@ -532,22 +556,21 @@ Pipe StorageRabbitMQ::read(
size_t /* max_block_size */,
unsigned /* num_streams */)
{
if (!rabbit_is_ready)
throw Exception("RabbitMQ setup not finished. Connection might be lost", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
if (num_created_consumers == 0)
return {};
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto modified_context = addSettings(context);
auto block_size = getMaxBlockSize();
bool update_channels = false;
if (!event_handler->connectionRunning())
{
if (event_handler->loopRunning())
deactivateTask(looping_task, false, true);
update_channels = restoreConnection(true);
restoreConnection(true);
}
Pipes pipes;
......@@ -558,21 +581,6 @@ Pipe StorageRabbitMQ::read(
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, modified_context, column_names, block_size);
/* It is a possible but rare case when channel gets into error state and does not also close connection, so need manual update.
* But I believe that in current context and with local rabbitmq settings this will never happen and any channel error will also
* close connection, but checking anyway (in second condition of if statement). This must be done here (and also in streamToViews())
* and not in readPrefix as it requires to stop heartbeats and looping tasks to avoid race conditions inside the library
*/
if (event_handler->connectionRunning() && (update_channels || rabbit_stream->needChannelUpdate()))
{
if (event_handler->loopRunning())
{
deactivateTask(looping_task, false, true);
}
rabbit_stream->updateChannel();
}
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
......@@ -596,16 +604,10 @@ BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadat
void StorageRabbitMQ::startup()
{
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
initExchange();
bindExchange();
for (size_t i = 1; i <= num_queues; ++i)
{
bindQueue(i);
}
setup_channel->close();
if (event_handler->connectionRunning())
initRabbitMQ();
else
connection_task->activateAndSchedule();
for (size_t i = 0; i < num_consumers; ++i)
{
......@@ -633,6 +635,7 @@ void StorageRabbitMQ::shutdown()
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
deactivateTask(connection_task, true, false);
connection->close();
......@@ -685,7 +688,9 @@ ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeo
ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
{
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
ChannelPtr consumer_channel;
if (event_handler->connectionRunning())
consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
consumer_channel, event_handler, queues, ++consumer_id,
......@@ -732,42 +737,45 @@ bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
void StorageRabbitMQ::streamingToViewsFunc()
{
try
if (rabbit_is_ready && (event_handler->connectionRunning() || restoreConnection(true)))
{
auto table_id = getStorageID();
// Check if at least one direct dependency is attached
size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
if (dependencies_count)
try
{
auto start_time = std::chrono::steady_clock::now();
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled && num_created_consumers > 0)
{
if (!checkDependencies(table_id))
break;
auto table_id = getStorageID();
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
// Check if at least one direct dependency is attached
size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
if (streamToViews())
break;
if (dependencies_count)
{
auto start_time = std::chrono::steady_clock::now();
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled && num_created_consumers > 0)
{
event_handler->updateLoopState(Loop::STOP);
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
break;
if (!checkDependencies(table_id))
break;
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
if (streamToViews())
break;
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
event_handler->updateLoopState(Loop::STOP);
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
break;
}
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// Wait for attached views
......@@ -866,6 +874,21 @@ bool StorageRabbitMQ::streamToViews()
if (stream->as<RabbitMQBlockInputStream>()->queueEmpty())
++queue_empty;
if (stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
{
auto buffer = stream->as<RabbitMQBlockInputStream>()->getBuffer();
if (buffer)
{
if (buffer->queuesCount() != queues.size())
buffer->updateQueues(queues);
buffer->updateAckTracker();
if (updateChannel(buffer->getChannel()))
buffer->setupChannel();
}
}
/* false is returned by the sendAck function in only two cases:
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
......@@ -882,19 +905,8 @@ bool StorageRabbitMQ::streamToViews()
{
/// Iterate loop to activate error callbacks if they happened
event_handler->iterateLoop();
if (event_handler->connectionRunning())
{
/* Almost any error with channel will lead to connection closure, but if so happens that channel errored and
* connection is not closed - also need to restore channels
*/
if (!stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
stream->as<RabbitMQBlockInputStream>()->updateChannel();
}
else
{
if (!event_handler->connectionRunning())
break;
}
}
event_handler->iterateLoop();
......
......@@ -62,7 +62,8 @@ public:
void unbindExchange();
bool exchangeRemoved() { return exchange_removed.load(); }
void updateChannel(ChannelPtr & channel);
bool updateChannel(ChannelPtr & channel);
void updateQueues(std::vector<String> & queues_) { queues_ = queues; }
protected:
StorageRabbitMQ(
......@@ -112,7 +113,7 @@ private:
size_t consumer_id = 0; /// counter for consumer buffer, needed for channel id
std::atomic<size_t> producer_id = 1; /// counter for producer buffer, needed for channel id
std::atomic<bool> wait_confirm = true; /// needed to break waiting for confirmations for producer
std::atomic<bool> exchange_removed = false;
std::atomic<bool> exchange_removed = false, rabbit_is_ready = false;
ChannelPtr setup_channel;
std::vector<String> queues;
......@@ -120,6 +121,7 @@ private:
std::mutex task_mutex;
BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder looping_task;
BackgroundSchedulePool::TaskHolder connection_task;
std::atomic<bool> stream_cancelled{false};
size_t read_attempts = 0;
......@@ -128,8 +130,8 @@ private:
/// Functions working in the background
void streamingToViewsFunc();
void heartbeatFunc();
void loopingFunc();
void connectionFunc();
static Names parseRoutingKeys(String routing_key_list);
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
......@@ -139,6 +141,7 @@ private:
size_t getMaxBlockSize() const;
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
void initRabbitMQ();
void initExchange();
void bindExchange();
void bindQueue(size_t queue_id);
......
......@@ -1862,6 +1862,56 @@ def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
assert result == 1, 'Messages from RabbitMQ get duplicated!'
@pytest.mark.timeout(420)
def test_rabbitmq_no_connection_at_startup(rabbitmq_cluster):
# no connection when table is initialized
rabbitmq_cluster.pause_container('rabbitmq1')
instance.query('''
CREATE TABLE test.cs (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'cs',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = '5',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.cs;
''')
time.sleep(5)
rabbitmq_cluster.unpause_container('rabbitmq1')
# need to make sure rabbit table made all rabbit setup
time.sleep(10)
messages_num = 1000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
for i in range(messages_num):
message = json.dumps({'key': i, 'value': i})
channel.basic_publish(exchange='cs', routing_key='', body=message,
properties=pika.BasicProperties(delivery_mode=2, message_id=str(i)))
connection.close()
while True:
result = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result) == messages_num:
break
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.cs;
''')
assert int(result) == messages_num, 'ClickHouse lost some messages: {}'.format(result)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册