// AddingSelectorTransform.cpp (2.4 KB)
// Committed by Nikolai Kochetov.
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/SelectorInfo.h>

namespace DB
{

/// Error codes referenced in this file. They are only declared here (extern), not defined.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR; /// Code attached to every exception thrown by the constructor below.
}

/// Creates the transform. The same header is passed to the base class as both input and output header.
///
/// header       - block structure of the chunks going through the transform.
/// num_outputs_ - number of buckets the selector distributes rows over; must be greater than 1.
/// key_columns_ - positions (in header) of the columns used for hashing; must be non-empty and
///                every position must be less than header.columns().
///
/// Throws Exception with ErrorCodes::LOGICAL_ERROR if any of the requirements above is violated.
AddingSelectorTransform::AddingSelectorTransform(
    const Block & header, size_t num_outputs_, ColumnNumbers key_columns_)
    : ISimpleTransform(header, header, false)
    , num_outputs(num_outputs_)
    , key_columns(std::move(key_columns_))
    , hash(0) /// Starts out constructed from 0; it is reset to the chunk's row count before every use.
{
    setInputNotNeededAfterRead(false);

    /// A single bucket (or none) makes the selector meaningless.
    if (num_outputs <= 1)
        throw Exception("AddingSelectorTransform expects more than 1 outputs, got " + std::to_string(num_outputs),
                        ErrorCodes::LOGICAL_ERROR);

    /// Without key columns there is nothing to hash.
    if (key_columns.empty())
        throw Exception("AddingSelectorTransform cannot split by empty set of key columns",
                        ErrorCodes::LOGICAL_ERROR);

    /// Every key position must refer to an existing column of the header.
    for (const auto & column : key_columns)
    {
        if (column >= header.columns())
            throw Exception("Invalid column number: " + std::to_string(column) +
                            ". There is only " + std::to_string(header.columns()) + " columns in header",
                            ErrorCodes::LOGICAL_ERROR);
    }
}

/// Recomputes `hash` for `chunk`: the hash is reset to the chunk's number of rows and is then
/// updated by each column listed in `key_columns`, in the order the positions are given.
static void calculateWeakHash32(const Chunk & chunk, const ColumnNumbers & key_columns, WeakHash32 & hash)
{
    hash.reset(chunk.getNumRows());

    const auto & chunk_columns = chunk.getColumns();
    for (const auto & position : key_columns)
    {
        chunk_columns[position]->updateWeakHash32(hash);
    }
}

/// Builds a selector with one entry per hash value, mapping each 32-bit hash to a bucket number.
///
/// The hash range [0, 2^32) is cut into num_outputs equal intervals: a hash that falls into
/// [(2^32 / num_outputs) * i, (2^32 / num_outputs) * (i + 1)) is assigned to bucket i.
static IColumn::Selector fillSelector(const WeakHash32 & hash, size_t num_outputs)
{
    const auto & hashes = hash.getData();
    const size_t total_rows = hashes.size();
    IColumn::Selector selector(total_rows);

    size_t index = 0;
    for (auto & bucket : selector)
    {
        /// The arithmetic is done in place on the selector element, which stores 64 bit values.
        bucket = hashes[index]; /// [0, 2^32)
        bucket *= num_outputs;  /// [0, num_outputs * 2^32)
        bucket >>= 32u;         /// [0, num_outputs)
        ++index;
    }

    return selector;
}

/// Hashes the key columns of input_chunk, derives a per-row bucket selector from the hashes,
/// hands the chunk contents over to output_chunk and attaches the selector to it as chunk info.
void AddingSelectorTransform::transform(Chunk & input_chunk, Chunk & output_chunk)
{
    auto selector_info = std::make_shared<SelectorInfo>();

    /// `hash` is a member that is recomputed for every chunk.
    calculateWeakHash32(input_chunk, key_columns, hash);
    selector_info->selector = fillSelector(hash, num_outputs);

    /// Exchange the contents of the two chunks, then attach the selector to the output.
    output_chunk.swap(input_chunk);
    output_chunk.setChunkInfo(std::move(selector_info));
}

}