From c5cb05f5f33238d7561e953e9bca1dc494e5e1d6 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 7 Oct 2020 14:26:29 +0300 Subject: [PATCH] Try fix tests. --- src/Processors/QueryPlan/QueryPlan.cpp | 11 +++++++++++ src/Processors/QueryPlan/QueryPlan.h | 5 +++++ src/Storages/MergeTree/StorageFromMergeTreeDataPart.h | 5 +---- src/Storages/StorageBuffer.cpp | 2 +- src/Storages/StorageDistributed.cpp | 2 +- src/Storages/StorageMaterializedView.cpp | 2 +- src/Storages/StorageMergeTree.cpp | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- src/Storages/StorageView.cpp | 2 +- 9 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index 1ff844480a..e681d4989d 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -185,6 +185,17 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline() return last_pipeline; } +Pipe QueryPlan::convertToPipe() +{ + if (!isInitialized()) + return {}; + + if (isCompleted()) + throw Exception("Cannot convert completed QueryPlan to Pipe", ErrorCodes::LOGICAL_ERROR); + + return QueryPipeline::getPipe(std::move(*buildQueryPipeline())); +} + void QueryPlan::addInterpreterContext(std::shared_ptr context) { interpreter_context.emplace_back(std::move(context)); diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h index 4f558e04c5..8aa0e868dc 100644 --- a/src/Processors/QueryPlan/QueryPlan.h +++ b/src/Processors/QueryPlan/QueryPlan.h @@ -20,6 +20,8 @@ class WriteBuffer; class QueryPlan; using QueryPlanPtr = std::unique_ptr; +class Pipe; + /// A tree of query steps. /// The goal of QueryPlan is to build QueryPipeline. /// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimisations. @@ -42,6 +44,9 @@ public: QueryPipelinePtr buildQueryPipeline(); + /// If initialized, build pipeline and convert to pipe. Otherwise, return empty pipe. + Pipe convertToPipe(); + struct ExplainPlanOptions { /// Add output header to step. diff --git a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index 769e82155b..7c30c7302b 100644 --- a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -33,10 +33,7 @@ public: std::move(*MergeTreeDataSelectExecutor(part->storage) .readFromParts({part}, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams)); - if (!query_plan.isInitialized()) - return {}; - - return QueryPipeline::getPipe(std::move(*query_plan.buildQueryPipeline())); + return query_plan.convertToPipe(); } diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 182087c24a..f6e728ebbe 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -160,7 +160,7 @@ Pipe StorageBuffer::read( { QueryPlan plan; read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); - return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline())); + return plan.convertToPipe(); } void StorageBuffer::read( diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 6e7d7ea41b..0b3070c88f 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -505,7 +505,7 @@ Pipe StorageDistributed::read( { QueryPlan plan; read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); - return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline())); + return plan.convertToPipe(); } void StorageDistributed::read( diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 69669c0b68..9ea4a5d91d 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -117,7 +117,7 @@ Pipe StorageMaterializedView::read( { QueryPlan plan; read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); - return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline())); + return plan.convertToPipe(); } void StorageMaterializedView::read( diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 35b147f22f..6c0b082c6a 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -195,7 +195,7 @@ Pipe StorageMergeTree::read( { QueryPlan plan; read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); - return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline())); + return plan.convertToPipe(); } std::optional StorageMergeTree::totalRows() const diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 8e2ecf1f18..ef06d27101 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3551,7 +3551,7 @@ Pipe StorageReplicatedMergeTree::read( { QueryPlan plan; read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); - return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline())); + return plan.convertToPipe(); } diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp index e71228f2a2..949e7930cc 100644 --- a/src/Storages/StorageView.cpp +++ b/src/Storages/StorageView.cpp @@ -61,7 +61,7 @@ Pipe StorageView::read( { QueryPlan plan; read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); - return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline())); + return plan.convertToPipe(); } void StorageView::read( -- GitLab