#include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } void QueryPlan::checkInitialized() const { if (!isInitialized()) throw Exception("QueryPlan was not initialized", ErrorCodes::LOGICAL_ERROR); } void QueryPlan::checkNotCompleted() const { if (isCompleted()) throw Exception("QueryPlan was already completed", ErrorCodes::LOGICAL_ERROR); } bool QueryPlan::isCompleted() const { return isInitialized() && !root->step->hasOutputStream(); } const DataStream & QueryPlan::getCurrentDataStream() const { checkInitialized(); checkNotCompleted(); return root->step->getOutputStream(); } void QueryPlan::addStep(QueryPlanStepPtr step) { checkNotCompleted(); size_t num_input_streams = step->getInputStreams().size(); if (num_input_streams == 0) { if (isInitialized()) throw Exception("Cannot add step " + step->getName() + " to QueryPlan because " "step has no inputs, but QueryPlan is already initialised", ErrorCodes::LOGICAL_ERROR); nodes.emplace_back(Node{.step = std::move(step)}); return; } if (num_input_streams == 1) { if (!isInitialized()) throw Exception("Cannot add step " + step->getName() + " to QueryPlan because " "step has input, but QueryPlan is not initialised", ErrorCodes::LOGICAL_ERROR); const auto & root_header = root->step->getOutputStream().header; const auto & step_header = step->getInputStreams().front().header; if (!blocksHaveEqualStructure(root_header, step_header)) throw Exception("Cannot add step " + step->getName() + " to QueryPlan because " "it has incompatible header with root step " + root->step->getName() + " " "root header: " + root_header.dumpStructure() + "step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR); nodes.emplace_back(Node{.step = std::move(step), .children = {root}}); root = &nodes.back(); return; } throw Exception("Cannot add step " + step->getName() + " to QueryPlan because it has " + std::to_string(num_input_streams) + " inputs but " + std::to_string(isInitialized() ? 1 : 0) + " input expected", ErrorCodes::LOGICAL_ERROR); } QueryPipelinePtr QueryPlan::buildQueryPipeline() { checkInitialized(); struct Frame { Node * node; QueryPipelines pipelines; }; QueryPipelinePtr last_pipeline; std::stack stack; stack.push({.node = root}); while (!stack.empty()) { auto & frame = stack.top(); if (last_pipeline) frame.pipelines.emplace_back(std::move(last_pipeline)); size_t next_child = frame.pipelines.size(); if (next_child == frame.node->children.size()) { last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines)); stack.pop(); } else stack.push({.node = frame.node->children[next_child]}); } return last_pipeline; } }