From 8266715c492749e035f4bd764f5c75d3620af73e Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 31 May 2020 08:39:22 +0000 Subject: [PATCH] Fix build & fix style --- src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp | 3 +-- src/Storages/RabbitMQ/RabbitMQHandler.cpp | 2 +- .../RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp | 3 +++ src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 10 ++++++---- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index d498a36f95..86d760be54 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -64,8 +64,7 @@ Block RabbitMQBlockInputStream::readImpl() MutableColumns result_columns = non_virtual_header.cloneEmptyColumns(); MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); - auto input_format = FormatFactory::instance().getInputFormat( - storage.getFormatName(), *buffer, non_virtual_header, context, 1); + auto input_format = FormatFactory::instance().getInputFormat(storage.getFormatName(), *buffer, non_virtual_header, context, 1); InputPort port(input_format->getPort().getHeader(), input_format.get()); connect(input_format->getPort(), port); diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 1f6e9ce1bb..cebe8ee3c3 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -12,7 +12,7 @@ RabbitMQHandler::RabbitMQHandler(event_base * evbase_, Poco::Logger * log_) : } -void RabbitMQHandler::onError(AMQP::TcpConnection * , const char * message) +void RabbitMQHandler::onError(AMQP::TcpConnection * /* connection */, const char * message) { LOG_ERROR(log, "Library error report: {}", message); stop(); diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index d6da585047..d6372dfe4d 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -1,6 +1,9 @@ #include #include #include +#include +#include +#include #include #include #include diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index ed486e8e70..5f7570dd8c 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -45,10 +45,8 @@ namespace DB namespace ErrorCodes { - extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } @@ -157,6 +155,7 @@ void StorageRabbitMQ::shutdown() popReadBuffer(); } + connection.close(); task->deactivate(); } @@ -201,8 +200,10 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer() next_channel_id += num_queues; update_channel_id = true; + ChannelPtr consumer_channel = std::make_shared(&connection); + return std::make_shared( - std::make_shared(&connection), eventHandler, exchange_name, routing_key, next_channel_id, + consumer_channel, eventHandler, exchange_name, routing_key, next_channel_id, log, row_delimiter, bind_by_id, hash_exchange, num_queues, stream_cancelled); } @@ -460,7 +461,8 @@ void registerStorageRabbitMQ(StorageFactory & factory) } } - return StorageRabbitMQ::create(args.table_id, args.context, args.columns, host_port, routing_key, exchange, + return StorageRabbitMQ::create( + args.table_id, args.context, args.columns, host_port, routing_key, exchange, format, row_delimiter, num_consumers, num_queues, hash_exchange); }; -- GitLab