diff --git a/src/Interpreters/InterpreterExplainQuery.cpp b/src/Interpreters/InterpreterExplainQuery.cpp index 91fa31c1d04a3a330d88e0322f75e6871ae47696..9b71ae39a8e77bfeb3e1a30e7c598f2ed18bacb2 100644 --- a/src/Interpreters/InterpreterExplainQuery.cpp +++ b/src/Interpreters/InterpreterExplainQuery.cpp @@ -130,6 +130,7 @@ struct QueryPipelineSettings { QueryPlan::ExplainPipelineOptions query_pipeline_options; bool graph = false; + bool compact = true; constexpr static char name[] = "PIPELINE"; @@ -137,6 +138,7 @@ struct QueryPipelineSettings { {"header", query_pipeline_options.header}, {"graph", graph}, + {"compact", compact}, }; }; @@ -262,7 +264,10 @@ BlockInputStreamPtr InterpreterExplainQuery::executeImpl() if (settings.graph) { - printPipeline(pipeline->getProcessors(), buffer); + if (settings.compact) + printPipelineCompact(pipeline->getProcessors(), buffer, settings.query_pipeline_options.header); + else + printPipeline(pipeline->getProcessors(), buffer); } else { diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index 7c9f94916bff88e8b6a8e4fb630e0c194ed931f4..e9148dd507569c33fd6a9b0c3f135c2f90398423 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -312,8 +312,8 @@ private: bool has_quota = false; - IQueryPlanStep * query_plan_step; - size_t query_plan_step_group; + IQueryPlanStep * query_plan_step = nullptr; + size_t query_plan_step_group = 0; }; diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index 8927c75329a192f0aa0db66dc732eaa6d70182fb..fa04082c82ffdfd82ba086f9828c6290b5446113 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -879,12 +879,11 @@ QueryPipelineProcessorsCollector::~QueryPipelineProcessorsCollector() Processors QueryPipelineProcessorsCollector::detachProcessors(size_t group) { - Processors res; - res.swap(processors); - for (auto & processor : processors) processor->setQueryPlanStep(step, group); + Processors res; + res.swap(processors); return res; } diff --git a/src/Processors/printPipeline.cpp b/src/Processors/printPipeline.cpp new file mode 100644 index 0000000000000000000000000000000000000000..76c84218fb8226dd16038b2c685aefbbaf3cbe1f --- /dev/null +++ b/src/Processors/printPipeline.cpp @@ -0,0 +1,164 @@ +#include +#include +#include +#include + +namespace DB +{ + +void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header) +{ + struct Node; + struct Key + { + size_t group; + IQueryPlanStep * step; + std::string name; + + auto getTuple() const { return std::forward_as_tuple(group, step, name); } + + bool operator<(const Key & other) const + { + return getTuple() < other.getTuple(); + } + }; + + struct EdgeData + { + Block header; + size_t count; + }; + + using Edge = std::vector; + + struct Node + { + size_t id = 0; + std::map edges = {}; + std::vector agents = {}; + }; + + std::map graph; + auto get_key = [](const IProcessor & processor) + { + return Key{processor.getQueryPlanStepGroup(), processor.getQueryPlanStep(), processor.getName()}; + }; + + for (const auto & processor : processors) + { + auto res = graph.emplace(get_key(*processor), Node()); + res.first->second.agents.emplace_back(processor.get()); + + if (res.second) + res.first->second.id = graph.size(); + } + + Block empty_header; + + for (const auto & processor : processors) + { + auto & from = graph[get_key(*processor)]; + + for (auto & port : processor->getOutputs()) + { + if (!port.isConnected()) + continue; + + auto & to = graph[get_key(port.getInputPort().getProcessor())]; + auto & edge = from.edges[&to]; + const auto & header = with_header ? port.getHeader() : empty_header; + + bool found = false; + for (auto & item : edge) + { + if (blocksHaveEqualStructure(header, item.header)) + { + found = true; + ++item.count; + break; + } + } + + if (!found) + edge.emplace_back(EdgeData{header, 1}); + } + } + + std::map> steps_map; + + for (const auto & item : graph) + steps_map[item.first.step].emplace_back(&item.second); + + out << "digraph\n{\n"; + out << " rankdir=\"LR\";\n"; + out << " { node [shape = box]\n"; + + /// Nodes // TODO quoting and escaping + size_t next_step = 0; + for (const auto & item : steps_map) + { + if (item.first != nullptr) + { + out << " subgraph cluster_" << next_step << " {\n"; + out << " label =\"" << item.first->getName() << "\";\n"; + out << " style=filled;\n"; + out << " color=lightgrey;\n"; + out << " node [style=filled,color=white];\n"; + out << " { rank = same;\n"; + + ++next_step; + } + + for (const auto & node : item.second) + { + const auto & processor = node->agents.front(); + out << " n" << node->id << " [label=\"" << processor->getName(); + + if (node->agents.size() > 1) + out << " x " << node->agents.size(); + + const auto & description = processor->getDescription(); + if (!description.empty()) + out << ' ' << description; + + out << "\"];\n"; + } + + if (item.first != nullptr) + { + out << " }\n"; + out << " }\n"; + } + } + + out << " }\n"; + + /// Edges + for (const auto & item : graph) + { + for (const auto & edge : item.second.edges) + { + for (const auto & data : edge.second) + { + out << " n" << item.second.id << " -> " << "n" << edge.first->id << " [label=\""; + + if (data.count > 1) + out << "x " << data.count; + + if (with_header) + { + for (const auto & elem : data.header) + { + out << "\n"; + elem.dumpStructure(out); + } + } + + out << "\"];\n"; + } + } + } + out << "}\n"; +} + +} diff --git a/src/Processors/printPipeline.h b/src/Processors/printPipeline.h index ce7306ec4cf95b4bba219a9b717f6e470eadc24d..a6d134a5c17bf0df7a153d8eb7af1ba25af2a58e 100644 --- a/src/Processors/printPipeline.h +++ b/src/Processors/printPipeline.h @@ -15,6 +15,8 @@ template void printPipeline(const Processors & processors, const Statuses & statuses, WriteBuffer & out) { out << "digraph\n{\n"; + out << " rankdir=\"LR\";\n"; + out << " { node [shape = box]\n"; auto get_proc_id = [](const IProcessor & proc) -> UInt64 { @@ -26,7 +28,7 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri /// Nodes // TODO quoting and escaping for (const auto & processor : processors) { - out << "n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << processor->getDescription(); + out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << processor->getDescription(); if (statuses_iter != statuses.end()) { @@ -37,6 +39,8 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri out << "\"];\n"; } + out << " }\n"; + /// Edges for (const auto & processor : processors) { @@ -48,7 +52,7 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri const IProcessor & curr = *processor; const IProcessor & next = port.getInputPort().getProcessor(); - out << "n" << get_proc_id(curr) << " -> n" << get_proc_id(next) << ";\n"; + out << " n" << get_proc_id(curr) << " -> n" << get_proc_id(next) << ";\n"; } } out << "}\n"; @@ -60,4 +64,6 @@ void printPipeline(const Processors & processors, WriteBuffer & out) printPipeline(processors, std::vector(), out); } +void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header); + } diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 550aa9ad7da1f166d74bac31b3bf83b932b4b44b..3488cf534d3708e7cadacf1bfa146f0d6fe22579 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -105,6 +105,7 @@ SRCS( OffsetTransform.cpp Pipe.cpp Port.cpp + printPipeline.cpp QueryPipeline.cpp ResizeProcessor.cpp Sources/DelayedSource.cpp