未验证 提交 c28e224e 编写于 作者: N Nikolai Kochetov 提交者: GitHub

Merge pull request #8735 from ClickHouse/processors-5.12

Processors 5.12
......@@ -606,7 +606,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
/// If exception was thrown during pipeline execution, skip it while processing other exception.
}
pipeline = QueryPipeline()
/// pipeline = QueryPipeline()
);
while (true)
......
......@@ -781,7 +781,8 @@ Block Aggregator::mergeAndConvertOneBucketToBlock(
ManyAggregatedDataVariants & variants,
Arena * arena,
bool final,
size_t bucket) const
size_t bucket,
std::atomic<bool> * is_cancelled) const
{
auto & merged_data = *variants[0];
auto method = merged_data.type;
......@@ -792,6 +793,8 @@ Block Aggregator::mergeAndConvertOneBucketToBlock(
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
mergeBucketImpl<decltype(merged_data.NAME)::element_type>(variants, bucket, arena); \
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) \
return {}; \
block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \
}
......@@ -1482,12 +1485,15 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena, std::atomic<bool> * is_cancelled) const
{
/// We merge all aggregation results to the first.
AggregatedDataVariantsPtr & res = data[0];
for (size_t result_num = 1, size = data.size(); result_num < size; ++result_num)
{
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst))
return;
AggregatedDataVariants & current = *data[result_num];
mergeDataImpl<Method>(
......
......@@ -1170,7 +1170,8 @@ protected:
ManyAggregatedDataVariants & variants,
Arena * arena,
bool final,
size_t bucket) const;
size_t bucket,
std::atomic<bool> * is_cancelled = nullptr) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
......@@ -1206,7 +1207,7 @@ protected:
template <typename Method>
void mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const;
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena, std::atomic<bool> * is_cancelled = nullptr) const;
template <typename Method>
void convertBlockToTwoLevelImpl(
......
......@@ -1949,7 +1949,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
if (pipeline.getNumStreams() > 1)
{
/// Add resize transform to uniformly distribute data between aggregating streams.
pipeline.resize(pipeline.getNumStreams(), true);
if (!(storage && storage->hasEvenlyDistributedRead()))
pipeline.resize(pipeline.getNumStreams(), true, true);
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
auto merge_threads = settings.aggregation_memory_efficient_merge_threads
......
......@@ -259,7 +259,6 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
/// In this method we have ownership on node.
auto & node = graph[pid];
bool need_traverse = false;
bool need_expand_pipeline = false;
std::vector<Edge *> updated_back_edges;
......@@ -290,13 +289,11 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
need_traverse = true;
node.status = ExecStatus::Idle;
break;
}
case IProcessor::Status::Finished:
{
need_traverse = true;
node.status = ExecStatus::Finished;
break;
}
......@@ -325,7 +322,6 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
}
}
if (need_traverse)
{
for (auto & edge_id : node.post_updated_input_ports)
{
......@@ -346,7 +342,6 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
}
}
if (need_traverse)
{
for (auto & edge : updated_direct_edges)
{
......@@ -543,7 +538,13 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (!task_queue.empty() && !threads_queue.empty() /*&& task_queue.quota() > threads_queue.size()*/)
{
auto thread_to_wake = threads_queue.pop_any();
auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.pop_any();
lock.unlock();
wake_up_executor(thread_to_wake);
}
......@@ -627,9 +628,15 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
queue.pop();
}
if (!threads_queue.empty() /* && task_queue.quota() > threads_queue.size()*/)
if (!threads_queue.empty() && !finished /* && task_queue.quota() > threads_queue.size()*/)
{
auto thread_to_wake = threads_queue.pop_any();
auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.pop_any();
lock.unlock();
wake_up_executor(thread_to_wake);
......
......@@ -149,32 +149,37 @@ private:
++quota_;
}
ExecutionState * pop(size_t thread_num)
size_t getAnyThreadWithTasks(size_t from_thread = 0)
{
if (size_ == 0)
throw Exception("TaskQueue is not empty.", ErrorCodes::LOGICAL_ERROR);
throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < queues.size(); ++i)
{
if (!queues[thread_num].empty())
{
ExecutionState * state = queues[thread_num].front();
queues[thread_num].pop();
if (!queues[from_thread].empty())
return from_thread;
++from_thread;
if (from_thread >= queues.size())
from_thread = 0;
}
throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);
}
--size_;
ExecutionState * pop(size_t thread_num)
{
auto thread_with_tasks = getAnyThreadWithTasks(thread_num);
if (state->has_quota)
++quota_;
ExecutionState * state = queues[thread_with_tasks].front();
queues[thread_with_tasks].pop();
return state;
}
--size_;
++thread_num;
if (thread_num >= queues.size())
thread_num = 0;
}
if (state->has_quota)
++quota_;
throw Exception("TaskQueue is not empty.", ErrorCodes::LOGICAL_ERROR);
return state;
}
size_t size() const { return size_; }
......
......@@ -33,6 +33,10 @@ ISimpleTransform::Status ISimpleTransform::prepare()
{
output.pushData(std::move(current_data));
transformed = false;
if (!no_more_data_needed)
return Status::PortFull;
}
/// Stop if no more data is needed.
......@@ -52,12 +56,13 @@ ISimpleTransform::Status ISimpleTransform::prepare()
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_data = input.pullData();
current_data = input.pullData(true);
has_input = true;
if (current_data.exception)
......
......@@ -161,12 +161,17 @@ protected:
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
}
void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags)
void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags, bool set_not_needed = false)
{
flags = data_.swap(data, 0, HAS_DATA);
uintptr_t mask = HAS_DATA;
if (set_not_needed)
mask |= IS_NEEDED;
flags = data_.swap(data, 0, mask);
/// It's ok to check because this flag can be changed only by pulling thread.
if (unlikely((flags & IS_NEEDED) == 0))
if (unlikely((flags & IS_NEEDED) == 0) && !set_not_needed)
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (unlikely((flags & HAS_DATA) == 0))
......@@ -266,14 +271,15 @@ private:
public:
using Port::Port;
Data ALWAYS_INLINE pullData()
Data ALWAYS_INLINE pullData(bool set_not_needed = false)
{
updateVersion();
if (!set_not_needed)
updateVersion();
assumeConnected();
std::uintptr_t flags = 0;
state->pull(data, flags);
state->pull(data, flags, set_not_needed);
is_finished = flags & State::IS_FINISHED;
......@@ -293,9 +299,9 @@ public:
return std::move(*data);
}
Chunk ALWAYS_INLINE pull()
Chunk ALWAYS_INLINE pull(bool set_not_needed = false)
{
auto data_ = pullData();
auto data_ = pullData(set_not_needed);
if (data_.exception)
std::rethrow_exception(data_.exception);
......
......@@ -234,7 +234,7 @@ void QueryPipeline::addDelayedStream(ProcessorPtr source)
addPipe({ std::move(processor) });
}
void QueryPipeline::resize(size_t num_streams, bool force)
void QueryPipeline::resize(size_t num_streams, bool force, bool strict)
{
checkInitialized();
......@@ -243,7 +243,13 @@ void QueryPipeline::resize(size_t num_streams, bool force)
has_resize = true;
auto resize = std::make_shared<ResizeProcessor>(current_header, getNumStreams(), num_streams);
ProcessorPtr resize;
if (strict)
resize = std::make_shared<StrictResizeProcessor>(current_header, getNumStreams(), num_streams);
else
resize = std::make_shared<ResizeProcessor>(current_header, getNumStreams(), num_streams);
auto stream = streams.begin();
for (auto & input : resize->getInputs())
connect(**(stream++), input);
......
......@@ -61,7 +61,7 @@ public:
/// Check if resize transform was used. (In that case another distinct transform will be added).
bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }
void resize(size_t num_streams, bool force = false);
void resize(size_t num_streams, bool force = false, bool strict = false);
void enableQuotaForCurrentStreams();
......
#include <Processors/ResizeProcessor.h>
#include <iostream>
namespace DB
{
......@@ -257,5 +257,143 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs,
return Status::PortFull;
}
/// Moves chunks from inputs to outputs so that every enabled input is paired with exactly one
/// output that asked for data: an input is only marked as needed after an output has been
/// reserved for it, and the pulled chunk is pushed to that very output.
///
/// @param updated_inputs   Numbers of input ports whose state changed since the previous call.
/// @param updated_outputs  Numbers of output ports whose state changed since the previous call.
/// @return Status::Finished when all outputs or all inputs are finished; Status::NeedData when
///         no disabled input is left after pairing; Status::PortFull otherwise.
/// @throws Exception with ErrorCodes::LOGICAL_ERROR if an input has data but no associated
///         output, or if the associated output is not in the NeedData state.
IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
{
    /// Build the per-port bookkeeping lazily on the first call. Every input starts disabled.
    if (!initialized)
    {
        initialized = true;

        for (auto & input : inputs)
            input_ports.push_back({.port = &input, .status = InputStatus::NotActive, .waiting_output = -1});

        for (UInt64 i = 0; i < input_ports.size(); ++i)
            disabled_input_ports.push(i);

        for (auto & output : outputs)
            output_ports.push_back({.port = &output, .status = OutputStatus::NotActive});
    }

    /// Register outputs that became finished or became ready to accept a chunk.
    for (const auto & output_number : updated_outputs)
    {
        auto & output = output_ports[output_number];

        if (output.port->isFinished())
        {
            /// Count every output at most once.
            if (output.status != OutputStatus::Finished)
            {
                ++num_finished_outputs;
                output.status = OutputStatus::Finished;
            }

            continue;
        }

        if (output.port->canPush())
        {
            /// Queue the output only on the transition to NeedData, so it is not queued twice.
            if (output.status != OutputStatus::NeedData)
            {
                output.status = OutputStatus::NeedData;
                waiting_outputs.push(output_number);
            }
        }
    }

    /// Nobody is left to consume data.
    if (num_finished_outputs == outputs.size())
    {
        for (auto & input : inputs)
            input.close();

        return Status::Finished;
    }

    std::queue<UInt64> inputs_with_data;

    /// Register inputs that became finished or received a chunk.
    for (const auto & input_number : updated_inputs)
    {
        auto & input = input_ports[input_number];

        if (input.port->isFinished())
        {
            if (input.status != InputStatus::Finished)
            {
                input.status = InputStatus::Finished;
                ++num_finished_inputs;

                /// Give the reserved output back so that another input can serve it.
                /// An input which was never paired has waiting_output == -1; pushing that value
                /// would turn into a huge unsigned index and lead to an out-of-bounds access
                /// of output_ports later on.
                if (input.waiting_output != -1)
                    waiting_outputs.push(static_cast<UInt64>(input.waiting_output));
            }

            continue;
        }

        if (input.port->hasData())
        {
            if (input.status != InputStatus::NotActive)
            {
                input.status = InputStatus::NotActive;
                inputs_with_data.push(input_number);
            }
        }
    }

    /// Forward every received chunk to the output reserved for its input.
    while (!inputs_with_data.empty())
    {
        auto input_number = inputs_with_data.front();
        auto & input_with_data = input_ports[input_number];
        inputs_with_data.pop();

        if (input_with_data.waiting_output == -1)
            throw Exception("No associated output for input with data.", ErrorCodes::LOGICAL_ERROR);

        auto & waiting_output = output_ports[input_with_data.waiting_output];

        if (waiting_output.status != OutputStatus::NeedData)
            throw Exception("Invalid status for associated output.", ErrorCodes::LOGICAL_ERROR);

        waiting_output.port->pushData(input_with_data.port->pullData(/* set_not_needed = */ true));
        waiting_output.status = OutputStatus::NotActive;

        if (input_with_data.port->isFinished())
        {
            input_with_data.status = InputStatus::Finished;
            ++num_finished_inputs;
        }
        else
            disabled_input_ports.push(input_number);
    }

    /// No more data can arrive.
    if (num_finished_inputs == inputs.size())
    {
        for (auto & output : outputs)
            output.finish();

        return Status::Finished;
    }

    /// Enable more inputs if needed.
    while (!disabled_input_ports.empty() && !waiting_outputs.empty())
    {
        auto & input = input_ports[disabled_input_ports.front()];
        disabled_input_ports.pop();

        input.port->setNeeded();
        input.status = InputStatus::NeedData;
        input.waiting_output = waiting_outputs.front();

        waiting_outputs.pop();
    }

    /// Finish the outputs which are still waiting: there is no disabled input left for them.
    while (!waiting_outputs.empty())
    {
        auto & output = output_ports[waiting_outputs.front()];
        waiting_outputs.pop();

        /// The output may have been finished (and counted) already while it was waiting
        /// in the queue; do not count it twice.
        if (output.status != OutputStatus::Finished)
            ++num_finished_outputs;

        output.status = OutputStatus::Finished;
        output.port->finish();
    }

    if (disabled_input_ports.empty())
        return Status::NeedData;

    return Status::PortFull;
}
}
......@@ -74,4 +74,60 @@ private:
std::vector<OutputPortWithStatus> output_ports;
};
class StrictResizeProcessor : public IProcessor
{
public:
/// TODO Check that there is non zero number of inputs and outputs.
StrictResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header))
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
String getName() const override { return "StrictResize"; }
Status prepare(const PortNumbers &, const PortNumbers &) override;
private:
InputPorts::iterator current_input;
OutputPorts::iterator current_output;
size_t num_finished_inputs = 0;
size_t num_finished_outputs = 0;
std::queue<UInt64> disabled_input_ports;
std::queue<UInt64> waiting_outputs;
bool initialized = false;
enum class OutputStatus
{
NotActive,
NeedData,
Finished,
};
enum class InputStatus
{
NotActive,
NeedData,
Finished,
};
struct InputPortWithStatus
{
InputPort * port;
InputStatus status;
ssize_t waiting_output;
};
struct OutputPortWithStatus
{
OutputPort * port;
OutputStatus status;
};
std::vector<InputPortWithStatus> input_ports;
std::vector<OutputPortWithStatus> output_ports;
};
}
......@@ -78,6 +78,7 @@ public:
{
std::atomic<UInt32> next_bucket_to_merge = 0;
std::array<std::atomic<Int32>, NUM_BUCKETS> source_for_bucket;
std::atomic<bool> is_cancelled = false;
SharedData()
{
......@@ -112,7 +113,7 @@ protected:
if (bucket_num >= NUM_BUCKETS)
return {};
Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num);
Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num, &shared_data->is_cancelled);
Chunk chunk = convertToChunk(block);
shared_data->source_for_bucket[bucket_num] = source_number;
......@@ -201,6 +202,9 @@ public:
for (auto & input : inputs)
input.close();
if (shared_data)
shared_data->is_cancelled.store(true);
return Status::Finished;
}
......@@ -429,11 +433,16 @@ IProcessor::Status AggregatingTransform::prepare()
}
}
input.setNeeded();
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
if (is_consume_finished)
input.setNeeded();
current_chunk = input.pull();
current_chunk = input.pull(/*set_not_needed = */ !is_consume_finished);
read_current_chunk = true;
if (is_consume_finished)
......
......@@ -74,16 +74,8 @@ IProcessor::Status MergingSortedTransform::prepare()
return Status::Finished;
}
if (!output.isNeeded())
{
for (auto & in : inputs)
in.setNotNeeded();
return Status::PortFull;
}
if (output.hasData())
return Status::PortFull;
/// Do not disable inputs, so it will work in the same way as with AsynchronousBlockInputStream, like before.
bool is_port_full = !output.canPush();
/// Special case for single input.
if (inputs.size() == 1)
......@@ -96,14 +88,20 @@ IProcessor::Status MergingSortedTransform::prepare()
}
input.setNeeded();
if (input.hasData())
output.push(input.pull());
{
if (!is_port_full)
output.push(input.pull());
return Status::PortFull;
}
return Status::NeedData;
}
/// Push if has data.
if (merged_data.mergedRows())
if (merged_data.mergedRows() && !is_port_full)
output.push(merged_data.pull());
if (!is_initialized)
......@@ -119,7 +117,7 @@ IProcessor::Status MergingSortedTransform::prepare()
if (!cursors[i].empty())
{
input.setNotNeeded();
// input.setNotNeeded();
continue;
}
......@@ -159,6 +157,10 @@ IProcessor::Status MergingSortedTransform::prepare()
{
if (is_finished)
{
if (is_port_full)
return Status::PortFull;
for (auto & input : inputs)
input.close();
......@@ -192,6 +194,9 @@ IProcessor::Status MergingSortedTransform::prepare()
need_data = false;
}
if (is_port_full)
return Status::PortFull;
return Status::Ready;
}
}
......
......@@ -241,12 +241,13 @@ IProcessor::Status SortingTransform::prepareConsume()
if (input.isFinished())
return Status::Finished;
input.setNeeded();
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull();
current_chunk = input.pull(true);
}
/// Now consume.
......
......@@ -114,6 +114,8 @@ public:
/// Returns true if the blocks shouldn't be pushed to associated views on insert.
virtual bool noPushingToViews() const { return false; }
virtual bool hasEvenlyDistributedRead() const { return false; }
/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
......
......@@ -37,6 +37,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool hasEvenlyDistributedRead() const override { return true; }
private:
bool multithreaded;
bool even_distribution;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册