// WindowTransform.cpp
#include <Processors/Transforms/WindowTransform.h>

#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Common/Arena.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h>

#include <limits>

namespace DB
{

namespace ErrorCodes
{
    // Error codes used in this file; they are defined elsewhere in the project.
    extern const int BAD_ARGUMENTS;
    extern const int NOT_IMPLEMENTED;
}

20 21 22 23
// Interface for true window functions. It's not much of an interface, they just
// accept the guts of WindowTransform and do 'something'. Given a small number of
// true window functions, and the fact that the WindowTransform internals are
// pretty much well defined in domain terms (e.g. frame boundaries), this is
A
Alexander Kuzmenkov 已提交
24
// somewhat acceptable.
A
cleanpu  
Alexander Kuzmenkov 已提交
25 26
class IWindowFunction
{
27
public:
A
cleanup  
Alexander Kuzmenkov 已提交
28
    virtual ~IWindowFunction() = default;
29

A
Alexander Kuzmenkov 已提交
30
    // Must insert the result for current_row.
31 32
    virtual void windowInsertResultInto(const WindowTransform * transform,
        size_t function_index) = 0;
33 34
};

A
Alexander Kuzmenkov 已提交
35 36 37 38 39 40 41
// Compares ORDER BY column values at given rows to find the boundaries of frame:
// [compared] with [reference] +/- offset. Return value is -1/0/+1, like in
// sorting predicates -- -1 means [compared] is less than [reference] +/- offset.
template <typename ColumnType>
static int compareValuesWithOffset(const IColumn * _compared_column,
    size_t compared_row, const IColumn * _reference_column,
    size_t reference_row,
42
    const Field & _offset,
A
Alexander Kuzmenkov 已提交
43 44 45 46 47 48 49 50
    bool offset_is_preceding)
{
    // Casting the columns to the known type here makes it faster, probably
    // because the getData call can be devirtualized.
    const auto * compared_column = assert_cast<const ColumnType *>(
        _compared_column);
    const auto * reference_column = assert_cast<const ColumnType *>(
        _reference_column);
51
    const auto offset = _offset.get<typename ColumnType::ValueType>();
A
fixes  
Alexander Kuzmenkov 已提交
52
    assert(offset >= 0);
A
Alexander Kuzmenkov 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106

    const auto compared_value_data = compared_column->getDataAt(compared_row);
    assert(compared_value_data.size == sizeof(typename ColumnType::ValueType));
    auto compared_value = unalignedLoad<typename ColumnType::ValueType>(
        compared_value_data.data);

    const auto reference_value_data = reference_column->getDataAt(reference_row);
    assert(reference_value_data.size == sizeof(typename ColumnType::ValueType));
    auto reference_value = unalignedLoad<typename ColumnType::ValueType>(
        reference_value_data.data);

    bool is_overflow;
    bool overflow_to_negative;
    if (offset_is_preceding)
    {
        is_overflow = __builtin_sub_overflow(reference_value, offset,
            &reference_value);
        overflow_to_negative = offset > 0;
    }
    else
    {
        is_overflow = __builtin_add_overflow(reference_value, offset,
            &reference_value);
        overflow_to_negative = offset < 0;
    }

//    fmt::print(stderr,
//        "compared [{}] = {}, ref [{}] = {}, offset {} preceding {} overflow {} to negative {}\n",
//        compared_row, toString(compared_value),
//        reference_row, toString(reference_value),
//        toString(offset), offset_is_preceding,
//        is_overflow, overflow_to_negative);

    if (is_overflow)
    {
        if (overflow_to_negative)
        {
            // Overflow to the negative, [compared] must be greater.
            return 1;
        }
        else
        {
            // Overflow to the positive, [compared] must be less.
            return -1;
        }
    }
    else
    {
        // No overflow, compare normally.
        return compared_value < reference_value ? -1
            : compared_value == reference_value ? 0 : 1;
    }
}

107 108 109 110 111 112 113 114 115 116 117 118 119 120
// A specialization of compareValuesWithOffset for floats.
template <typename ColumnType>
static int compareValuesWithOffsetFloat(const IColumn * _compared_column,
    size_t compared_row, const IColumn * _reference_column,
    size_t reference_row,
    const Field & _offset,
    bool offset_is_preceding)
{
    // Casting the columns to the known type here makes it faster, probably
    // because the getData call can be devirtualized.
    const auto * compared_column = assert_cast<const ColumnType *>(
        _compared_column);
    const auto * reference_column = assert_cast<const ColumnType *>(
        _reference_column);
A
fixes  
Alexander Kuzmenkov 已提交
121 122
    const auto offset = _offset.get<typename ColumnType::ValueType>();
    assert(offset >= 0);
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153

    const auto compared_value_data = compared_column->getDataAt(compared_row);
    assert(compared_value_data.size == sizeof(typename ColumnType::ValueType));
    auto compared_value = unalignedLoad<typename ColumnType::ValueType>(
        compared_value_data.data);

    const auto reference_value_data = reference_column->getDataAt(reference_row);
    assert(reference_value_data.size == sizeof(typename ColumnType::ValueType));
    auto reference_value = unalignedLoad<typename ColumnType::ValueType>(
        reference_value_data.data);

    // Floats overflow to Inf and the comparison will work normally, so we don't
    // have to do anything.
    if (offset_is_preceding)
    {
        reference_value -= offset;
    }
    else
    {
        reference_value += offset;
    }

    const auto result =  compared_value < reference_value ? -1
        : compared_value == reference_value ? 0 : 1;

//    fmt::print(stderr, "compared {}, offset {}, reference {}, result {}\n",
//        compared_value, offset, reference_value, result);

    return result;
}

// Helper macros to dispatch on type of the ORDER BY column. APPLY_FOR_TYPES
// expands to an `if ... else if ... else` statement that expects `column`
// (a const IColumn *) and the `compare_values_with_offset` member to be in
// scope. It assigns the FUNCTION instantiation matching the column type
// (FUNCTION##Float for floating point columns), and throws NOT_IMPLEMENTED
// for any other column type.
#define APPLY_FOR_ONE_TYPE(FUNCTION, TYPE) \
else if (typeid_cast<const TYPE *>(column)) \
{ \
    /* clang-tidy wants macro arguments in parentheses, which is impossible */ \
    /* here because FUNCTION is used as a template name. */ \
    compare_values_with_offset = FUNCTION<TYPE>; /* NOLINT */ \
}

#define APPLY_FOR_TYPES(FUNCTION) \
if (false) /* NOLINT */ \
{ \
    /* Do nothing, a starter condition. */ \
} \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt8>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt16>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt32>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt64>) \
\
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int8>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int16>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int32>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int64>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int128>) \
\
APPLY_FOR_ONE_TYPE(FUNCTION##Float, ColumnVector<Float32>) \
APPLY_FOR_ONE_TYPE(FUNCTION##Float, ColumnVector<Float64>) \
\
else \
{ \
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, \
        "The RANGE OFFSET frame for '{}' ORDER BY column is not implemented", \
        demangle(typeid(*column).name())); \
}

A
Alexander Kuzmenkov 已提交
188 189 190
WindowTransform::WindowTransform(const Block & input_header_,
        const Block & output_header_,
        const WindowDescription & window_description_,
A
Alexander Kuzmenkov 已提交
191 192 193 194
        const std::vector<WindowFunctionDescription> & functions)
    : IProcessor({input_header_}, {output_header_})
    , input(inputs.front())
    , output(outputs.front())
A
Alexander Kuzmenkov 已提交
195 196
    , input_header(input_header_)
    , window_description(window_description_)
A
Alexander Kuzmenkov 已提交
197
{
A
Alexander Kuzmenkov 已提交
198 199
    workspaces.reserve(functions.size());
    for (const auto & f : functions)
A
Alexander Kuzmenkov 已提交
200 201
    {
        WindowFunctionWorkspace workspace;
A
Alexander Kuzmenkov 已提交
202 203
        workspace.aggregate_function = f.aggregate_function;
        const auto & aggregate_function = workspace.aggregate_function;
A
Alexander Kuzmenkov 已提交
204 205 206 207 208
        if (!arena && aggregate_function->allocatesMemoryInArena())
        {
            arena = std::make_unique<Arena>();
        }

A
Alexander Kuzmenkov 已提交
209 210
        workspace.argument_column_indices.reserve(f.argument_names.size());
        for (const auto & argument_name : f.argument_names)
A
Alexander Kuzmenkov 已提交
211 212 213 214
        {
            workspace.argument_column_indices.push_back(
                input_header.getPositionByName(argument_name));
        }
A
Alexander Kuzmenkov 已提交
215
        workspace.argument_columns.assign(f.argument_names.size(), nullptr);
A
Alexander Kuzmenkov 已提交
216

217 218 219 220 221 222 223 224
        workspace.window_function_impl = aggregate_function->asWindowFunction();
        if (!workspace.window_function_impl)
        {
            workspace.aggregate_function_state.reset(
                aggregate_function->sizeOfData(),
                aggregate_function->alignOfData());
            aggregate_function->create(workspace.aggregate_function_state.data());
        }
A
Alexander Kuzmenkov 已提交
225 226 227 228 229 230 231 232 233 234

        workspaces.push_back(std::move(workspace));
    }

    partition_by_indices.reserve(window_description.partition_by.size());
    for (const auto & column : window_description.partition_by)
    {
        partition_by_indices.push_back(
            input_header.getPositionByName(column.column_name));
    }
235 236 237 238 239 240 241

    order_by_indices.reserve(window_description.order_by.size());
    for (const auto & column : window_description.order_by)
    {
        order_by_indices.push_back(
            input_header.getPositionByName(column.column_name));
    }
A
Alexander Kuzmenkov 已提交
242 243 244 245 246 247 248 249 250 251

    // Choose a row comparison function for RANGE OFFSET frame based on the
    // type of the ORDER BY column.
    if (window_description.frame.type == WindowFrame::FrameType::Range
        && (window_description.frame.begin_type
                == WindowFrame::BoundaryType::Offset
            || window_description.frame.end_type
                == WindowFrame::BoundaryType::Offset))
    {
        assert(order_by_indices.size() == 1);
252 253
        const auto & entry = input_header.getByPosition(order_by_indices[0]);
        const IColumn * column = entry.column.get();
A
Alexander Kuzmenkov 已提交
254
        APPLY_FOR_TYPES(compareValuesWithOffset)
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273

        // Check that the offset type matches the window type.
        // Convert the offsets to the ORDER BY column type. We can't just check
        // that it matches, because e.g. the int literals are always (U)Int64,
        // but the column might be Int8 and so on.
        if (window_description.frame.begin_type
            == WindowFrame::BoundaryType::Offset)
        {
            window_description.frame.begin_offset = convertFieldToTypeOrThrow(
                window_description.frame.begin_offset,
                *entry.type);
        }
        if (window_description.frame.end_type
            == WindowFrame::BoundaryType::Offset)
        {
            window_description.frame.end_offset = convertFieldToTypeOrThrow(
                window_description.frame.end_offset,
                *entry.type);
        }
A
Alexander Kuzmenkov 已提交
274
    }
A
Alexander Kuzmenkov 已提交
275
}
A
Alexander Kuzmenkov 已提交
276

A
Alexander Kuzmenkov 已提交
277
WindowTransform::~WindowTransform()
A
Alexander Kuzmenkov 已提交
278
{
A
Alexander Kuzmenkov 已提交
279
    // Some states may be not created yet if the creation failed.
280
    for (auto & ws : workspaces)
A
Alexander Kuzmenkov 已提交
281
    {
282 283
        if (!ws.window_function_impl)
        {
A
Alexander Kuzmenkov 已提交
284
            ws.aggregate_function->destroy(
285 286
                ws.aggregate_function_state.data());
        }
A
Alexander Kuzmenkov 已提交
287
    }
A
Alexander Kuzmenkov 已提交
288 289
}

290
void WindowTransform::advancePartitionEnd()
A
Alexander Kuzmenkov 已提交
291
{
292 293 294 295
    if (partition_ended)
    {
        return;
    }
A
Alexander Kuzmenkov 已提交
296

297 298
    const RowNumber end = blocksEnd();

299
//    fmt::print(stderr, "end {}, partition_end {}\n", end, partition_end);
300

301 302 303 304 305
    // If we're at the total end of data, we must end the partition. This is one
    // of the few places in calculations where we need special handling for end
    // of data, other places will work as usual based on
    // `partition_ended` = true, because end of data is logically the same as
    // any other end of partition.
306 307 308
    // We must check this first, because other calculations might not be valid
    // when we're at the end of data.
    if (input_is_finished)
A
Alexander Kuzmenkov 已提交
309
    {
310
        partition_ended = true;
311 312 313
        // We receive empty chunk at the end of data, so the partition_end must
        // be already at the end of data.
        assert(partition_end == end);
314 315 316
        return;
    }

317 318
    // If we got to the end of the block already, but we are going to get more
    // input data, wait for it.
319 320 321 322 323 324 325 326 327 328 329 330
    if (partition_end == end)
    {
        return;
    }

    // We process one block at a time, but we can process each block many times,
    // if it contains multiple partitions. The `partition_end` is a
    // past-the-end pointer, so it must be already in the "next" block we haven't
    // processed yet. This is also the last block we have.
    // The exception to this rule is end of data, for which we checked above.
    assert(end.block == partition_end.block + 1);

331 332 333 334 335 336 337 338 339 340
    // Try to advance the partition end pointer.
    const size_t n = partition_by_indices.size();
    if (n == 0)
    {
        // No PARTITION BY. All input is one partition, which will end when the
        // input ends.
        partition_end = end;
        return;
    }

341 342 343 344
    // Check for partition end.
    // The partition ends when the PARTITION BY columns change. We need
    // some reference columns for comparison. We might have already
    // dropped the blocks where the partition starts, but any row in the
345 346 347
    // partition will do. We use the current_row for this. It might be the same
    // as the partition_end if we're at the first row of the first partition, so
    // we will compare it to itself, but it still works correctly.
348 349
    const auto block_rows = blockRowsNumber(partition_end);
    for (; partition_end.row < block_rows; ++partition_end.row)
350 351
    {
        size_t i = 0;
A
cleanup  
Alexander Kuzmenkov 已提交
352
        for (; i < n; i++)
A
Alexander Kuzmenkov 已提交
353
        {
354
            const auto * ref = inputAt(current_row)[partition_by_indices[i]].get();
355 356
            const auto * c = inputAt(partition_end)[partition_by_indices[i]].get();
            if (c->compareAt(partition_end.row,
357
                    current_row.row, *ref,
358 359 360 361 362
                    1 /* nan_direction_hint */) != 0)
            {
                break;
            }
        }
A
Alexander Kuzmenkov 已提交
363

364 365 366 367
        if (i < n)
        {
            partition_ended = true;
            return;
A
Alexander Kuzmenkov 已提交
368
        }
369 370
    }

371 372 373 374
    // Went until the end of block, go to the next.
    assert(partition_end.row == block_rows);
    ++partition_end.block;
    partition_end.row = 0;
375

376 377 378 379
    // Went until the end of data and didn't find the new partition.
    assert(!partition_ended && partition_end == blocksEnd());
}

380
auto WindowTransform::moveRowNumberNoCheck(const RowNumber & _x, int offset) const
A
Alexander Kuzmenkov 已提交
381
{
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
    RowNumber x = _x;

    if (offset > 0)
    {
        for (;;)
        {
            assertValid(x);
            assert(offset >= 0);

            const auto block_rows = blockRowsNumber(x);
            x.row += offset;
            if (x.row >= block_rows)
            {
                offset = x.row - block_rows;
                x.row = 0;
                x.block++;

                if (x == blocksEnd())
                {
                    break;
                }
            }
            else
            {
                offset = 0;
                break;
            }
        }
    }
    else if (offset < 0)
    {
        for (;;)
        {
            assertValid(x);
            assert(offset <= 0);

            if (x.row >= static_cast<uint64_t>(-offset))
            {
                x.row -= -offset;
                offset = 0;
                break;
            }

A
Alexander Kuzmenkov 已提交
425 426 427 428 429 430 431 432
            // Move to the first row in current block. Note that the offset is
            // negative.
            offset += x.row;
            x.row = 0;

            // Move to the last row of the previous block, if we are not at the
            // first one. Offset also is incremented by one, because we pass over
            // the first row of this block.
433 434 435 436 437 438
            if (x.block == first_block_number)
            {
                break;
            }

            --x.block;
A
Alexander Kuzmenkov 已提交
439
            offset += 1;
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
            x.row = blockRowsNumber(x) - 1;
        }
    }

    return std::tuple{x, offset};
}

// Same as moveRowNumberNoCheck(), but in debug builds it also verifies that
// the move can be undone. Walking back by the distance actually covered must
// land on the starting row with nothing left over.
auto WindowTransform::moveRowNumber(const RowNumber & _x, int offset) const
{
    const auto [moved, offset_left] = moveRowNumberNoCheck(_x, offset);

#ifndef NDEBUG
    const int distance_walked = offset - offset_left;
    const auto [back, back_left] = moveRowNumberNoCheck(moved, -distance_walked);
    assert(back == _x);
    assert(back_left == 0);
#endif

    return std::tuple{moved, offset_left};
}


void WindowTransform::advanceFrameStartRowsOffset()
{
    // Just recalculate it each time by walking blocks.
    const auto [moved_row, offset_left] = moveRowNumber(current_row,
469
        window_description.frame.begin_offset.get<UInt64>()
A
tmp  
Alexander Kuzmenkov 已提交
470
            * (window_description.frame.begin_preceding ? -1 : 1));
471 472 473 474 475

    frame_start = moved_row;

    assertValid(frame_start);

A
Alexander Kuzmenkov 已提交
476 477
//    fmt::print(stderr, "frame start {} left {} partition start {}\n",
//        frame_start, offset_left, partition_start);
478

A
Alexander Kuzmenkov 已提交
479
    if (frame_start <= partition_start)
480 481 482 483 484 485 486
    {
        // Got to the beginning of partition and can't go further back.
        frame_start = partition_start;
        frame_started = true;
        return;
    }

A
Alexander Kuzmenkov 已提交
487
    if (partition_end <= frame_start)
488 489
    {
        // A FOLLOWING frame start ran into the end of partition.
A
Alexander Kuzmenkov 已提交
490 491
        frame_start = partition_end;
        frame_started = partition_ended;
A
Alexander Kuzmenkov 已提交
492
        return;
493 494
    }

A
Alexander Kuzmenkov 已提交
495 496
    // Handled the equality case above. Now the frame start is inside the
    // partition, if we walked all the offset, it's final.
497 498
    assert(partition_start < frame_start);
    frame_started = offset_left == 0;
A
Alexander Kuzmenkov 已提交
499 500 501 502

    // If we ran into the start of data (offset left is negative), we won't be
    // able to make progress. Should have handled this case above.
    assert(offset_left >= 0);
503 504
}

A
tmp  
Alexander Kuzmenkov 已提交
505 506 507

void WindowTransform::advanceFrameStartRangeOffset()
{
A
Alexander Kuzmenkov 已提交
508
    // See the comment for advanceFrameEndRangeOffset().
A
Alexander Kuzmenkov 已提交
509
    const int direction = window_description.order_by[0].direction;
A
Alexander Kuzmenkov 已提交
510 511
    const bool preceding = window_description.frame.begin_preceding
        == (direction > 0);
A
Alexander Kuzmenkov 已提交
512 513
    const auto * reference_column
        = inputAt(current_row)[order_by_indices[0]].get();
A
tmp  
Alexander Kuzmenkov 已提交
514 515
    for (; frame_start < partition_end; advanceRowNumber(frame_start))
    {
A
Alexander Kuzmenkov 已提交
516 517
        // The first frame value is [current_row] with offset, so we advance
        // while [frames_start] < [current_row] with offset.
A
Alexander Kuzmenkov 已提交
518 519 520
        const auto * compared_column
            = inputAt(frame_start)[order_by_indices[0]].get();
        if (compare_values_with_offset(compared_column, frame_start.row,
A
tmp  
Alexander Kuzmenkov 已提交
521 522
            reference_column, current_row.row,
            window_description.frame.begin_offset,
A
Alexander Kuzmenkov 已提交
523
            preceding)
A
Alexander Kuzmenkov 已提交
524
                * direction >= 0)
A
tmp  
Alexander Kuzmenkov 已提交
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
        {
            frame_started = true;
            return;
        }
    }

    frame_started = partition_ended;
}

void WindowTransform::advanceFrameStart()
{
    if (frame_started)
    {
        return;
    }

    const auto frame_start_before = frame_start;

543 544 545 546 547 548
    switch (window_description.frame.begin_type)
    {
        case WindowFrame::BoundaryType::Unbounded:
            // UNBOUNDED PRECEDING, just mark it valid. It is initialized when
            // the new partition starts.
            frame_started = true;
A
tmp  
Alexander Kuzmenkov 已提交
549
            break;
550
        case WindowFrame::BoundaryType::Current:
551 552 553 554 555 556 557
            // CURRENT ROW differs between frame types only in how the peer
            // groups are accounted.
            assert(partition_start <= peer_group_start);
            assert(peer_group_start < partition_end);
            assert(peer_group_start <= current_row);
            frame_start = peer_group_start;
            frame_started = true;
A
tmp  
Alexander Kuzmenkov 已提交
558
            break;
559 560 561 562 563 564
        case WindowFrame::BoundaryType::Offset:
            switch (window_description.frame.type)
            {
                case WindowFrame::FrameType::Rows:
                    advanceFrameStartRowsOffset();
                    break;
A
tmp  
Alexander Kuzmenkov 已提交
565
                case WindowFrame::FrameType::Range:
A
Alexander Kuzmenkov 已提交
566
                    advanceFrameStartRangeOffset();
A
tmp  
Alexander Kuzmenkov 已提交
567 568 569 570 571 572
                    break;
                default:
                    throw Exception(ErrorCodes::NOT_IMPLEMENTED,
                        "Frame start type '{}' for frame '{}' is not implemented",
                        WindowFrame::toString(window_description.frame.begin_type),
                        WindowFrame::toString(window_description.frame.type));
573 574 575 576
            }
            break;
    }

A
Alexander Kuzmenkov 已提交
577
    assert(frame_start_before <= frame_start);
578 579
    if (frame_start == frame_start_before)
    {
A
Alexander Kuzmenkov 已提交
580 581 582 583 584 585 586 587
        // If the frame start didn't move, this means we validated that the frame
        // starts at the point we reached earlier but were unable to validate.
        // This probably only happens in degenerate cases where the frame start
        // is further than the end of partition, and the partition ends at the
        // last row of the block, but we can only tell for sure after a new
        // block arrives. We still have to update the state of aggregate
        // functions when the frame start becomes valid, so we continue.
        assert(frame_started);
588 589 590 591 592 593 594 595 596 597 598
    }

    assert(partition_start <= frame_start);
    assert(frame_start <= partition_end);
    if (partition_ended && frame_start == partition_end)
    {
        // Check that if the start of frame (e.g. FOLLOWING) runs into the end
        // of partition, it is marked as valid -- we can't advance it any
        // further.
        assert(frame_started);
    }
A
Alexander Kuzmenkov 已提交
599 600 601
}

bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const
602
{
A
Alexander Kuzmenkov 已提交
603
    if (x == y)
604
    {
A
Alexander Kuzmenkov 已提交
605 606
        // For convenience, a row is always its own peer.
        return true;
607
    }
A
Alexander Kuzmenkov 已提交
608

A
Alexander Kuzmenkov 已提交
609
    if (window_description.frame.type == WindowFrame::FrameType::Rows)
610
    {
A
Alexander Kuzmenkov 已提交
611 612
        // For ROWS frame, row is only peers with itself (checked above);
        return false;
613
    }
614

615
    // For RANGE and GROUPS frames, rows that compare equal w/ORDER BY are peers.
A
Alexander Kuzmenkov 已提交
616
    assert(window_description.frame.type == WindowFrame::FrameType::Range);
617 618 619
    const size_t n = order_by_indices.size();
    if (n == 0)
    {
A
Alexander Kuzmenkov 已提交
620 621
        // No ORDER BY, so all rows are peers.
        return true;
622 623
    }

A
Alexander Kuzmenkov 已提交
624 625
    size_t i = 0;
    for (; i < n; i++)
626
    {
A
Alexander Kuzmenkov 已提交
627 628 629 630
        const auto * column_x = inputAt(x)[order_by_indices[i]].get();
        const auto * column_y = inputAt(y)[order_by_indices[i]].get();
        if (column_x->compareAt(x.row, y.row, *column_y,
                1 /* nan_direction_hint */) != 0)
631
        {
A
Alexander Kuzmenkov 已提交
632
            return false;
633
        }
A
Alexander Kuzmenkov 已提交
634
    }
635

A
Alexander Kuzmenkov 已提交
636 637 638 639 640
    return true;
}

void WindowTransform::advanceFrameEndCurrentRow()
{
A
Alexander Kuzmenkov 已提交
641 642
//    fmt::print(stderr, "starting from frame_end {}\n", frame_end);

A
Alexander Kuzmenkov 已提交
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660
    // We only process one block here, and frame_end must be already in it: if
    // we didn't find the end in the previous block, frame_end is now the first
    // row of the current block. We need this knowledge to write a simpler loop
    // (only loop over rows and not over blocks), that should hopefully be more
    // efficient.
    // partition_end is either in this new block or past-the-end.
    assert(frame_end.block  == partition_end.block
        || frame_end.block + 1 == partition_end.block);

    if (frame_end == partition_end)
    {
        // The case when we get a new block and find out that the partition has
        // ended.
        assert(partition_ended);
        frame_ended = partition_ended;
        return;
    }

A
Alexander Kuzmenkov 已提交
661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679
    // We advance until the partition end. It's either in the current block or
    // in the next one, which is also the past-the-end block. Figure out how
    // many rows we have to process.
    uint64_t rows_end;
    if (partition_end.row == 0)
    {
        assert(partition_end == blocksEnd());
        rows_end = blockRowsNumber(frame_end);
    }
    else
    {
        assert(frame_end.block == partition_end.block);
        rows_end = partition_end.row;
    }
    // Equality would mean "no data to process", for which we checked above.
    assert(frame_end.row < rows_end);

//    fmt::print(stderr, "first row {} last {}\n", frame_end.row, rows_end);

A
cleanup  
Alexander Kuzmenkov 已提交
680
    // Advance frame_end while it is still peers with the current row.
A
Alexander Kuzmenkov 已提交
681
    for (; frame_end.row < rows_end; ++frame_end.row)
A
Alexander Kuzmenkov 已提交
682
    {
A
cleanup  
Alexander Kuzmenkov 已提交
683
        if (!arePeers(current_row, frame_end))
684
        {
A
Alexander Kuzmenkov 已提交
685
//            fmt::print(stderr, "{} and {} don't match\n", reference, frame_end);
A
Alexander Kuzmenkov 已提交
686
            frame_ended = true;
687 688 689 690
            return;
        }
    }

A
Alexander Kuzmenkov 已提交
691 692 693 694 695 696 697
    // Might have gotten to the end of the current block, have to properly
    // update the row number.
    if (frame_end.row == blockRowsNumber(frame_end))
    {
        ++frame_end.block;
        frame_end.row = 0;
    }
698

A
Alexander Kuzmenkov 已提交
699 700 701
    // Got to the end of partition (frame ended as well then) or end of data.
    assert(frame_end == partition_end);
    frame_ended = partition_ended;
702 703
}

704 705 706 707 708 709 710
void WindowTransform::advanceFrameEndUnbounded()
{
    // The UNBOUNDED FOLLOWING frame ends when the partition ends.
    frame_end = partition_end;
    frame_ended = partition_ended;
}

A
Alexander Kuzmenkov 已提交
711 712 713 714 715
void WindowTransform::advanceFrameEndRowsOffset()
{
    // Walk the specified offset from the current row. The "+1" is needed
    // because the frame_end is a past-the-end pointer.
    const auto [moved_row, offset_left] = moveRowNumber(current_row,
716
        window_description.frame.end_offset.get<UInt64>()
A
tmp  
Alexander Kuzmenkov 已提交
717 718
            * (window_description.frame.end_preceding ? -1 : 1)
            + 1);
A
Alexander Kuzmenkov 已提交
719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745

    if (partition_end <= moved_row)
    {
        // Clamp to the end of partition. It might not have ended yet, in which
        // case wait for more data.
        frame_end = partition_end;
        frame_ended = partition_ended;
        return;
    }

    if (moved_row <= partition_start)
    {
        // Clamp to the start of partition.
        frame_end = partition_start;
        frame_ended = true;
        return;
    }

    // Frame end inside partition, if we walked all the offset, it's final.
    frame_end = moved_row;
    frame_ended = offset_left == 0;

    // If we ran into the start of data (offset left is negative), we won't be
    // able to make progress. Should have handled this case above.
    assert(offset_left >= 0);
}

A
Alexander Kuzmenkov 已提交
746 747
void WindowTransform::advanceFrameEndRangeOffset()
{
A
Alexander Kuzmenkov 已提交
748 749
    // PRECEDING/FOLLOWING change direction for DESC order.
    // See CD 9075-2:201?(E) 7.14 <window clause> p. 429.
A
Alexander Kuzmenkov 已提交
750
    const int direction = window_description.order_by[0].direction;
A
Alexander Kuzmenkov 已提交
751 752
    const bool preceding = window_description.frame.end_preceding
        == (direction > 0);
A
Alexander Kuzmenkov 已提交
753 754
    const auto * reference_column
        = inputAt(current_row)[order_by_indices[0]].get();
A
Alexander Kuzmenkov 已提交
755 756 757 758 759
    for (; frame_end < partition_end; advanceRowNumber(frame_end))
    {
        // The last frame value is current_row with offset, and we need a
        // past-the-end pointer, so we advance while
        // [frame_end] <= [current_row] with offset.
A
Alexander Kuzmenkov 已提交
760 761 762
        const auto * compared_column
            = inputAt(frame_end)[order_by_indices[0]].get();
        if (compare_values_with_offset(compared_column, frame_end.row,
A
Alexander Kuzmenkov 已提交
763 764
            reference_column, current_row.row,
            window_description.frame.end_offset,
A
Alexander Kuzmenkov 已提交
765
            preceding)
A
Alexander Kuzmenkov 已提交
766 767 768 769 770 771 772 773 774 775
                * direction > 0)
        {
            frame_ended = true;
            return;
        }
    }

    frame_ended = partition_ended;
}

776 777
void WindowTransform::advanceFrameEnd()
{
A
Alexander Kuzmenkov 已提交
778 779
    // No reason for this function to be called again after it succeeded.
    assert(!frame_ended);
780 781 782

    const auto frame_end_before = frame_end;

783 784 785 786 787 788 789 790 791
    switch (window_description.frame.end_type)
    {
        case WindowFrame::BoundaryType::Current:
            advanceFrameEndCurrentRow();
            break;
        case WindowFrame::BoundaryType::Unbounded:
            advanceFrameEndUnbounded();
            break;
        case WindowFrame::BoundaryType::Offset:
A
Alexander Kuzmenkov 已提交
792 793 794 795 796
            switch (window_description.frame.type)
            {
                case WindowFrame::FrameType::Rows:
                    advanceFrameEndRowsOffset();
                    break;
A
Alexander Kuzmenkov 已提交
797
                case WindowFrame::FrameType::Range:
A
Alexander Kuzmenkov 已提交
798
                    advanceFrameEndRangeOffset();
A
Alexander Kuzmenkov 已提交
799
                    break;
A
Alexander Kuzmenkov 已提交
800 801 802 803 804 805
                default:
                    throw Exception(ErrorCodes::NOT_IMPLEMENTED,
                        "The frame end type '{}' is not implemented",
                        WindowFrame::toString(window_description.frame.end_type));
            }
            break;
806
    }
807

A
Alexander Kuzmenkov 已提交
808 809
//    fmt::print(stderr, "frame_end {} -> {}\n", frame_end_before, frame_end);

810 811 812 813 814 815
    // We might not have advanced the frame end if we found out we reached the
    // end of input or the partition, or if we still don't know the frame start.
    if (frame_end_before == frame_end)
    {
        return;
    }
816
}
817

818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
// Update the aggregation states after the frame has changed.
void WindowTransform::updateAggregationState()
{
//    fmt::print(stderr, "update agg states [{}, {}) -> [{}, {})\n",
//        prev_frame_start, prev_frame_end, frame_start, frame_end);

    // Assert that the frame boundaries are known, have proper order wrt each
    // other, and have not gone back wrt the previous frame.
    assert(frame_started);
    assert(frame_ended);
    assert(frame_start <= frame_end);
    assert(prev_frame_start <= prev_frame_end);
    assert(prev_frame_start <= frame_start);
    assert(prev_frame_end <= frame_end);

    // We might have to reset aggregation state and/or add some rows to it.
    // Figure out what to do.
    bool reset_aggregation = false;
    RowNumber rows_to_add_start;
    RowNumber rows_to_add_end;
    if (frame_start == prev_frame_start)
    {
        // The frame start didn't change, add the tail rows.
        reset_aggregation = false;
        rows_to_add_start = prev_frame_end;
        rows_to_add_end = frame_end;
844 845 846
    }
    else
    {
847 848 849 850 851 852 853
        // The frame start changed, reset the state and aggregate over the
        // entire frame. This can be made per-function after we learn to
        // subtract rows from some types of aggregation states, but for now we
        // always have to reset when the frame start changes.
        reset_aggregation = true;
        rows_to_add_start = frame_start;
        rows_to_add_end = frame_end;
854
    }
855

856 857
    for (auto & ws : workspaces)
    {
858 859 860 861 862 863
        if (ws.window_function_impl)
        {
            // No need to do anything for true window functions.
            continue;
        }

A
Alexander Kuzmenkov 已提交
864
        const auto * a = ws.aggregate_function.get();
865 866 867
        auto * buf = ws.aggregate_function_state.data();

        if (reset_aggregation)
A
Alexander Kuzmenkov 已提交
868
        {
869 870 871
//            fmt::print(stderr, "(2) reset aggregation\n");
            a->destroy(buf);
            a->create(buf);
872
        }
873

A
Alexander Kuzmenkov 已提交
874 875 876 877 878 879 880 881 882 883 884
        // To achieve better performance, we will have to loop over blocks and
        // rows manually, instead of using advanceRowNumber().
        // For this purpose, the past-the-end block can be different than the
        // block of the past-the-end row (it's usually the next block).
        const auto past_the_end_block = rows_to_add_end.row == 0
            ? rows_to_add_end.block
            : rows_to_add_end.block + 1;

        for (auto block_number = rows_to_add_start.block;
             block_number < past_the_end_block;
             ++block_number)
885
        {
A
Alexander Kuzmenkov 已提交
886 887
            auto & block = blockAt(block_number);

A
Alexander Kuzmenkov 已提交
888
            if (ws.cached_block_number != block_number)
889
            {
A
Alexander Kuzmenkov 已提交
890 891 892 893 894 895
                for (size_t i = 0; i < ws.argument_column_indices.size(); ++i)
                {
                    ws.argument_columns[i] = block.input_columns[
                        ws.argument_column_indices[i]].get();
                }
                ws.cached_block_number = block_number;
896 897
            }

A
Alexander Kuzmenkov 已提交
898 899 900 901 902 903 904 905 906
            // First and last blocks may be processed partially, and other blocks
            // are processed in full.
            const auto first_row = block_number == rows_to_add_start.block
                ? rows_to_add_start.row : 0;
            const auto past_the_end_row = block_number == rows_to_add_end.block
                ? rows_to_add_end.row : block.rows;

            // We should add an addBatch analog that can accept a starting offset.
            // For now, add the values one by one.
907
            auto * columns = ws.argument_columns.data();
A
Alexander Kuzmenkov 已提交
908
            // Removing arena.get() from the loop makes it faster somehow...
909
            auto * arena_ptr = arena.get();
A
Alexander Kuzmenkov 已提交
910 911
            for (auto row = first_row; row < past_the_end_row; ++row)
            {
912
                a->add(buf, columns, row, arena_ptr);
A
Alexander Kuzmenkov 已提交
913
            }
914 915
        }
    }
916 917 918

    prev_frame_start = frame_start;
    prev_frame_end = frame_end;
919 920
}

A
Alexander Kuzmenkov 已提交
921
void WindowTransform::writeOutCurrentRow()
922
{
A
Alexander Kuzmenkov 已提交
923 924
    assert(current_row < partition_end);
    assert(current_row.block >= first_block_number);
925

A
Alexander Kuzmenkov 已提交
926
    const auto & block = blockAt(current_row);
927 928 929
    for (size_t wi = 0; wi < workspaces.size(); ++wi)
    {
        auto & ws = workspaces[wi];
930 931 932

        if (ws.window_function_impl)
        {
933
            ws.window_function_impl->windowInsertResultInto(this, wi);
934 935 936
        }
        else
        {
937
            IColumn * result_column = block.output_columns[wi].get();
A
Alexander Kuzmenkov 已提交
938
            const auto * a = ws.aggregate_function.get();
939 940 941 942 943
            auto * buf = ws.aggregate_function_state.data();
            // FIXME does it also allocate the result on the arena?
            // We'll have to pass it out with blocks then...
            a->insertResultInto(buf, *result_column, arena.get());
        }
944 945
    }
}
946

947 948
void WindowTransform::appendChunk(Chunk & chunk)
{
A
cleanup  
Alexander Kuzmenkov 已提交
949 950
//    fmt::print(stderr, "new chunk, {} rows, finished={}\n", chunk.getNumRows(),
//        input_is_finished);
951 952 953 954 955

    // First, prepare the new input block and add it to the queue. We might not
    // have it if it's end of data, though.
    if (!input_is_finished)
    {
A
Alexander Kuzmenkov 已提交
956
        assert(chunk.hasRows());
957 958
        blocks.push_back({});
        auto & block = blocks.back();
959 960 961 962 963
        // Use the number of rows from the Chunk, because it is correct even in
        // the case where the Chunk has no columns. Not sure if this actually
        // happens, because even in the case of `count() over ()` we have a dummy
        // input column.
        block.rows = chunk.getNumRows();
964 965 966 967 968 969 970
        block.input_columns = chunk.detachColumns();

        for (auto & ws : workspaces)
        {
            // Aggregate functions can't work with constant columns, so we have to
            // materialize them like the Aggregator does.
            for (const auto column_index : ws.argument_column_indices)
971
            {
972 973 974
                block.input_columns[column_index]
                    = std::move(block.input_columns[column_index])
                        ->convertToFullColumnIfConst();
A
Alexander Kuzmenkov 已提交
975
            }
976

A
Alexander Kuzmenkov 已提交
977 978
            block.output_columns.push_back(ws.aggregate_function->getReturnType()
                ->createColumn());
979
            block.output_columns.back()->reserve(block.rows);
980 981 982 983 984 985 986
        }
    }

    // Start the calculations. First, advance the partition end.
    for (;;)
    {
        advancePartitionEnd();
987 988
//        fmt::print(stderr, "partition [{}, {}), {}\n",
//            partition_start, partition_end, partition_ended);
989 990 991 992 993 994 995 996 997

        // Either we ran out of data or we found the end of partition (maybe
        // both, but this only happens at the total end of data).
        assert(partition_ended || partition_end == blocksEnd());
        if (partition_ended && partition_end == blocksEnd())
        {
            assert(input_is_finished);
        }

A
Alexander Kuzmenkov 已提交
998 999 1000 1001
        // After that, try to calculate window functions for each next row.
        // We can continue until the end of partition or current end of data,
        // which is precisely the definition of `partition_end`.
        while (current_row < partition_end)
1002
        {
1003 1004 1005 1006
//            fmt::print(stderr, "(1) row {} frame [{}, {}) {}, {}\n",
//                current_row, frame_start, frame_end,
//                frame_started, frame_ended);

1007 1008 1009 1010 1011
            // We now know that the current row is valid, so we can update the
            // peer group start.
            if (!arePeers(peer_group_start, current_row))
            {
                peer_group_start = current_row;
1012 1013
                peer_group_start_row_number = current_row_number;
                ++peer_group_number;
1014 1015
            }

1016
            // Advance the frame start.
1017
            advanceFrameStart();
1018 1019 1020 1021 1022 1023

            if (!frame_started)
            {
                // Wait for more input data to find the start of frame.
                assert(!input_is_finished);
                assert(!partition_ended);
1024
                return;
1025 1026
            }

1027 1028 1029 1030 1031 1032 1033
            // frame_end must be greater or equal than frame_start, so if the
            // frame_start is already past the current frame_end, we can start
            // from it to save us some work.
            if (frame_end < frame_start)
            {
                frame_end = frame_start;
            }
1034

1035 1036
            // Advance the frame end.
            advanceFrameEnd();
A
Alexander Kuzmenkov 已提交
1037

1038 1039
            if (!frame_ended)
            {
1040 1041 1042
                // Wait for more input data to find the end of frame.
                assert(!input_is_finished);
                assert(!partition_ended);
1043 1044 1045
                return;
            }

1046 1047 1048 1049
//            fmt::print(stderr, "(2) row {} frame [{}, {}) {}, {}\n",
//                current_row, frame_start, frame_end,
//                frame_started, frame_ended);

1050 1051 1052
            // The frame can be empty sometimes, e.g. the boundaries coincide
            // or the start is after the partition end. But hopefully start is
            // not after end.
A
Alexander Kuzmenkov 已提交
1053 1054
            assert(frame_started);
            assert(frame_ended);
1055
            assert(frame_start <= frame_end);
1056

1057 1058 1059 1060 1061 1062 1063
            // Now that we know the new frame boundaries, update the aggregation
            // states. Theoretically we could do this simultaneously with moving
            // the frame boundaries, but it would require some care not to
            // perform unnecessary work while we are still looking for the frame
            // start, so do it the simple way for now.
            updateAggregationState();

A
Alexander Kuzmenkov 已提交
1064 1065
            // Write out the aggregation results.
            writeOutCurrentRow();
1066

A
Alexander Kuzmenkov 已提交
1067
            // Move to the next row. The frame will have to be recalculated.
1068 1069
            // The peer group start is updated at the beginning of the loop,
            // because current_row might now be past-the-end.
A
Alexander Kuzmenkov 已提交
1070
            advanceRowNumber(current_row);
1071
            ++current_row_number;
A
Alexander Kuzmenkov 已提交
1072 1073
            first_not_ready_row = current_row;
            frame_ended = false;
1074
            frame_started = false;
1075
        }
1076

1077 1078 1079 1080 1081
        if (input_is_finished)
        {
            // We finalized the last partition in the above loop, and don't have
            // to do anything else.
            return;
1082 1083 1084 1085
        }

        if (!partition_ended)
        {
1086 1087 1088
            // Wait for more input data to find the end of partition.
            // Assert that we processed all the data we currently have, and that
            // we are going to receive more data.
1089
            assert(partition_end == blocksEnd());
1090
            assert(!input_is_finished);
1091 1092 1093 1094
            break;
        }

        // Start the next partition.
1095
        partition_start = partition_end;
1096 1097
        advanceRowNumber(partition_end);
        partition_ended = false;
A
cleanup  
Alexander Kuzmenkov 已提交
1098 1099
        // We have to reset the frame and other pointers when the new partition
        // starts.
1100 1101
        frame_start = partition_start;
        frame_end = partition_start;
1102 1103
        prev_frame_start = partition_start;
        prev_frame_end = partition_start;
1104
        assert(current_row == partition_start);
1105
        current_row_number = 1;
1106
        peer_group_start = partition_start;
1107 1108
        peer_group_start_row_number = 1;
        peer_group_number = 1;
1109

A
cleanup  
Alexander Kuzmenkov 已提交
1110 1111
//        fmt::print(stderr, "reinitialize agg data at start of {}\n",
//            new_partition_start);
1112 1113
        // Reinitialize the aggregate function states because the new partition
        // has started.
1114 1115
        for (auto & ws : workspaces)
        {
1116 1117 1118 1119 1120
            if (ws.window_function_impl)
            {
                continue;
            }

A
Alexander Kuzmenkov 已提交
1121
            const auto * a = ws.aggregate_function.get();
1122 1123
            auto * buf = ws.aggregate_function_state.data();

1124 1125
            a->destroy(buf);
        }
A
Alexander Kuzmenkov 已提交
1126

1127 1128 1129 1130 1131 1132
        // Release the arena we use for aggregate function states, so that it
        // doesn't grow without limit. Not sure if it's actually correct, maybe
        // it allocates the return values in the Arena as well...
        if (arena)
        {
            arena = std::make_unique<Arena>();
A
Alexander Kuzmenkov 已提交
1133 1134
        }

1135 1136
        for (auto & ws : workspaces)
        {
1137 1138 1139 1140 1141
            if (ws.window_function_impl)
            {
                continue;
            }

A
Alexander Kuzmenkov 已提交
1142
            const auto * a = ws.aggregate_function.get();
1143
            auto * buf = ws.aggregate_function_state.data();
A
Alexander Kuzmenkov 已提交
1144

1145 1146 1147
            a->create(buf);
        }
    }
A
Alexander Kuzmenkov 已提交
1148 1149
}

A
Alexander Kuzmenkov 已提交
1150 1151
IProcessor::Status WindowTransform::prepare()
{
A
cleanup  
Alexander Kuzmenkov 已提交
1152 1153 1154
//    fmt::print(stderr, "prepare, next output {}, not ready row {}, first block {}, hold {} blocks\n",
//        next_output_block_number, first_not_ready_row, first_block_number,
//        blocks.size());
1155

A
Alexander Kuzmenkov 已提交
1156 1157
    if (output.isFinished())
    {
1158 1159
        // The consumer asked us not to continue (or we decided it ourselves),
        // so we abort.
A
Alexander Kuzmenkov 已提交
1160 1161 1162 1163
        input.close();
        return Status::Finished;
    }

1164 1165 1166 1167 1168 1169 1170 1171 1172
    if (output_data.exception)
    {
        // An exception occurred during processing.
        output.pushData(std::move(output_data));
        output.finish();
        input.close();
        return Status::Finished;
    }

1173
    assert(first_not_ready_row.block >= first_block_number);
A
cleanup  
Alexander Kuzmenkov 已提交
1174 1175 1176
    // The first_not_ready_row might be past-the-end if we have already
    // calculated the window functions for all input rows. That's why the
    // equality is also valid here.
1177 1178 1179 1180 1181 1182 1183
    assert(first_not_ready_row.block <= first_block_number + blocks.size());
    assert(next_output_block_number >= first_block_number);

    // Output the ready data prepared by work().
    // We inspect the calculation state and create the output chunk right here,
    // because this is pretty lightweight.
    if (next_output_block_number < first_not_ready_row.block)
A
Alexander Kuzmenkov 已提交
1184
    {
1185 1186 1187 1188 1189 1190 1191 1192 1193 1194
        if (output.canPush())
        {
            // Output the ready block.
            const auto i = next_output_block_number - first_block_number;
            auto & block = blocks[i];
            auto columns = block.input_columns;
            for (auto & res : block.output_columns)
            {
                columns.push_back(ColumnPtr(std::move(res)));
            }
1195
            output_data.chunk.setColumns(columns, block.rows);
1196

1197 1198 1199 1200 1201 1202
//            fmt::print(stderr, "output block {} as chunk '{}'\n",
//                next_output_block_number,
//                output_data.chunk.dumpStructure());

            ++next_output_block_number;

1203 1204
            output.pushData(std::move(output_data));
        }
A
Alexander Kuzmenkov 已提交
1205

1206 1207
        // We don't need input.setNotNeeded() here, because we already pull with
        // the set_not_needed flag.
A
Alexander Kuzmenkov 已提交
1208 1209 1210
        return Status::PortFull;
    }

1211
    if (input_is_finished)
A
Alexander Kuzmenkov 已提交
1212
    {
1213 1214 1215 1216 1217
        // The input data ended at the previous prepare() + work() cycle,
        // and we don't have ready output data (checked above). We must be
        // finished.
        assert(next_output_block_number == first_block_number + blocks.size());
        assert(first_not_ready_row == blocksEnd());
A
Alexander Kuzmenkov 已提交
1218

1219 1220
        // FIXME do we really have to do this?
        output.finish();
A
Alexander Kuzmenkov 已提交
1221

1222 1223
        return Status::Finished;
    }
A
Alexander Kuzmenkov 已提交
1224

1225 1226 1227
    // Consume input data if we have any ready.
    if (!has_input && input.hasData())
    {
1228 1229 1230 1231 1232
        // Pulling with set_not_needed = true and using an explicit setNeeded()
        // later is somewhat more efficient, because after the setNeeded(), the
        // required input block will be generated in the same thread and passed
        // to our prepare() + work() methods in the same thread right away, so
        // hopefully we will work on hot (cached) data.
A
Alexander Kuzmenkov 已提交
1233
        input_data = input.pullData(true /* set_not_needed */);
A
Alexander Kuzmenkov 已提交
1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244

        // If we got an exception from input, just return it and mark that we're
        // finished.
        if (input_data.exception)
        {
            output.pushData(std::move(input_data));
            output.finish();

            return Status::PortFull;
        }

A
Alexander Kuzmenkov 已提交
1245 1246
        has_input = true;

1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260
        // Now we have new input and can try to generate more output in work().
        return Status::Ready;
    }

    // We 1) don't have any ready output (checked above),
    // 2) don't have any more input (also checked above).
    // Will we get any more input?
    if (input.isFinished())
    {
        // We won't, time to finalize the calculation in work(). We should only
        // do this once.
        assert(!input_is_finished);
        input_is_finished = true;
        return Status::Ready;
A
Alexander Kuzmenkov 已提交
1261 1262
    }

1263 1264 1265
    // We have to wait for more input.
    input.setNeeded();
    return Status::NeedData;
A
Alexander Kuzmenkov 已提交
1266 1267 1268 1269
}

void WindowTransform::work()
{
A
Alexander Kuzmenkov 已提交
1270 1271
    // Exceptions should be skipped in prepare().
    assert(!input_data.exception);
A
Alexander Kuzmenkov 已提交
1272

1273 1274
    assert(has_input || input_is_finished);

A
Alexander Kuzmenkov 已提交
1275 1276
    try
    {
1277 1278
        has_input = false;
        appendChunk(input_data.chunk);
A
Alexander Kuzmenkov 已提交
1279 1280 1281 1282 1283 1284 1285 1286
    }
    catch (DB::Exception &)
    {
        output_data.exception = std::current_exception();
        has_input = false;
        return;
    }

1287 1288 1289
    // We don't really have to keep the entire partition, and it can be big, so
    // we want to drop the starting blocks to save memory.
    // We can drop the old blocks if we already returned them as output, and the
1290 1291 1292
    // frame and the current row are already past them. Note that the frame
    // start can be further than current row for some frame specs (e.g. EXCLUDE
    // CURRENT ROW), so we have to check both.
1293
    const auto first_used_block = std::min(next_output_block_number,
1294 1295
        std::min(frame_start.block, current_row.block));

1296 1297
    if (first_block_number < first_used_block)
    {
A
cleanup  
Alexander Kuzmenkov 已提交
1298 1299
//        fmt::print(stderr, "will drop blocks from {} to {}\n", first_block_number,
//            first_used_block);
1300 1301

        blocks.erase(blocks.begin(),
1302
            blocks.begin() + (first_used_block - first_block_number));
1303
        first_block_number = first_used_block;
A
Alexander Kuzmenkov 已提交
1304

1305 1306
        assert(next_output_block_number >= first_block_number);
        assert(frame_start.block >= first_block_number);
A
Alexander Kuzmenkov 已提交
1307
        assert(current_row.block >= first_block_number);
1308
        assert(peer_group_start.block >= first_block_number);
1309
    }
A
Alexander Kuzmenkov 已提交
1310 1311
}

1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357
// Common base for true window functions. It derives from
// IAggregateFunctionHelper so it can travel through the aggregate function
// machinery, but it carries no aggregation state, and every operation that
// would use it as an aggregate function (create, add, merge, serialize,
// deserialize, insertResultInto) throws BAD_ARGUMENTS. Results are produced
// through the IWindowFunction interface instead.
struct WindowFunction
    : public IAggregateFunctionHelper<WindowFunction>
    , public IWindowFunction
{
    std::string name;

    WindowFunction(const std::string & name_, const DataTypes & argument_types_,
               const Array & parameters_)
        : IAggregateFunctionHelper<WindowFunction>(argument_types_, parameters_)
        , name(name_)
    {}

    String getName() const override { return name; }

    // Exposes the IWindowFunction side of this object.
    IWindowFunction * asWindowFunction() override { return this; }

    // Reports an attempt to use this function as an aggregate function.
    [[noreturn]] void fail() const
    {
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "The function '{}' can only be used as a window function, not as an aggregate function",
            getName());
    }

    // Empty state: nothing to allocate, nothing to destroy.
    size_t sizeOfData() const override { return 0; }
    size_t alignOfData() const override { return 1; }
    bool hasTrivialDestructor() const override { return true; }
    void destroy(AggregateDataPtr __restrict) const noexcept override {}

    // Aggregate-function operations: all rejected.
    void create(AggregateDataPtr __restrict) const override { fail(); }
    void add(AggregateDataPtr __restrict, const IColumn **, size_t, Arena *) const override { fail(); }
    void merge(AggregateDataPtr __restrict, ConstAggregateDataPtr, Arena *) const override { fail(); }
    void serialize(ConstAggregateDataPtr __restrict, WriteBuffer &) const override { fail(); }
    void deserialize(AggregateDataPtr __restrict, ReadBuffer &, Arena *) const override { fail(); }
    void insertResultInto(AggregateDataPtr __restrict, IColumn &, Arena *) const override { fail(); }
};

struct WindowFunctionRank final : public WindowFunction
{
    WindowFunctionRank(const std::string & name_,
            const DataTypes & argument_types_, const Array & parameters_)
        : WindowFunction(name_, argument_types_, parameters_)
    {}

    DataTypePtr getReturnType() const override
    { return std::make_shared<DataTypeUInt64>(); }

1358 1359
    void windowInsertResultInto(const WindowTransform * transform,
        size_t function_index) override
1360
    {
1361 1362
        IColumn & to = *transform->blockAt(transform->current_row)
            .output_columns[function_index];
1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377
        assert_cast<ColumnUInt64 &>(to).getData().push_back(
            transform->peer_group_start_row_number);
    }
};

struct WindowFunctionDenseRank final : public WindowFunction
{
    WindowFunctionDenseRank(const std::string & name_,
            const DataTypes & argument_types_, const Array & parameters_)
        : WindowFunction(name_, argument_types_, parameters_)
    {}

    DataTypePtr getReturnType() const override
    { return std::make_shared<DataTypeUInt64>(); }

1378 1379
    void windowInsertResultInto(const WindowTransform * transform,
        size_t function_index) override
1380
    {
1381 1382
        IColumn & to = *transform->blockAt(transform->current_row)
            .output_columns[function_index];
1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397
        assert_cast<ColumnUInt64 &>(to).getData().push_back(
            transform->peer_group_number);
    }
};

struct WindowFunctionRowNumber final : public WindowFunction
{
    WindowFunctionRowNumber(const std::string & name_,
            const DataTypes & argument_types_, const Array & parameters_)
        : WindowFunction(name_, argument_types_, parameters_)
    {}

    DataTypePtr getReturnType() const override
    { return std::make_shared<DataTypeUInt64>(); }

1398 1399
    void windowInsertResultInto(const WindowTransform * transform,
        size_t function_index) override
1400
    {
1401 1402
        IColumn & to = *transform->blockAt(transform->current_row)
            .output_columns[function_index];
1403 1404 1405 1406 1407
        assert_cast<ColumnUInt64 &>(to).getData().push_back(
            transform->current_row_number);
    }
};

A
fixes  
Alexander Kuzmenkov 已提交
1408
// ClickHouse-specific variant of lag/lead that respects the window frame.
1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466
template <bool is_lead>
struct WindowFunctionLagLeadInFrame final : public WindowFunction
{
    WindowFunctionLagLeadInFrame(const std::string & name_,
            const DataTypes & argument_types_, const Array & parameters_)
        : WindowFunction(name_, argument_types_, parameters_)
    {
        if (!parameters.empty())
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Function {} cannot be parameterized", name_);
        }

        if (argument_types.empty())
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Function {} takes at least one argument", name_);
        }

        if (argument_types.size() == 1)
        {
            return;
        }

        if (!isInt64FieldType(argument_types[1]->getDefault().getType()))
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Offset must be an integer, '{}' given",
                argument_types[1]->getName());
        }

        if (argument_types.size() == 2)
        {
            return;
        }

        if (!getLeastSupertype({argument_types[0], argument_types[2]}))
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "The default value type '{}' is not convertible to the argument type '{}'",
                argument_types[2]->getName(),
                argument_types[0]->getName());
        }

        if (argument_types.size() > 3)
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Function '{}' accepts at most 3 arguments, {} given",
                name, argument_types.size());
        }
    }

    DataTypePtr getReturnType() const override
    { return argument_types[0]; }

    void windowInsertResultInto(const WindowTransform * transform,
        size_t function_index) override
    {
A
cleanup  
Alexander Kuzmenkov 已提交
1467
        const auto & current_block = transform->blockAt(transform->current_row);
1468
        IColumn & to = *current_block.output_columns[function_index];
A
cleanup  
Alexander Kuzmenkov 已提交
1469
        const auto & workspace = transform->workspaces[function_index];
1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514

        int offset = 1;
        if (argument_types.size() > 1)
        {
            offset = (*current_block.input_columns[
                    workspace.argument_column_indices[1]])[
                        transform->current_row.row].get<Int64>();
            if (offset < 0)
            {
                throw Exception(ErrorCodes::BAD_ARGUMENTS,
                    "The offset for function {} must be nonnegative, {} given",
                    getName(), offset);
            }
        }

        const auto [target_row, offset_left] = transform->moveRowNumber(
            transform->current_row, offset * (is_lead ? 1 : -1));

        if (offset_left != 0
            || target_row < transform->frame_start
            || transform->frame_end <= target_row)
        {
            // Offset is outside the frame.
            if (argument_types.size() > 2)
            {
                // Column with default values is specified.
                to.insertFrom(*current_block.input_columns[
                            workspace.argument_column_indices[2]],
                    transform->current_row.row);
            }
            else
            {
                to.insertDefault();
            }
        }
        else
        {
            // Offset is inside the frame.
            to.insertFrom(*transform->blockAt(target_row).input_columns[
                    workspace.argument_column_indices[0]],
                target_row.row);
        }
    }
};

1515 1516
void registerWindowFunctions(AggregateFunctionFactory & factory)
{
A
cleanup  
Alexander Kuzmenkov 已提交
1517 1518 1519 1520 1521 1522 1523 1524 1525
    // Why didn't I implement lag/lead yet? Because they are a mess. I imagine
    // they are from the older generation of window functions, when the concept
    // of frame was not yet invented, so they ignore the frame and use the
    // partition instead. This means we have to track a separate frame for
    // these functions, which would  make the window transform completely
    // impenetrable to human mind. We can't just get away with materializing
    // the whole partition like Postgres does, because using a linear amount
    // of additional memory is not an option when we have a lot of data. We must
    // be able to process at least the lag/lead in streaming fashion.
1526 1527 1528 1529
    // A partial solution for constant offsets is rewriting, say `lag(value, offset)
    // to `any(value) over (rows between offset preceding and offset preceding)`.
    // We also implement non-standard functions `lag/lead_in_frame`, that are
    // analogous to `lag/lead`, but respect the frame.
A
cleanup  
Alexander Kuzmenkov 已提交
1530 1531 1532 1533 1534
    // Functions like cume_dist() do require materializing the entire
    // partition, but it's probably also simpler to implement them by rewriting
    // to a (rows between unbounded preceding and unbounded following) frame,
    // instead of adding separate logic for them.

1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554
    factory.registerFunction("rank", [](const std::string & name,
            const DataTypes & argument_types, const Array & parameters)
        {
            return std::make_shared<WindowFunctionRank>(name, argument_types,
                parameters);
        });

    factory.registerFunction("dense_rank", [](const std::string & name,
            const DataTypes & argument_types, const Array & parameters)
        {
            return std::make_shared<WindowFunctionDenseRank>(name, argument_types,
                parameters);
        });

    factory.registerFunction("row_number", [](const std::string & name,
            const DataTypes & argument_types, const Array & parameters)
        {
            return std::make_shared<WindowFunctionRowNumber>(name, argument_types,
                parameters);
        });
1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568

    factory.registerFunction("lag_in_frame", [](const std::string & name,
            const DataTypes & argument_types, const Array & parameters)
        {
            return std::make_shared<WindowFunctionLagLeadInFrame<false>>(
                name, argument_types, parameters);
        });

    factory.registerFunction("lead_in_frame", [](const std::string & name,
            const DataTypes & argument_types, const Array & parameters)
        {
            return std::make_shared<WindowFunctionLagLeadInFrame<true>>(
                name, argument_types, parameters);
        });
1569
}
A
Alexander Kuzmenkov 已提交
1570

A
Alexander Kuzmenkov 已提交
1571
}