提交 5c87a91a 编写于 作者: M Marek Vavruša 提交者: alexey-milovidov

SummingSortedBlockInputStream: fix explicitly configured columns to sum

This fixes a variant of SummingMergeTree() in which
the columns to sum are explicitly configured.
Previously, columns not in that list were ignored
instead of being written with their last value.

This also fixes summation of invalid maps
with only one key column and no value columns.

Modified the test to work around a compaction limitation
in which a zero-value column isn’t compacted
immediately if the inputs are non-zero but the
output is zero (+1 -1).
上级 fa098b46
......@@ -157,6 +157,11 @@ Block SummingSortedBlockInputStream::readImpl()
desc.state.resize(desc.function->sizeOfData());
columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Column is not going to be summed, use last value
column_numbers_not_to_aggregate.push_back(i);
}
}
}
......@@ -165,7 +170,11 @@ Block SummingSortedBlockInputStream::readImpl()
{
/// map should contain at least two elements (key -> value)
if (map.second.size() < 2)
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
/// no elements of map could be in primary key
auto column_num_it = map.second.begin();
......@@ -173,7 +182,11 @@ Block SummingSortedBlockInputStream::readImpl()
if (isInPrimaryKey(description, merged_block.safeGetByPosition(*column_num_it).name, *column_num_it))
break;
if (column_num_it != map.second.end())
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
// Wrap aggregated columns in a tuple to match function signature
DataTypes argument_types = {};
......@@ -216,7 +229,11 @@ Block SummingSortedBlockInputStream::readImpl()
}
if (column_num_it != map.second.end())
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
if (map_desc.key_col_nums.size() == 1)
{
......@@ -230,10 +247,8 @@ Block SummingSortedBlockInputStream::readImpl()
else
{
// Fall back to legacy mergeMaps for composite keys
for (auto i : map_desc.key_col_nums)
column_numbers_not_to_aggregate.push_back(i);
for (auto i : map_desc.val_col_nums)
column_numbers_not_to_aggregate.push_back(i);
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
maps_to_sum.emplace_back(std::move(map_desc));
}
}
......@@ -290,7 +305,6 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
current_key.swap(next_key);
setRow(current_row, current);
current_row_is_zero = false;
/// Reset aggregation states for next row
for (auto & desc : columns_to_aggregate)
......@@ -298,9 +312,14 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
desc.function->create(desc.state.data());
desc.created = true;
}
// Start aggregations with current row
current_row_is_zero = !addRow(current_row, current);
}
else
{
current_row_is_zero = !addRow(current_row, current);
// Merge maps only for same rows
for (auto & desc : maps_to_sum)
{
......@@ -309,9 +328,6 @@ void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std:
}
}
if (addRow(current_row, current))
current_row_is_zero = false;
if (!current->isLast())
{
current->next();
......@@ -445,6 +461,8 @@ bool SummingSortedBlockInputStream::addRow(Row & row, TSortCursor & cursor)
columns[i] = cursor->all_columns[desc.column_numbers[i]];
desc.function->add(desc.state.data(), columns.data(), cursor->pos, nullptr);
// Note: we can't detect whether the aggregation result is non-empty here yet
res = true;
}
}
}
......
......@@ -10,6 +10,7 @@ SELECT * FROM test.empty_summing;
INSERT INTO test.empty_summing VALUES ('2015-01-01', 1, 4),('2015-01-01', 2, -9),('2015-01-01', 3, -14);
INSERT INTO test.empty_summing VALUES ('2015-01-01', 1, -2),('2015-01-01', 1, -2),('2015-01-01', 3, 14);
INSERT INTO test.empty_summing VALUES ('2015-01-01', 1, 0),('2015-01-01', 3, 0);
OPTIMIZE TABLE test.empty_summing;
SELECT * FROM test.empty_summing;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册