diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index a1442f91fbe015022dafd7c093491d4126f83ec6..e10a4eb0f968f9d8f448e69e927a86f0b4e97ac1 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -1,5 +1,3 @@ -#include -#include #include #include #include @@ -152,11 +150,7 @@ Block RabbitMQBlockInputStream::readImpl() result_block.insert(column); } - return ConvertingBlockInputStream( - std::make_shared(result_block), - getHeader(), - ConvertingBlockInputStream::MatchColumnsMode::Name) - .read(); + return result_block; } } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 7426e939bece115dd159431ccb85403930515a6e..00da53e7909e2ce5708e642b79e7ca1bc597dfe4 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -153,11 +154,14 @@ Pipes StorageRabbitMQ::read( Pipes pipes; pipes.reserve(num_created_consumers); + auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()); for (size_t i = 0; i < num_created_consumers; ++i) { - pipes.emplace_back( - std::make_shared(std::make_shared( - *this, metadata_snapshot, context, column_names, log))); + auto rabbit_stream = std::make_shared( + *this, metadata_snapshot, context, column_names, log); + auto converting_stream = std::make_shared( + rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name); + pipes.emplace_back(std::make_shared(converting_stream)); } if (!loop_started) @@ -202,16 +206,16 @@ void StorageRabbitMQ::shutdown() { stream_cancelled = true; - for (size_t i = 0; i < num_created_consumers; ++i) - { - popReadBuffer(); - } + event_handler->stop(); + looping_task->deactivate(); streaming_task->deactivate(); heartbeat_task->deactivate(); - event_handler->stop(); - looping_task->deactivate(); + for (size_t i = 0; i < num_created_consumers; ++i) + { + popReadBuffer(); + } connection->close(); } @@ -355,17 +359,22 @@ bool StorageRabbitMQ::streamToViews() BlockInputStreams streams; streams.reserve(num_created_consumers); + auto metadata_snapshot = getInMemoryMetadataPtr(); + auto column_names = block_io.out->getHeader().getNames(); + auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()); for (size_t i = 0; i < num_created_consumers; ++i) { - auto stream = std::make_shared(*this, getInMemoryMetadataPtr(), rabbitmq_context, block_io.out->getHeader().getNames(), log); - streams.emplace_back(stream); + auto rabbit_stream = std::make_shared(*this, metadata_snapshot, rabbitmq_context, column_names, log); + auto converting_stream = std::make_shared(rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name); + + streams.emplace_back(converting_stream); // Limit read batch to maximum block size to allow DDL IBlockInputStream::LocalLimits limits; const Settings & settings = global_context.getSettingsRef(); limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms; limits.timeout_overflow_mode = OverflowMode::BREAK; - stream->setLimits(limits); + rabbit_stream->setLimits(limits); } if (!loop_started) diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 42b7101f9c6f708907b2e3b047912d5023129b1d..6da7239fc94c0c588bd2ba99488477e918fe9b8d 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -816,7 +816,7 @@ def test_rabbitmq_insert(rabbitmq_cluster): channel.stop_consuming() consumer.basic_qos(prefetch_count=50) - consumer.basic_consume(onReceived, queue_name) + consumer.basic_consume(queue_name, onReceived) consumer.start_consuming() consumer_connection.close()