提交 7a87da78 编写于 作者: Z zhang2014

ISSUES-3110 fix merge and distributed engine query stage

上级 92b2cc66
......@@ -346,24 +346,9 @@ public:
/// Returns sampling expression for storage or nullptr if there is no.
virtual ASTPtr getSamplingExpression() const { return nullptr; }
protected:
using ITableDeclaration::ITableDeclaration;
using std::enable_shared_from_this<IStorage>::shared_from_this;
void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, const Context & context)
{
auto expected_stage = getQueryProcessingStage(context);
checkQueryProcessingStage(processed_stage, expected_stage);
}
void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum expected_stage)
{
if (processed_stage != expected_stage)
throw Exception("Unexpected query processing stage for storage " + getName() +
": expected " + QueryProcessingStage::toString(expected_stage) +
", got " + QueryProcessingStage::toString(processed_stage), ErrorCodes::LOGICAL_ERROR);
}
private:
friend class TableStructureReadLock;
......
......@@ -269,11 +269,10 @@ BlockInputStreams StorageKafka::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
if (num_consumers == 0)
......
......@@ -136,8 +136,6 @@ BlockInputStreams StorageBuffer::read(
size_t max_block_size,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
BlockInputStreams streams_from_dst;
if (!no_destination)
......
......@@ -261,12 +261,10 @@ void StorageCatBoostPool::createSampleBlockAndColumns()
BlockInputStreams StorageCatBoostPool::read(const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned /*threads*/)
{
checkQueryProcessingStage(processed_stage, context);
auto stream = std::make_shared<CatBoostDatasetBlockInputStream>(
data_description_file_name, "TSV", sample_block, context, max_block_size);
......
......@@ -38,11 +38,10 @@ BlockInputStreams StorageDictionary::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t max_block_size,
const unsigned /*threads*/)
{
checkQueryProcessingStage(processed_stage, context);
auto dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)};
}
......
......@@ -249,7 +249,6 @@ BlockInputStreams StorageDistributed::read(
const unsigned /*num_streams*/)
{
auto cluster = getCluster();
checkQueryProcessingStage(processed_stage, getQueryProcessingStage(context, cluster));
const Settings & settings = context.getSettingsRef();
......
......@@ -190,11 +190,10 @@ BlockInputStreams StorageFile::read(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
return BlockInputStreams(1, std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size));
}
......
......@@ -572,11 +572,10 @@ BlockInputStreams StorageLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
loadMarks();
......
......@@ -83,12 +83,11 @@ StorageMemory::StorageMemory(String table_name_, ColumnsDescription columns_desc
BlockInputStreams StorageMemory::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum processed_stage,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
std::lock_guard<std::mutex> lock(mutex);
......
......@@ -141,7 +141,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
auto database = context.getDatabase(source_database);
auto iterator = database->getIterator(context);
bool first = true;
size_t selected_table_size = 0;
while (iterator->isValid())
{
......@@ -149,23 +149,14 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
{
auto & table = iterator->table();
if (table.get() != this)
{
auto stage = table->getQueryProcessingStage(context);
if (first)
stage_in_source_tables = stage;
else if (stage != stage_in_source_tables)
throw Exception("Source tables for Merge table are processing data up to different stages",
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
first = false;
}
++selected_table_size;
}
iterator->next();
}
return stage_in_source_tables;
auto fetch_or_mergeable_stage = std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
return selected_table_size == 1 ? stage_in_source_tables : fetch_or_mergeable_stage;
}
......@@ -202,12 +193,6 @@ BlockInputStreams StorageMerge::read(
for (const auto & elem : selected_tables)
{
/// Check processing stage again in case new table was added after getQueryProcessingStage call.
auto stage = elem.first->getQueryProcessingStage(context);
if (stage != processed_stage)
throw Exception("Source tables for Merge table are processing data up to different stages",
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
/// If PREWHERE is used in query, you need to make sure that all tables support this.
if (typeid_cast<const ASTSelectQuery &>(*query).prewhere_expression)
if (!elem.first->supportsPrewhere())
......
......@@ -110,11 +110,10 @@ BlockInputStreams StorageMergeTree::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t max_block_size,
const unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
return reader.read(column_names, query_info, context, max_block_size, num_streams, 0);
}
......
......@@ -51,12 +51,11 @@ BlockInputStreams StorageMySQL::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned)
{
check(column_names);
checkQueryProcessingStage(processed_stage, context);
String query = transformQueryForExternalDatabase(
*query_info.query, getColumns().ordinary, IdentifierQuotingStyle::Backticks, remote_database_name, remote_table_name, context);
......
......@@ -23,12 +23,11 @@ public:
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum processing_stage,
const Context & /*context*/,
QueryProcessingStage::Enum /*processing_stage*/,
size_t,
unsigned) override
{
checkQueryProcessingStage(processing_stage, context);
return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
}
......
......@@ -80,7 +80,6 @@ BlockInputStreams StorageODBC::read(const Names & column_names,
unsigned num_streams)
{
check(column_names);
checkQueryProcessingStage(processed_stage, context);
odbc_bridge_helper.startODBCBridgeSync();
return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
......
......@@ -2929,11 +2929,10 @@ BlockInputStreams StorageReplicatedMergeTree::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t max_block_size,
const unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
const Settings & settings = context.getSettingsRef();
/** The `select_sequential_consistency` setting has two meanings:
......
......@@ -235,11 +235,10 @@ BlockInputStreams StorageStripeLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
unsigned num_streams)
{
checkQueryProcessingStage(processed_stage, context);
std::shared_lock<std::shared_mutex> lock(rwlock);
check(column_names);
......
......@@ -384,11 +384,10 @@ BlockInputStreams StorageTinyLog::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t max_block_size,
const unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
......
......@@ -160,8 +160,6 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
size_t max_block_size,
unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
auto request_uri = uri;
auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size);
for (const auto & [param, value] : params)
......
......@@ -41,12 +41,10 @@ BlockInputStreams StorageView::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
BlockInputStreams res;
ASTPtr & current_inner_query = inner_query;
......
......@@ -33,12 +33,11 @@ public:
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/) override
{
check(column_names);
checkQueryProcessingStage(processed_stage, context);
Block sample_block = getSampleBlock();
MutableColumns res_columns = sample_block.cloneEmptyColumns();
......
......@@ -198,11 +198,10 @@ BlockInputStreams StorageSystemColumns::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t max_block_size,
const unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
/// Create a mask of what columns are needed in the result.
......
......@@ -53,13 +53,12 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multi
BlockInputStreams StorageSystemNumbers::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum processed_stage,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
check(column_names);
checkQueryProcessingStage(processed_stage, context);
if (limit && limit < max_block_size)
{
......
......@@ -20,13 +20,12 @@ StorageSystemOne::StorageSystemOne(const std::string & name_)
BlockInputStreams StorageSystemOne::read(
const Names & column_names,
const SelectQueryInfo &,
const Context & context,
QueryProcessingStage::Enum processed_stage,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
checkQueryProcessingStage(processed_stage, context);
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(
Block{ColumnWithTypeAndName(
......
......@@ -237,12 +237,11 @@ BlockInputStreams StorageSystemPartsBase::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
bool has_state_column = hasStateColumn(column_names);
checkQueryProcessingStage(processed_stage, context);
StoragesInfoStream stream(query_info, context, has_state_column);
......
......@@ -55,12 +55,11 @@ BlockInputStreams StorageSystemReplicas::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
checkQueryProcessingStage(processed_stage, context);
/// We collect a set of replicated tables.
std::map<String, std::map<String, StoragePtr>> replicated_tables;
......
......@@ -258,11 +258,10 @@ BlockInputStreams StorageSystemTables::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t max_block_size,
const unsigned /*num_streams*/)
{
checkQueryProcessingStage(processed_stage, context);
check(column_names);
/// Create a mask of what columns are needed in the result.
......
DROP TABLE IF EXISTS test.test_local_1;
DROP TABLE IF EXISTS test.test_local_2;
DROP TABLE IF EXISTS test.test_distributed_1;
DROP TABLE IF EXISTS test.test_distributed_2;
CREATE TABLE test.test_local_1 (date Date, value UInt32) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE test.test_local_2 (date Date, value UInt32) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE test.test_distributed_1 AS test.test_local_1 ENGINE = Distributed('test_shard_localhost', 'test', test_local_1, rand());
CREATE TABLE test.test_distributed_2 AS test.test_local_2 ENGINE = Distributed('test_shard_localhost', 'test', test_local_2, rand());
INSERT INTO test.test_local_1 VALUES ('2018-08-01',100);
INSERT INTO test.test_local_2 VALUES ('2018-08-01',200);
SELECT sum(value) FROM merge('test', 'test_local_1|test_distributed_2');
SELECT sum(value) FROM merge('test', 'test_distributed_1|test_distributed_2');
DROP TABLE IF EXISTS test.test_local_1;
DROP TABLE IF EXISTS test.test_local_2;
DROP TABLE IF EXISTS test.test_distributed_1;
DROP TABLE IF EXISTS test.test_distributed_2;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册