提交 0218ccb8 编写于 作者: A alexey-milovidov 提交者: Nikolai Kochetov

Merge pull request #10664 from ClickHouse/fix-10655

Fix 10655

(cherry picked from commit e4b3234d)
上级 4fb7dc90
......@@ -27,6 +27,8 @@ GroupingAggregatedTransform::GroupingAggregatedTransform(
void GroupingAggregatedTransform::readFromAllInputs()
{
auto in = inputs.begin();
read_from_all_inputs = true;
for (size_t i = 0; i < num_inputs; ++i, ++in)
{
if (in->isFinished())
......@@ -38,14 +40,15 @@ void GroupingAggregatedTransform::readFromAllInputs()
in->setNeeded();
if (!in->hasData())
return;
{
read_from_all_inputs = false;
continue;
}
auto chunk = in->pull();
read_from_input[i] = true;
addChunk(std::move(chunk), i);
}
read_from_all_inputs = true;
}
void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows)
......@@ -269,6 +272,7 @@ void GroupingAggregatedTransform::addChunk(Chunk chunk, size_t input)
void GroupingAggregatedTransform::work()
{
/// Convert single level data to two level.
if (!single_level_chunks.empty())
{
auto & header = getInputs().front().getHeader();
......
......@@ -28,11 +28,11 @@ private:
size_t num_inputs;
AggregatingTransformParamsPtr params;
std::vector<Int32> last_bucket_number;
std::map<Int32, Chunks> chunks_map;
std::vector<Int32> last_bucket_number; /// Last bucket read from each input.
std::map<Int32, Chunks> chunks_map; /// bucket -> chunks
Chunks overflow_chunks;
Chunks single_level_chunks;
Int32 current_bucket = 0;
Int32 current_bucket = 0; /// Currently processing bucket.
Int32 next_bucket_to_push = 0; /// Always <= current_bucket.
bool has_two_level = false;
......@@ -42,11 +42,17 @@ private:
bool expect_several_chunks_for_single_bucket_per_source = false;
/// Add chunk read from input to chunks_map, overflow_chunks or single_level_chunks according to it's chunk info.
void addChunk(Chunk chunk, size_t input);
/// Read from all inputs first chunk. It is needed to detect if any source has two-level aggregation.
void readFromAllInputs();
/// Push chunks if all inputs has single level.
bool tryPushSingleLevelData();
/// Push chunks from ready bucket if has one.
bool tryPushTwoLevelData();
/// Push overflow chunks if has any.
bool tryPushOverflowData();
/// Push chunks from bucket to output port.
void pushData(Chunks chunks, Int32 bucket, bool is_overflows);
};
......
<test>
<query>select sum(number) from remote('127.0.0{2,3,4,5}', numbers_mt(1000000000)) settings max_threads=4, distributed_aggregation_memory_efficient=1</query>
</test>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册