未验证 提交 df57706f 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #18550 from ClickHouse/fix-delayed-source

Fix pipeline stuck after join
......@@ -3,24 +3,37 @@
namespace DB
{
DelayedPortsProcessor::DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header))
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
DelayedPortsProcessor::DelayedPortsProcessor(
const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty)
: IProcessor(InputPorts(num_ports, header),
OutputPorts((assert_main_ports_empty ? delayed_ports.size() : num_ports), header))
, num_delayed(delayed_ports.size())
{
port_pairs.resize(num_ports);
output_to_pair.reserve(outputs.size());
for (const auto & delayed : delayed_ports)
port_pairs[delayed].is_delayed = true;
auto input_it = inputs.begin();
auto output_it = outputs.begin();
for (size_t i = 0; i < num_ports; ++i)
{
port_pairs[i].input_port = &*input_it;
port_pairs[i].output_port = &*output_it;
++input_it;
++output_it;
}
for (const auto & delayed : delayed_ports)
port_pairs[delayed].is_delayed = true;
if (port_pairs[i].is_delayed || !assert_main_ports_empty)
{
port_pairs[i].output_port = &*output_it;
output_to_pair.push_back(i);
++output_it;
}
}
}
bool DelayedPortsProcessor::processPair(PortsPair & pair)
......@@ -34,7 +47,7 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair)
}
};
if (pair.output_port->isFinished())
if (pair.output_port && pair.output_port->isFinished())
{
pair.input_port->close();
finish();
......@@ -43,17 +56,24 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair)
if (pair.input_port->isFinished())
{
pair.output_port->finish();
if (pair.output_port)
pair.output_port->finish();
finish();
return false;
}
if (!pair.output_port->canPush())
if (pair.output_port && !pair.output_port->canPush())
return false;
pair.input_port->setNeeded();
if (pair.input_port->hasData())
{
if (!pair.output_port)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Input port for DelayedPortsProcessor is assumed to have no data, but it has one");
pair.output_port->pushData(pair.input_port->pullData());
}
return true;
}
......@@ -63,10 +83,21 @@ IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_in
bool skip_delayed = (num_finished + num_delayed) < port_pairs.size();
bool need_data = false;
if (!are_inputs_initialized && !updated_outputs.empty())
{
/// Activate inputs with no output.
for (const auto & pair : port_pairs)
if (!pair.output_port)
pair.input_port->setNeeded();
are_inputs_initialized = true;
}
for (const auto & output_number : updated_outputs)
{
if (!skip_delayed || !port_pairs[output_number].is_delayed)
need_data = processPair(port_pairs[output_number]) || need_data;
auto pair_num = output_to_pair[output_number];
if (!skip_delayed || !port_pairs[pair_num].is_delayed)
need_data = processPair(port_pairs[pair_num]) || need_data;
}
for (const auto & input_number : updated_inputs)
......
......@@ -11,7 +11,7 @@ namespace DB
class DelayedPortsProcessor : public IProcessor
{
public:
DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports);
DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty = false);
String getName() const override { return "DelayedPorts"; }
......@@ -31,6 +31,9 @@ private:
size_t num_delayed;
size_t num_finished = 0;
std::vector<size_t> output_to_pair;
bool are_inputs_initialized = false;
bool processPair(PortsPair & pair);
};
......
......@@ -298,7 +298,7 @@ void QueryPipeline::addPipelineBefore(QueryPipeline pipeline)
pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline)));
pipe = Pipe::unitePipes(std::move(pipes), collected_processors);
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams);
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams, true);
addTransform(std::move(processor));
}
......
......@@ -32,6 +32,11 @@ void AddingDelayedSourceStep::transformPipeline(QueryPipeline & pipeline)
{
source->setQueryPlanStep(this);
pipeline.addDelayedStream(source);
/// Now, after adding delayed stream, it has implicit dependency on other port.
/// Here we add resize processor to remove this dependency.
/// Otherwise, if we add MergeSorting + MergingSorted transform to pipeline, we could get `Pipeline stuck`
pipeline.resize(pipeline.getNumStreams(), true);
}
}
SELECT k FROM (SELECT NULL, nullIf(number, 3) AS k, '1048575', (65536, -9223372036854775808), toString(number) AS a FROM system.numbers LIMIT 1048577) AS js1 ANY RIGHT JOIN (SELECT 1.000100016593933, nullIf(number, NULL) AS k, toString(number) AS b FROM system.numbers LIMIT 2, 255) AS js2 USING (k) ORDER BY 257 ASC NULLS LAST FORMAT Null;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册