diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index f0bc813702902f4430ffdbf89b90111abeade87a..5360e750a33041fb59a0be74a75945aff32dbc60 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -61,6 +61,8 @@ #include #include +#include + namespace DB { @@ -2438,6 +2440,9 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t initChain(chain, source_columns); ExpressionActionsChain::Step & step = chain.steps.back(); + DUMP(only_types); + DUMP(StackTrace().toString()); + step.required_output.push_back(select_query->where_expression->getColumnName()); getRootActions(select_query->where_expression, only_types, false, step.actions); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 49b3ba27004b3a3c63ef047754289c23d86f0ff0..1f6aeb9d3c5e1b912a3a0650d8d23d385f70c2b1 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -676,7 +676,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); if (pipeline.streams.empty()) - pipeline.streams.emplace_back(std::make_shared(storage->getSampleBlockForColumns(required_columns))); + pipeline.streams.emplace_back(std::make_shared( + storage->analyze(required_columns, query_info, context, from_stage))); pipeline.transform([&](auto & stream) { diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index ef9fd8004ce142a8bfefe066e3a71efc66e4130b..b30aeb2bfb8aa43e3496ea5107fd0896322de380 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -180,6 +180,20 @@ public: throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + /** Without executing a query determine, + * what structure of result and what query processed stage + * we will get if we will call 'read' method. + */ + virtual Block analyze( + const Names & column_names, + const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum & processed_stage) + { + processed_stage = QueryProcessingStage::FetchColumns; + return getSampleBlockForColumns(column_names); + } + /** Writes the data to a table. * Receives a description of the query, which can contain information about the data write method. * Returns an object by which you can write data sequentially. diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 2b1dfe6652351ab2f44fb20c91086538d11eeb96..f73f23e5a1334e0d870cc9f9254d47260f25dad2 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -197,6 +197,26 @@ BlockInputStreams StorageDistributed::read( } +Block StorageDistributed::analyze( + const Names & /*column_names*/, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage) +{ + auto cluster = getCluster(); + + const Settings & settings = context.getSettingsRef(); + + size_t result_size = (cluster->getRemoteShardCount() * settings.max_parallel_replicas) + cluster->getLocalShardCount(); + + processed_stage = result_size == 1 || settings.distributed_group_by_no_merge + ? QueryProcessingStage::Complete + : QueryProcessingStage::WithMergeableState; + + return materializeBlock(InterpreterSelectQuery(query_info.query, context, {}, processed_stage).getSampleBlock()); +} + + BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings) { auto cluster = (owned_cluster) ? owned_cluster : context.getCluster(cluster_name); diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h index 451c9f12d8ad4cf6fd0d6eb8af944b489dbcdb43..63fefbfe501170b457019d6e4ca07c83f3b1aec5 100644 --- a/dbms/src/Storages/StorageDistributed.h +++ b/dbms/src/Storages/StorageDistributed.h @@ -60,6 +60,12 @@ public: size_t max_block_size, unsigned num_streams) override; + Block analyze( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage) override; + BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override; void drop() override {}