提交 f37ed1fb 编写于 作者: A Alexey Zatelepin 提交者: alexey-milovidov

fix bugs in GraphiteMergeTree [#CLICKHOUSE-2984]:

* destructive rollup with Time=0 if no pattern matches
* differences in row count for Path and Value columns
* missed output rows on the boundaries between blocks
上级 f59ccb3d
......@@ -94,8 +94,8 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
if (selected_row.empty())
selected_row.columns.resize(num_columns);
if (current_selected_row.empty())
current_selected_row.columns.resize(num_columns);
}
if (has_collation)
......@@ -112,111 +112,114 @@ void GraphiteRollupSortedBlockInputStream::merge(ColumnPlainPtrs & merged_column
{
const DateLUTImpl & date_lut = DateLUT::instance();
size_t merged_rows = 0;
size_t started_rows = 0;
/// Take rows in needed order and put them into `merged_block` until we get `max_block_size` rows.
///
/// Variables starting with current_* refer to the rows that were popped from the queue previously
/// and that will contribute towards current output row.
/// Variables starting with next_* refer to the row just popped from the queue.
/// Take rows in the needed order and put them into `merged_block`, producing no more than `max_block_size` rows.
while (!queue.empty())
{
TSortCursor current = queue.top();
next_path = current->all_columns[path_column_num]->getDataAt(current->pos);
next_time = current->all_columns[time_column_num]->get64(current->pos);
TSortCursor next_cursor = queue.top();
auto prev_pattern = current_pattern;
StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos);
bool path_differs = is_first || next_path != current_path;
is_first = false;
time_t next_time = next_cursor->all_columns[time_column_num]->get64(next_cursor->pos);
/// Is new key before rounding.
bool is_new_key = path_differs || next_time != current_time;
UInt64 current_version = current->all_columns[version_column_num]->get64(current->pos);
if (is_new_key)
{
current_path = next_path;
current_time = next_time;
/// For previous group of rows with same key, accumulate a row that has maximum version.
if (merged_rows)
accumulateRow(selected_row);
/// Accumulate the row that has maximum version in the previous group of rows with the same key:
if (started_rows)
accumulateRow(current_selected_row);
const Graphite::Pattern * next_pattern = current_pattern;
if (path_differs)
current_pattern = selectPatternForPath(next_path);
next_pattern = selectPatternForPath(next_path);
if (current_pattern)
time_t next_time_rounded;
if (next_pattern)
{
UInt32 precision = selectPrecision(current_pattern->retentions, next_time);
UInt32 precision = selectPrecision(next_pattern->retentions, next_time);
next_time_rounded = roundTimeToPrecision(date_lut, next_time, precision);
}
/// If no pattern has matched, there is no need to do rounding.
else
{
/// If no pattern has matched - take the value as-is.
next_time_rounded = next_time;
}
/// Key will be new after rounding. It means new result row.
bool will_be_new_key = path_differs || next_time_rounded != current_time_rounded;
if (will_be_new_key)
{
/// This is not the first row in block.
if (merged_rows)
if (started_rows)
{
finishCurrentRow(merged_columns);
/// if we have enough rows
if (merged_rows >= max_block_size)
/// We have enough rows - return, but don't advance the loop. At the beginning of the
/// next call to merge() the same next_cursor will be processed once more and
/// the next output row will be created from it.
if (started_rows >= max_block_size)
return;
}
startNextRow(merged_columns, current);
current_time_rounded = next_time_rounded;
/// At this point previous row has been fully processed, so we can advance the loop
/// (substitute current_* values for next_*, advance the cursor).
if (prev_pattern)
prev_pattern->function->destroy(place_for_aggregate_state.data());
if (current_pattern)
current_pattern->function->create(place_for_aggregate_state.data());
startNextRow(merged_columns, next_cursor, next_pattern);
++started_rows;
++merged_rows;
current_time_rounded = next_time_rounded;
}
current_path = next_path;
current_time = next_time;
}
/// Within all rows with same key, we should leave only one row with maximum version;
/// and for rows with same maximum version - only last row.
if (is_new_key || current_version >= current_max_version)
/// and for rows with same maximum version - only last row.
UInt64 next_version = next_cursor->all_columns[version_column_num]->get64(next_cursor->pos);
if (is_new_key || next_version >= current_max_version)
{
current_max_version = current_version;
setRowRef(selected_row, current);
current_max_version = next_version;
setRowRef(current_selected_row, next_cursor);
}
queue.pop();
if (!current->isLast())
if (!next_cursor->isLast())
{
current->next();
queue.push(current);
next_cursor->next();
queue.push(next_cursor);
}
else
{
/// We get the next block from the appropriate source, if there is one.
fetchNextBlock(current, queue);
fetchNextBlock(next_cursor, queue);
}
}
/// Write result row for the last group.
++merged_rows;
accumulateRow(selected_row);
finishCurrentRow(merged_columns);
finished = true;
if (current_pattern)
if (started_rows)
{
current_pattern->function->destroy(place_for_aggregate_state.data());
current_pattern = nullptr;
accumulateRow(current_selected_row);
finishCurrentRow(merged_columns);
}
finished = true;
}
template <class TSortCursor>
void GraphiteRollupSortedBlockInputStream::startNextRow(ColumnPlainPtrs & merged_columns, TSortCursor & cursor)
void GraphiteRollupSortedBlockInputStream::startNextRow(ColumnPlainPtrs & merged_columns, TSortCursor & cursor, const Graphite::Pattern * next_pattern)
{
/// Copy unmodified column values.
for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i)
......@@ -225,8 +228,15 @@ void GraphiteRollupSortedBlockInputStream::startNextRow(ColumnPlainPtrs & merged
merged_columns[j]->insertFrom(*cursor->all_columns[j], cursor->pos);
}
if (!current_pattern)
if (next_pattern)
{
next_pattern->function->create(place_for_aggregate_state.data());
aggregate_state_created = true;
}
else
merged_columns[value_column_num]->insertFrom(*cursor->all_columns[value_column_num], cursor->pos);
current_pattern = next_pattern;
}
......@@ -236,14 +246,18 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentRow(ColumnPlainPtrs & me
merged_columns[time_column_num]->insert(UInt64(current_time_rounded));
merged_columns[version_column_num]->insert(current_max_version);
if (current_pattern)
if (aggregate_state_created)
{
current_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]);
current_pattern->function->destroy(place_for_aggregate_state.data());
aggregate_state_created = false;
}
}
/// Feeds one row into the aggregate state kept in `place_for_aggregate_state`.
///
/// The call to `function->add` receives the state buffer, the address of the
/// `value_column_num` entry of `row.columns`, the row number `row.row_num`, and
/// `nullptr` as the last argument. Nothing is added when the guard(s) below are false.
///
/// NOTE(review): this text appears to be a diff rendering whose +/- markers were lost;
/// the two consecutive `if` lines are presumably the removed and the added version of
/// a single guard - confirm against the repository before relying on the nesting.
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
{
if (current_pattern)
if (aggregate_state_created)
current_pattern->function->add(place_for_aggregate_state.data(), &row.columns[value_column_num], row.row_num, nullptr);
}
......
......@@ -154,7 +154,7 @@ public:
/// Destroys the aggregate function state stored in `place_for_aggregate_state`
/// through `current_pattern->function->destroy`, when the guard(s) below hold.
///
/// NOTE(review): this text appears to be a diff rendering whose +/- markers were lost;
/// the two consecutive `if` lines are presumably the removed and the added version of
/// a single guard - confirm against the repository before relying on the nesting.
~GraphiteRollupSortedBlockInputStream()
{
if (current_pattern)
if (aggregate_state_created)
current_pattern->function->destroy(place_for_aggregate_state.data());
}
......@@ -179,19 +179,17 @@ private:
/// All data has been read.
bool finished = false;
RowRef selected_row; /// Last row with maximum version for current primary key.
RowRef current_selected_row; /// Last row with maximum version for current primary key.
UInt64 current_max_version = 0;
bool is_first = true;
StringRef current_path;
time_t current_time = 0;
time_t current_time_rounded = 0;
StringRef next_path;
time_t next_time = 0;
time_t next_time_rounded = 0;
const Graphite::Pattern * current_pattern = nullptr;
std::vector<char> place_for_aggregate_state;
bool aggregate_state_created = false; /// Invariant: if true then current_pattern is not NULL.
const Graphite::Pattern * selectPatternForPath(StringRef path) const;
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
......@@ -202,7 +200,7 @@ private:
/// Insert the values into the resulting columns, which will not be changed in the future.
template <class TSortCursor>
void startNextRow(ColumnPlainPtrs & merged_columns, TSortCursor & cursor);
void startNextRow(ColumnPlainPtrs & merged_columns, TSortCursor & cursor, const Graphite::Pattern * next_pattern);
/// Insert the calculated `time`, `value`, `version` values into the resulting columns by the last group of rows.
void finishCurrentRow(ColumnPlainPtrs & merged_columns);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册