From 4c179e454a1a9a1beda02223de79eef225654d3e Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Mon, 8 Jun 2020 12:14:58 +0300
Subject: [PATCH] Add QueryPlan.

---
 src/Processors/QueryPlan/IQueryPlanStep.h |   1 +
 src/Processors/QueryPlan/QueryPlan.cpp    | 130 ++++++++++++++++++++++
 src/Processors/QueryPlan/QueryPlan.h      |  45 ++++++++
 src/Processors/ya.make                    |   1 +
 4 files changed, 177 insertions(+)
 create mode 100644 src/Processors/QueryPlan/QueryPlan.cpp
 create mode 100644 src/Processors/QueryPlan/QueryPlan.h

diff --git a/src/Processors/QueryPlan/IQueryPlanStep.h b/src/Processors/QueryPlan/IQueryPlanStep.h
index fe84e49672..0c3b0727b0 100644
--- a/src/Processors/QueryPlan/IQueryPlanStep.h
+++ b/src/Processors/QueryPlan/IQueryPlanStep.h
@@ -51,4 +51,5 @@ protected:
     std::optional<DataStream> output_stream;
 };
 
+using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
 }
diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp
new file mode 100644
index 0000000000..82bc421082
--- /dev/null
+++ b/src/Processors/QueryPlan/QueryPlan.cpp
@@ -0,0 +1,130 @@
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+#include <stack>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+/// Throws LOGICAL_ERROR if no step has been added yet.
+void QueryPlan::checkInitialized() const
+{
+    if (!isInitialized())
+        throw Exception("QueryPlan was not initialized", ErrorCodes::LOGICAL_ERROR);
+}
+
+/// Throws LOGICAL_ERROR if the plan is already completed (see isCompleted()).
+void QueryPlan::checkNotCompleted() const
+{
+    if (isCompleted())
+        throw Exception("QueryPlan was already completed", ErrorCodes::LOGICAL_ERROR);
+}
+
+/// The plan is completed when it is not empty and its root step has no output stream.
+bool QueryPlan::isCompleted() const
+{
+    return isInitialized() && !root->step->hasOutputStream();
+}
+
+/// Returns the output stream of the root step.
+/// Throws LOGICAL_ERROR if the plan is empty or already completed.
+const DataStream & QueryPlan::getCurrentDataStream() const
+{
+    checkInitialized();
+    checkNotCompleted();
+    return root->step->getOutputStream();
+}
+
+/// Puts `step` on top of the plan and makes it the new root.
+///  - A step without input streams may only be added to an empty plan.
+///  - A step with one input stream is attached above the current root; its input header
+///    must have the same structure as the output header of the root.
+///  - Steps with more than one input stream are rejected.
+/// Throws LOGICAL_ERROR if any of these conditions is violated or the plan is completed.
+void QueryPlan::addStep(QueryPlanStepPtr step)
+{
+    checkNotCompleted();
+
+    size_t num_input_streams = step->getInputStreams().size();
+
+    if (num_input_streams == 0)
+    {
+        if (isInitialized())
+            throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
+                            "step has no inputs, but QueryPlan is already initialised", ErrorCodes::LOGICAL_ERROR);
+
+        nodes.emplace_back(Node{.step = std::move(step)});
+        /// The first step is the root; setting it is what makes the plan initialized.
+        root = &nodes.back();
+        return;
+    }
+
+    if (num_input_streams == 1)
+    {
+        if (!isInitialized())
+            throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
+                            "step has input, but QueryPlan is not initialised", ErrorCodes::LOGICAL_ERROR);
+
+        const auto & root_header = root->step->getOutputStream().header;
+        const auto & step_header = step->getInputStreams().front().header;
+        if (!blocksHaveEqualStructure(root_header, step_header))
+            throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
+                            "it has incompatible header with root step " + root->step->getName() + " "
+                            "root header: " + root_header.dumpStructure() +
+                            " step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
+
+        nodes.emplace_back(Node{.step = std::move(step), .children = {root}});
+        root = &nodes.back();
+        return;
+    }
+
+    throw Exception("Cannot add step " + step->getName() + " to QueryPlan because it has " +
+                    std::to_string(num_input_streams) + " inputs but " + std::to_string(isInitialized() ? 1 : 0) +
+                    " input expected", ErrorCodes::LOGICAL_ERROR);
+}
+
+/// Builds a pipeline for the whole plan. The children of a node are processed first, then the
+/// step of the node receives their pipelines through updatePipeline().
+/// The tree is traversed with an explicit stack instead of recursion.
+QueryPipelinePtr QueryPlan::buildQueryPipeline()
+{
+    checkInitialized();
+
+    struct Frame
+    {
+        Node * node;
+        QueryPipelines pipelines;
+    };
+
+    QueryPipelinePtr last_pipeline;
+
+    std::stack<Frame> stack;
+    stack.push({.node = root});
+
+    while (!stack.empty())
+    {
+        auto & frame = stack.top();
+
+        /// A child has just been finished: hand its pipeline over to the frame of its parent.
+        if (last_pipeline)
+            frame.pipelines.emplace_back(std::move(last_pipeline));
+
+        /// The number of collected pipelines is the index of the next child to visit.
+        size_t next_child = frame.pipelines.size();
+        if (next_child == frame.node->children.size())
+        {
+            last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines));
+            stack.pop();
+        }
+        else
+            stack.push({.node = frame.node->children[next_child]});
+    }
+
+    return last_pipeline;
+}
+
+}
diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h
new file mode 100644
index 0000000000..de93252490
--- /dev/null
+++ b/src/Processors/QueryPlan/QueryPlan.h
@@ -0,0 +1,45 @@
+#pragma once
+#include <memory>
+#include <list>
+#include <vector>
+
+namespace DB
+{
+
+class DataStream;
+
+class IQueryPlanStep;
+using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
+
+class QueryPipeline;
+using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
+
+/// A tree of query steps.
+class QueryPlan
+{
+public:
+    void addStep(QueryPlanStepPtr step); /// Throws LOGICAL_ERROR if the step does not fit the current plan
+
+    bool isInitialized() const { return root != nullptr; } /// Tree is not empty
+    bool isCompleted() const; /// Tree is not empty and root has no output stream (!hasOutputStream())
+    const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())
+
+    QueryPipelinePtr buildQueryPipeline(); /// Checks that isInitialized()
+
+private:
+    struct Node
+    {
+        QueryPlanStepPtr step;
+        std::vector<Node *> children; /// Non-owning pointers to elements of `nodes`
+    };
+
+    using Nodes = std::list<Node>; /// std::list never relocates elements, so Node pointers stay valid
+    Nodes nodes;
+
+    Node * root = nullptr; /// Points into `nodes`; nullptr while the tree is empty
+
+    void checkInitialized() const;
+    void checkNotCompleted() const;
+};
+
+}
diff --git a/src/Processors/ya.make b/src/Processors/ya.make
index fee4847fb5..5cbc5dfd29 100644
--- a/src/Processors/ya.make
+++ b/src/Processors/ya.make
@@ -135,6 +135,7 @@ SRCS(
     Transforms/SortingTransform.cpp
     Transforms/TotalsHavingTransform.cpp
     QueryPlan/IQueryPlanStep.cpp
+    QueryPlan/QueryPlan.cpp
 )
 
 END()
-- 
GitLab