提交 96df2e6b 编写于 作者: A alesapin

Better shutdown and conversion

上级 64583ceb
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/RabbitMQ/RabbitMQBlockInputStream.h>
......@@ -152,11 +150,7 @@ Block RabbitMQBlockInputStream::readImpl()
result_block.insert(column);
}
return ConvertingBlockInputStream(
std::make_shared<OneBlockInputStream>(result_block),
getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name)
.read();
return result_block;
}
}
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
......@@ -153,11 +154,14 @@ Pipes StorageRabbitMQ::read(
Pipes pipes;
pipes.reserve(num_created_consumers);
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
for (size_t i = 0; i < num_created_consumers; ++i)
{
pipes.emplace_back(
std::make_shared<SourceFromInputStream>(std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, context, column_names, log)));
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, context, column_names, log);
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
}
if (!loop_started)
......@@ -202,16 +206,16 @@ void StorageRabbitMQ::shutdown()
{
stream_cancelled = true;
for (size_t i = 0; i < num_created_consumers; ++i)
{
popReadBuffer();
}
event_handler->stop();
looping_task->deactivate();
streaming_task->deactivate();
heartbeat_task->deactivate();
event_handler->stop();
looping_task->deactivate();
for (size_t i = 0; i < num_created_consumers; ++i)
{
popReadBuffer();
}
connection->close();
}
......@@ -355,17 +359,22 @@ bool StorageRabbitMQ::streamToViews()
BlockInputStreams streams;
streams.reserve(num_created_consumers);
auto metadata_snapshot = getInMemoryMetadataPtr();
auto column_names = block_io.out->getHeader().getNames();
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto stream = std::make_shared<RabbitMQBlockInputStream>(*this, getInMemoryMetadataPtr(), rabbitmq_context, block_io.out->getHeader().getNames(), log);
streams.emplace_back(stream);
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(*this, metadata_snapshot, rabbitmq_context, column_names, log);
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
streams.emplace_back(converting_stream);
// Limit read batch to maximum block size to allow DDL
IBlockInputStream::LocalLimits limits;
const Settings & settings = global_context.getSettingsRef();
limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
limits.timeout_overflow_mode = OverflowMode::BREAK;
stream->setLimits(limits);
rabbit_stream->setLimits(limits);
}
if (!loop_started)
......
......@@ -816,7 +816,7 @@ def test_rabbitmq_insert(rabbitmq_cluster):
channel.stop_consuming()
consumer.basic_qos(prefetch_count=50)
consumer.basic_consume(onReceived, queue_name)
consumer.basic_consume(queue_name, onReceived)
consumer.start_consuming()
consumer_connection.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册