未验证 提交 2f7e599a 编写于 作者: V Vitaly Baranov 提交者: GitHub

Merge pull request #17695 from vitlibar/fix-test_rabbitmq_csv_with_delimiter-20.12

Backport to 20.12: Fix test_rabbitmq_csv_with_delimiter
......@@ -16,7 +16,7 @@ namespace DB
RabbitMQBlockInputStream::RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
std::shared_ptr<Context> context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix_)
......@@ -54,7 +54,7 @@ Block RabbitMQBlockInputStream::getHeader() const
void RabbitMQBlockInputStream::readPrefixImpl()
{
auto timeout = std::chrono::milliseconds(context.getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
auto timeout = std::chrono::milliseconds(context->getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
buffer = storage.popReadBuffer(timeout);
}
......@@ -96,7 +96,7 @@ Block RabbitMQBlockInputStream::readImpl()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
......
......@@ -15,7 +15,7 @@ public:
RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
std::shared_ptr<Context> context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix = true);
......@@ -37,7 +37,7 @@ public:
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
const Context & context;
std::shared_ptr<Context> context;
Names column_names;
const size_t max_block_size;
bool ack_in_suffix;
......
......@@ -74,7 +74,6 @@ StorageRabbitMQ::StorageRabbitMQ(
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
: IStorage(table_id_)
, global_context(context_.getGlobalContext())
, rabbitmq_context(Context(global_context))
, rabbitmq_settings(std::move(rabbitmq_settings_))
, exchange_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
, format_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
......@@ -114,8 +113,8 @@ StorageRabbitMQ::StorageRabbitMQ(
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
rabbitmq_context.makeQueryContext();
rabbitmq_context = addSettings(rabbitmq_context);
rabbitmq_context = addSettings(global_context);
rabbitmq_context->makeQueryContext();
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
event_handler->updateLoopState(Loop::STOP);
......@@ -193,16 +192,17 @@ String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_i
}
Context StorageRabbitMQ::addSettings(Context context) const
std::shared_ptr<Context> StorageRabbitMQ::addSettings(const Context & context) const
{
context.setSetting("input_format_skip_unknown_fields", true);
context.setSetting("input_format_allow_errors_ratio", 0.);
context.setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
auto modified_context = std::make_shared<Context>(context);
modified_context->setSetting("input_format_skip_unknown_fields", true);
modified_context->setSetting("input_format_allow_errors_ratio", 0.);
modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
if (!schema_name.empty())
context.setSetting("format_schema", schema_name);
modified_context->setSetting("format_schema", schema_name);
return context;
return modified_context;
}
......@@ -538,6 +538,7 @@ Pipe StorageRabbitMQ::read(
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto modified_context = addSettings(context);
auto block_size = getMaxBlockSize();
bool update_channels = false;
......@@ -581,7 +582,9 @@ Pipe StorageRabbitMQ::read(
looping_task->activateAndSchedule();
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
return Pipe::unitePipes(std::move(pipes));
auto united_pipe = Pipe::unitePipes(std::move(pipes));
united_pipe.addInterpreterContext(modified_context);
return united_pipe;
}
......@@ -785,7 +788,7 @@ bool StorageRabbitMQ::streamToViews()
insert->table_id = table_id;
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
InterpreterInsertQuery interpreter(insert, *rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
auto metadata_snapshot = getInMemoryMetadataPtr();
......
......@@ -73,7 +73,7 @@ protected:
private:
const Context & global_context;
Context rabbitmq_context;
std::shared_ptr<Context> rabbitmq_context;
std::unique_ptr<RabbitMQSettings> rabbitmq_settings;
const String exchange_name;
......@@ -135,7 +135,7 @@ private:
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
static String getTableBasedName(String name, const StorageID & table_id);
Context addSettings(Context context) const;
std::shared_ptr<Context> addSettings(const Context & context) const;
size_t getMaxBlockSize() const;
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);
......
......@@ -123,7 +123,7 @@ def rabbitmq_setup_teardown():
# Tests
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_select(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
......@@ -159,7 +159,7 @@ def test_rabbitmq_select(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_select_empty(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
......@@ -173,7 +173,7 @@ def test_rabbitmq_select_empty(rabbitmq_cluster):
assert int(instance.query('SELECT count() FROM test.rabbitmq')) == 0
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
......@@ -215,7 +215,7 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
......@@ -250,7 +250,7 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
......@@ -285,7 +285,7 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_materialized_view(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
......@@ -328,7 +328,7 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
......@@ -371,7 +371,7 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view1;
......@@ -426,7 +426,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
@pytest.mark.skip(reason="clichouse_path with rabbitmq.proto fails to be exported")
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_protobuf(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册