提交 8266715c 编写于 作者: K kssenii

Fix build & fix style

上级 0362bb2d
......@@ -64,8 +64,7 @@ Block RabbitMQBlockInputStream::readImpl()
MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, 1);
auto input_format = FormatFactory::instance().getInputFormat(storage.getFormatName(), *buffer, non_virtual_header, context, 1);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
......
......@@ -12,7 +12,7 @@ RabbitMQHandler::RabbitMQHandler(event_base * evbase_, Poco::Logger * log_) :
}
void RabbitMQHandler::onError(AMQP::TcpConnection * , const char * message)
void RabbitMQHandler::onError(AMQP::TcpConnection * /* connection */, const char * message)
{
LOG_ERROR(log, "Library error report: {}", message);
stop();
......
#include <utility>
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>
#include <memory>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <common/logger_useful.h>
......
......@@ -45,10 +45,8 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
......@@ -157,6 +155,7 @@ void StorageRabbitMQ::shutdown()
popReadBuffer();
}
connection.close();
task->deactivate();
}
......@@ -201,8 +200,10 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
next_channel_id += num_queues;
update_channel_id = true;
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(&connection);
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
std::make_shared<AMQP::TcpChannel>(&connection), eventHandler, exchange_name, routing_key, next_channel_id,
consumer_channel, eventHandler, exchange_name, routing_key, next_channel_id,
log, row_delimiter, bind_by_id, hash_exchange, num_queues, stream_cancelled);
}
......@@ -460,7 +461,8 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
}
return StorageRabbitMQ::create(args.table_id, args.context, args.columns, host_port, routing_key, exchange,
return StorageRabbitMQ::create(
args.table_id, args.context, args.columns, host_port, routing_key, exchange,
format, row_delimiter, num_consumers, num_queues, hash_exchange);
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册