提交 629cb44d 编写于 作者: A Alexander Kuzmenkov

everything was wrong

上级 0d69249c
......@@ -577,7 +577,7 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
if (node->frame != WindowFrame{})
if (!(node->frame == WindowFrame{}))
node->frame.is_default = false;
......@@ -165,97 +165,103 @@ void WindowTransform::advancePartitionEnd()
partition_etalon = RowNumber{block_number, block_rows - 1};
void WindowTransform::advanceGroupEnd()
void WindowTransform::advanceFrameStart()
// Frame start is always UNBOUNDED PRECEDING for now, so we don't have to
// move it. It is initialized when the new partition starts.
bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const
if (group_ended)
if (x == y)
// For convenience, a row is always its own peer.
return true;
switch (window_description.frame.type)
if (window_description.frame.type == WindowFrame::FrameType::Rows)
case WindowFrame::FrameType::Range:
case WindowFrame::FrameType::Groups:
case WindowFrame::FrameType::Rows:
// For ROWS frame, row is only peers with itself (checked above);
return false;
void WindowTransform::advanceGroupEndTrivial()
// ROWS mode, peer groups always contains only the current row.
// We cannot advance the groups if the group start is already beyond the
// end of partition.
assert(group_start < partition_end);
group_end = group_start;
group_ended = true;
void WindowTransform::advanceGroupEndOrderBy()
// For RANGE frame, rows that compare equal w/ORDER BY are peers.
assert(window_description.frame.type == WindowFrame::FrameType::Range);
const size_t n = order_by_indices.size();
if (n == 0)
// No ORDER BY, so all rows are the same group. The group will end
// with the partition.
group_end = partition_end;
group_ended = partition_ended;
// No ORDER BY, so all rows are peers.
return true;
// `partition_end` is either end of partition or end of data.
for (; group_end < partition_end; advanceRowNumber(group_end))
size_t i = 0;
for (; i < n; i++)
// Check for group end.
size_t i = 0;
for (; i < n; i++)
const auto * column_x = inputAt(x)[order_by_indices[i]].get();
const auto * column_y = inputAt(y)[order_by_indices[i]].get();
if (column_x->compareAt(x.row, y.row, *column_y,
1 /* nan_direction_hint */) != 0)
const auto * ref = inputAt(group_start)[order_by_indices[i]].get();
const auto * c = inputAt(group_end)[order_by_indices[i]].get();
if (c->compareAt(group_end.row, group_start.row, *ref,
1 /* nan_direction_hint */) != 0)
return false;
if (i < n)
return true;
void WindowTransform::advanceFrameEndCurrentRow()
// We only process one block here, and frame_end must be already in it: if
// we didn't find the end in the previous block, frame_end is now the first
// row of the current block. We need this knowledge to write a simpler loop
// (only loop over rows and not over blocks), that should hopefully be more
// efficient.
// partition_end is either in this new block or past-the-end.
assert(frame_end.block == partition_end.block
|| frame_end.block + 1 == partition_end.block);
if (frame_end == partition_end)
// The case when we get a new block and find out that the partition has
// ended.
frame_ended = partition_ended;
const auto block_rows = blockRowsNumber(frame_end);
// We could retreat the frame_end here, but for some reason I am reluctant
// to do this... It would have better data locality.
auto reference = current_row;
for (; frame_end.row < block_rows; ++frame_end.row)
if (!arePeers(reference, frame_end))
group_ended = true;
//fmt::print(stderr, "{} and {} don't match\n", reference, frame_end);
frame_ended = true;
reference = frame_end;
assert(group_end == partition_end);
if (partition_ended)
// A corner case -- the ORDER BY columns were the same, but the group
// still ended because the partition has ended.
group_ended = true;
// Got to the end of current block, have to properly update the row number.
frame_end.row = 0;
void WindowTransform::advanceFrameStart()
// Frame start is always UNBOUNDED PRECEDING for now, so we don't have to
// move it. It is initialized when the new partition starts.
// Got to the end of partition (frame ended as well then) or end of data.
assert(frame_end == partition_end);
frame_ended = partition_ended;
void WindowTransform::advanceFrameEnd()
// This should be called when we know the boundaries of the group (probably
// not a fundamental requirement, but currently it's written this way).
// No reason for this function to be called again after it succeeded.
const auto frame_end_before = frame_end;
// Frame end is always the current group end, for now.
// In ROWS mode the group is going to contain only the current row.
frame_end = group_end;
frame_ended = group_ended;
// The only frame end we have for now is CURRENT ROW.
// Add the columns over which we advanced the frame to the aggregate function
// states.
......@@ -321,13 +327,10 @@ void WindowTransform::advanceFrameEnd()
void WindowTransform::writeOutGroup()
void WindowTransform::writeOutCurrentRow()
// fmt::print(stderr, "write out group [{}..{})\n",
// group_start, group_end);
// Empty groups don't make sense.
assert(group_start < group_end);
assert(current_row < partition_end);
assert(current_row.block >= first_block_number);
for (size_t wi = 0; wi < workspaces.size(); ++wi)
......@@ -336,93 +339,11 @@ void WindowTransform::writeOutGroup()
const auto * a = f.aggregate_function.get();
auto * buf = ws.aggregate_function_state.data();
// We'll calculate the value once for the first row in the group, and
// insert its copy for each other row in the group.
IColumn * reference_column = outputAt(group_start)[wi].get();
const size_t reference_row = group_start.row;
IColumn * result_column = outputAt(current_row)[wi].get();
// FIXME does it also allocate the result on the arena?
// We'll have to pass it out with blocks then...
a->insertResultInto(buf, *reference_column, arena.get());
// The row we just added to the end of the column must correspond to the
// first row of the group.
assert(reference_column->size() == reference_row + 1);
// fmt::print(stderr, "calculated value of function {} is '{}'\n",
// wi, toString((*reference_column)[reference_row]));
// Now duplicate the calculated value into all other rows.
auto first_row_to_copy_to = group_start;
// We use two explicit loops here instead of using advanceRowNumber(),
// because we want to batch the inserts per-block.
// Unfortunately this leads to tricky loop conditions, because the
// frame_end might be either a past-the-end block, or a valid block, in
// which case we also have to process its head. We have to avoid stepping
// into the past-the-end block because it might not be valid.
// Moreover, the past-the-end row is not in the past-the-end block, but
// in the block before it.
// And we also have to remember to reset the row number when moving to
// the next block.
uint64_t past_the_end_block;
uint64_t past_the_end_row;
if (group_end.row == 0)
// group_end might not be valid.
past_the_end_block = group_end.block;
// Otherwise a group would end at the start of data, this is not
// possible.
assert(group_end.block > 0);
const size_t first_valid_block = group_end.block - 1;
assert(first_valid_block >= first_block_number);
past_the_end_row = blocks[first_valid_block - first_block_number]
past_the_end_block = group_end.block + 1;
past_the_end_row = group_end.row;
for (auto block_index = first_row_to_copy_to.block;
block_index < past_the_end_block;
const auto & block = blocks[block_index - first_block_number];
// We process tail of the first block, all rows of intermediate
// blocks, and the head of the last block.
const auto block_first_row
= (block_index == first_row_to_copy_to.block)
? first_row_to_copy_to.row : 0;
const auto block_last_row = ((block_index + 1) == past_the_end_block)
? past_the_end_row : block.numRows();
// fmt::print(stderr,
// "group rest [{}, {}), pteb {}, pter {}, cur {}, fr {}, lr {}\n",
// group_start, group_end, past_the_end_block, group_end.row,
// block_index, block_first_row, block_last_row);
// The number of the elements left to insert may be zero, but we must
// notice it on the first block. Other blocks shouldn't be empty,
// because we don't generally have empty block, and advanceRowNumber()
// doesn't generate past-the-end row numbers, so we wouldn't get into
// a block we don't want to process.
if (block_first_row == block_last_row)
assert(block_index == first_row_to_copy_to.block);
reference_row, block_last_row - block_first_row);
a->insertResultInto(buf, *result_column, arena.get());
first_not_ready_row = group_end;
void WindowTransform::appendChunk(Chunk & chunk)
......@@ -434,6 +355,7 @@ void WindowTransform::appendChunk(Chunk & chunk)
// have it if it's end of data, though.
if (!input_is_finished)
auto & block = blocks.back();
block.input_columns = chunk.detachColumns();
......@@ -470,25 +392,11 @@ void WindowTransform::appendChunk(Chunk & chunk)
// After that, advance the peer groups. We can advance peer groups until
// the end of partition or current end of data, which is precisely the
// description of `partition_end`.
while (group_start < partition_end)
// After that, try to calculate window functions for each next row.
// We can continue until the end of partition or current end of data,
// which is precisely the definition of `partition_end`.
while (current_row < partition_end)
// fmt::print(stderr, "group [{}, {}), {}\n", group_start, group_end,
// group_ended);
if (!group_ended)
// Wait for more input data to find the end of group.
// The group ended.
// Advance the frame start, updating the state of the aggregate
// functions.
......@@ -496,6 +404,9 @@ void WindowTransform::appendChunk(Chunk & chunk)
// functions.
// fmt::print(stderr, "row {} frame [{}, {}) {}\n",
// current_row, frame_start, frame_end, frame_ended);
if (!frame_ended)
// Wait for more input data to find the end of frame.
......@@ -504,16 +415,16 @@ void WindowTransform::appendChunk(Chunk & chunk)
// Write out the aggregation results
// The frame shouldn't be empty (probably?).
assert(frame_start < frame_end);
// Move to the next group.
// The frame will have to be recalculated.
frame_ended = false;
// Write out the aggregation results.
// Move to the next group.
group_ended = false;
group_start = group_end;
// Move to the next row. The frame will have to be recalculated.
first_not_ready_row = current_row;
frame_ended = false;
if (input_is_finished)
......@@ -543,10 +454,7 @@ void WindowTransform::appendChunk(Chunk & chunk)
// for now.
frame_start = new_partition_start;
frame_end = new_partition_start;
group_start = new_partition_start;
group_end = new_partition_start;
// The group pointers are already reset to the partition start, see the
// above loop.
assert(current_row == new_partition_start);
// fmt::print(stderr, "reinitialize agg data at start of {}\n",
// new_partition_start);
......@@ -653,6 +561,17 @@ IProcessor::Status WindowTransform::prepare()
if (!has_input && input.hasData())
input_data = input.pullData(true /* set_not_needed */);
// If we got an exception from input, just return it and mark that we're
// finished.
if (input_data.exception)
return Status::PortFull;
has_input = true;
// Now we have new input and can try to generate more output in work().
......@@ -678,14 +597,8 @@ IProcessor::Status WindowTransform::prepare()
void WindowTransform::work()
if (input_data.exception)
/// Skip transform in case of exception.
output_data = std::move(input_data);
has_input = false;
has_output = true;
// Exceptions should be skipped in prepare().
assert(has_input || input_is_finished);
......@@ -697,7 +610,6 @@ void WindowTransform::work()
catch (DB::Exception &)
output_data.exception = std::current_exception();
has_output = true;
has_input = false;
......@@ -705,12 +617,12 @@ void WindowTransform::work()
// We don't really have to keep the entire partition, and it can be big, so
// we want to drop the starting blocks to save memory.
// We can drop the old blocks if we already returned them as output, and the
// frame, group and the partition etalon are already past them. Note that the
// frame start can be further than group start for some frame specs (e.g.
// EXCLUDE CURRENT ROW), so we have to check both.
// frame, the current row and the partition etalon are already past them.
// Note that the frame start can be further than current row for some frame
// specs (e.g. EXCLUDE CURRENT ROW), so we have to check both.
const auto first_used_block = std::min(next_output_block_number,
if (first_block_number < first_used_block)
......@@ -723,7 +635,7 @@ void WindowTransform::work()
assert(next_output_block_number >= first_block_number);
assert(frame_start.block >= first_block_number);
assert(group_start.block >= first_block_number);
assert(current_row.block >= first_block_number);
......@@ -61,17 +61,14 @@ struct RowNumber
* be sorted by PARTITION BY (in any order), then by ORDER BY.
* We need to track the following pointers:
* 1) boundaries of partition -- rows that compare equal w/PARTITION BY.
* 2) boundaries of peer group -- rows that compare equal w/ORDER BY (empty
* ORDER BY means all rows are peers).
* 3) boundaries of the frame.
* 2) current row for which we will compute the window functions.
* 3) boundaries of the frame for this row.
* Both the peer group and the frame are inside the partition, but can have any
* position relative to each other.
* All pointers only move forward. For partition and group boundaries, this is
* ensured by the order of input data. This property also trivially holds for
* the ROWS and GROUPS frames. For the RANGE frame, the proof requires the
* additional fact that the ranges are specified in terms of (the single)
* ORDER BY column.
* The value of the window function is the same for all rows of the peer group.
* All pointers only move forward. For partition boundaries, this is ensured by
* the order of input data. This property also trivially holds for the ROWS and
* GROUPS frames. For the RANGE frame, the proof requires the additional fact
* that the ranges are specified in terms of (the single) ORDER BY column.
class WindowTransform : public IProcessor /* public ISimpleTransform */
......@@ -105,13 +102,11 @@ public:
void advancePartitionEnd();
void advanceGroupEnd();
void advanceGroupEndOrderBy();
void advanceGroupEndTrivial();
void advanceGroupEndRange();
void advanceFrameStart();
void advanceFrameEnd();
void writeOutGroup();
void advanceFrameEndCurrentRow();
bool arePeers(const RowNumber & x, const RowNumber & y) const;
void writeOutCurrentRow();
Columns & inputAt(const RowNumber & x)
......@@ -179,7 +174,8 @@ private:
* Data (formerly) inherited from ISimpleTransform.
* Data (formerly) inherited from ISimpleTransform, needed for the
* implementation of the IProcessor interface.
InputPort & input;
OutputPort & output;
......@@ -231,21 +227,18 @@ public:
RowNumber partition_end;
bool partition_ended = false;
// Current peer group is [group_start, group_end) if group_ended,
// [group_start, ?) otherwise.
RowNumber group_start;
RowNumber group_end;
bool group_ended = false;
// This is the row for which we are computing the window functions now.
RowNumber current_row;
// The frame is [frame_start, frame_end) if frame_ended, and unknown
// otherwise. Note that when we move to the next peer group, both the
// otherwise. Note that when we move to the next row, both the
// frame_start and the frame_end may jump forward by an unknown amount of
// blocks, e.g. if we use a RANGE frame. This means that sometimes we don't
// know neither frame_end nor frame_start.
// We update the states of the window functions as we track the frame
// boundaries.
// After we have found the final boundaries of the frame, we can immediately
// output the result for the current group, w/o waiting for more data.
// output the result for the current row, w/o waiting for more data.
RowNumber frame_start;
RowNumber frame_end;
bool frame_ended = false;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册