ColumnUnique.h 18.3 KB
Newer Older
1
#pragma once
2
#include <Columns/IColumnUnique.h>
3 4
#include <Columns/ReverseIndex.h>

5 6
#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
7
#include <Columns/ColumnString.h>
8
#include <Columns/ColumnFixedString.h>
9

N
Nikolai Kochetov 已提交
10
#include <DataTypes/DataTypeNullable.h>
11
#include <DataTypes/NumberTraits.h>
12

13 14
#include <Common/typeid_cast.h>
#include <ext/range.h>
15

16 17 18
namespace DB
{

19

20 21
/** Dictionary column: stores every distinct value of ColumnType once and hands out its row position.
  *
  * The values live in `column_holder`, which is never Nullable. Its first numSpecialValues() rows are
  * reserved: when is_nullable, row 0 stands for NULL (getNullValueIndex()) and row 1 for the default value
  * (getDefaultValueIndex()); otherwise row 0 is the default value. `index` is a ReverseIndex over
  * column_holder used to look up the position of an already stored value.
  */
template <typename ColumnType>
class ColumnUnique final : public COWPtrHelper<IColumnUnique, ColumnUnique<ColumnType>>
{
    friend class COWPtrHelper<IColumnUnique, ColumnUnique<ColumnType>>;

private:
    explicit ColumnUnique(MutableColumnPtr && holder, bool is_nullable);
    explicit ColumnUnique(const IDataType & type);
    ColumnUnique(const ColumnUnique & other);

public:
    MutableColumnPtr cloneEmpty() const override;

    /// The stored values as a column: the holder itself, or a Nullable wrapper over it when is_nullable.
    const ColumnPtr & getNestedColumn() const override;
    /// The raw holder, without any Nullable wrapper.
    const ColumnPtr & getNestedNotNullableColumn() const override { return column_holder; }

    /// Insertion methods return the position of the value in the dictionary, adding the value if it is new.
    size_t uniqueInsert(const Field & x) override;
    size_t uniqueInsertFrom(const IColumn & src, size_t n) override;
    MutableColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) override;
    IColumnUnique::IndexesWithOverflow uniqueInsertRangeWithOverflow(const IColumn & src, size_t start, size_t length,
                                                                     size_t max_dictionary_size) override;
    size_t uniqueInsertData(const char * pos, size_t length) override;
    size_t uniqueInsertDataWithTerminatingZero(const char * pos, size_t length) override;
    size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) override;

    size_t getDefaultValueIndex() const override { return is_nullable ? 1 : 0; }
    size_t getNullValueIndex() const override;
    bool canContainNulls() const override { return is_nullable; }

    /// Read access goes through getNestedColumn(), so row getNullValueIndex() reads as NULL when nullable.
    Field operator[](size_t n) const override { return (*getNestedColumn())[n]; }
    void get(size_t n, Field & res) const override { getNestedColumn()->get(n, res); }
    StringRef getDataAt(size_t n) const override { return getNestedColumn()->getDataAt(n); }
    StringRef getDataAtWithTerminatingZero(size_t n) const override
    {
        return getNestedColumn()->getDataAtWithTerminatingZero(n);
    }
    UInt64 get64(size_t n) const override { return getNestedColumn()->get64(n); }
    UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); }
    Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); }
    bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }

    /// NOTE(review): serializes straight from column_holder, i.e. without a null flag; this matches
    /// uniqueDeserializeAndInsertFromArena, which also deserializes straight into the holder.
    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override
    {
        return column_holder->serializeValueIntoArena(n, arena, begin);
    }
    void updateHashWithValue(size_t n, SipHash & hash) const override
    {
        getNestedColumn()->updateHashWithValue(n, hash);
    }

    /// `rhs` is assumed to be an IColumnUnique as well; values are compared via the nested columns.
    int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override
    {
        const auto & column_unique = static_cast<const IColumnUnique &>(rhs);
        return getNestedColumn()->compareAt(n, m, *column_unique.getNestedColumn(), nan_direction_hint);
    }

    void getExtremes(Field & min, Field & max) const override { column_holder->getExtremes(min, max); }
    bool valuesHaveFixedSize() const override { return column_holder->valuesHaveFixedSize(); }
    bool isFixedAndContiguous() const override { return column_holder->isFixedAndContiguous(); }
    size_t sizeOfValueIfFixed() const override { return column_holder->sizeOfValueIfFixed(); }
    bool isNumeric() const override { return column_holder->isNumeric(); }

    size_t byteSize() const override { return column_holder->byteSize(); }
    size_t allocatedBytes() const override
    {
        return column_holder->allocatedBytes()
               + index.allocatedBytes()
               + (cached_null_mask ? cached_null_mask->allocatedBytes() : 0);
    }
    void forEachSubcolumn(IColumn::ColumnCallback callback) override
    {
        callback(column_holder);
        /// The callback presumably may replace column_holder; re-point the reverse index at the current one.
        index.setColumn(getRawColumnPtr());
    }

private:
    ColumnPtr column_holder;
    bool is_nullable;
    ReverseIndex<UInt64, ColumnType> index;

    /// For DataTypeNullable, stores null map.
    mutable ColumnPtr cached_null_mask;
    /// For DataTypeNullable, the Nullable wrapper (column_holder + cached_null_mask) returned by getNestedColumn().
    mutable ColumnPtr cached_column_nullable;

    /// Number of reserved rows at the start of column_holder: NULL (if nullable) and the default value.
    static size_t numSpecialValues(bool is_nullable) { return is_nullable ? 2 : 1; }
    size_t numSpecialValues() const { return numSpecialValues(is_nullable); }

    ColumnType * getRawColumnPtr() { return static_cast<ColumnType *>(column_holder->assumeMutable().get()); }
    const ColumnType * getRawColumnPtr() const { return static_cast<const ColumnType *>(column_holder.get()); }

    template <typename IndexType>
    MutableColumnPtr uniqueInsertRangeImpl(
        const IColumn & src,
        size_t start,
        size_t length,
        size_t num_added_rows,
        typename ColumnVector<IndexType>::MutablePtr && positions_column,
        ReverseIndex<UInt64, ColumnType> * secondary_index,
        size_t max_dictionary_size);
};

121 122 123 124 125 126
template <typename ColumnType>
MutableColumnPtr ColumnUnique<ColumnType>::cloneEmpty() const
{
    /// An "empty" unique column still carries its reserved special rows.
    auto special_rows = column_holder->cloneResized(numSpecialValues());
    return ColumnUnique<ColumnType>::create(std::move(special_rows), is_nullable);
}

127
template <typename ColumnType>
128 129 130 131 132 133 134 135 136 137 138 139
ColumnUnique<ColumnType>::ColumnUnique(const ColumnUnique & other)
    : column_holder(other.column_holder)
    , is_nullable(other.is_nullable)
    , index(numSpecialValues(is_nullable), 0)
{
    index.setColumn(getRawColumnPtr());
}

template <typename ColumnType>
ColumnUnique<ColumnType>::ColumnUnique(const IDataType & type)
    : is_nullable(type.isNullable())
    , index(numSpecialValues(is_nullable), 0)
{
    /// The holder never stores Nullable values: for Nullable(T) build the holder from T.
    const IDataType * holder_type = &type;
    if (is_nullable)
        holder_type = static_cast<const DataTypeNullable &>(type).getNestedType().get();

    /// Start with just the reserved special rows.
    column_holder = holder_type->createColumn()->cloneResized(numSpecialValues());
    index.setColumn(getRawColumnPtr());
}

146 147
template <typename ColumnType>
ColumnUnique<ColumnType>::ColumnUnique(MutableColumnPtr && holder, bool is_nullable)
    : column_holder(std::move(holder))
    , is_nullable(is_nullable)
    , index(numSpecialValues(is_nullable), 0)
{
    /// The holder must already contain the reserved rows and must store plain (non-Nullable) values.
    const bool has_reserved_rows = column_holder->size() >= numSpecialValues();
    if (!has_reserved_rows)
        throw Exception("Too small holder column for ColumnUnique.", ErrorCodes::ILLEGAL_COLUMN);

    const bool holder_is_nullable = column_holder->isColumnNullable();
    if (holder_is_nullable)
        throw Exception("Holder column for ColumnUnique can't be nullable.", ErrorCodes::ILLEGAL_COLUMN);

    index.setColumn(getRawColumnPtr());
}

160
template <typename ColumnType>
161
const ColumnPtr & ColumnUnique<ColumnType>::getNestedColumn() const
N
Nikolai Kochetov 已提交
162 163 164
{
    if (is_nullable)
    {
165 166 167 168 169 170
        size_t size = getRawColumnPtr()->size();
        if (!cached_null_mask)
        {
            ColumnUInt8::MutablePtr null_mask = ColumnUInt8::create(size, UInt8(0));
            null_mask->getData()[getNullValueIndex()] = 1;
            cached_null_mask = std::move(null_mask);
171
            cached_column_nullable = ColumnNullable::create(column_holder, cached_null_mask);
172 173 174 175 176 177 178
        }

        if (cached_null_mask->size() != size)
        {
            MutableColumnPtr null_mask = (*std::move(cached_null_mask)).mutate();
            static_cast<ColumnUInt8 &>(*null_mask).getData().resize_fill(size);
            cached_null_mask = std::move(null_mask);
179
            cached_column_nullable = ColumnNullable::create(column_holder, cached_null_mask);
180 181
        }

182
        return cached_column_nullable;
N
Nikolai Kochetov 已提交
183 184 185 186
    }
    return column_holder;
}

187 188
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::getNullValueIndex() const
{
    /// When NULLs are allowed they always occupy the very first reserved row.
    if (is_nullable)
        return 0;

    throw Exception("ColumnUnique can't contain null values.", ErrorCodes::LOGICAL_ERROR);
}

196 197
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x)
{
    if (x.getType() == Field::Types::Null)
        return getNullValueIndex();

    ColumnType * keys = getRawColumnPtr();
    const size_t default_index = getDefaultValueIndex();
    if ((*keys)[default_index] == x)
        return default_index;

    /// Append the value speculatively and let the reverse index report where it really lives.
    const UInt64 appended_row = static_cast<UInt64>(keys->size());
    keys->insert(x);
    const auto found = index.insert(appended_row);

    /// The value was already stored elsewhere: undo the speculative append.
    if (found != appended_row)
        keys->popBack(1);

    return found;
}

216 217
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsertFrom(const IColumn & src, size_t n)
{
    if (is_nullable && src.isNullAt(n))
        return getNullValueIndex();

    /// The holder only keeps nested values, so look through a Nullable source.
    if (const auto * nullable_src = typeid_cast<const ColumnNullable *>(&src))
        return uniqueInsertFrom(nullable_src->getNestedColumn(), n);

    const auto value = src.getDataAt(n);
    return uniqueInsertData(value.data, value.size);
}

229 230
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t length)
{
    ColumnType * keys = getRawColumnPtr();
    const StringRef value(pos, length);

    const size_t default_index = getDefaultValueIndex();
    if (keys->getDataAt(default_index) == value)
        return default_index;

    const UInt64 rows_before = keys->size();
    const UInt64 position = index.getInsertionPoint(value);

    /// An insertion point equal to the current row count means the value is new: append and register it.
    if (position == rows_before)
    {
        keys->insertData(pos, length);
        index.insertFromLastRow();
    }

    return position;
}

249 250
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsertDataWithTerminatingZero(const char * pos, size_t length)
{
    /// For strings, `length` counts the trailing zero byte; strip it.
    if (std::is_same<ColumnType, ColumnString>::value)
        return uniqueInsertData(pos, length - 1);

    /// Fixed-size values have no terminator to deal with.
    if (column_holder->valuesHaveFixedSize())
        return uniqueInsertData(pos, length);

    /// Otherwise it's unknown whether the data really ends with a zero byte, so let the column
    /// parse it by inserting it first, then check whether the value was already present.
    ColumnType * keys = getRawColumnPtr();
    const size_t appended_row = keys->size();
    keys->insertDataWithTerminatingZero(pos, length);

    const bool equals_default = keys->compareAt(getDefaultValueIndex(), appended_row, *keys, 1) == 0;
    if (equals_default)
    {
        keys->popBack(1);
        return getDefaultValueIndex();
    }

    const auto found = index.insert(appended_row);
    if (found != appended_row)
        keys->popBack(1);

    return static_cast<size_t>(found);
}

277 278
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos)
{
    /// Deserialize into a fresh row first; the row is dropped again if the value is already known.
    ColumnType * keys = getRawColumnPtr();
    const size_t appended_row = keys->size();
    new_pos = keys->deserializeAndInsertFromArena(pos);

    const size_t default_index = getDefaultValueIndex();
    if (keys->compareAt(default_index, appended_row, *keys, 1) == 0)
    {
        keys->popBack(1);
        return default_index;
    }

    const auto found = index.insert(appended_row);
    if (found != appended_row)
        keys->popBack(1);

    return static_cast<size_t>(found);
}

297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
/// Debug helper: throws LOGICAL_ERROR for the first index in `indexes` that is >= max_dictionary_size.
template <typename IndexType>
static void checkIndexes(const ColumnVector<IndexType> & indexes, size_t max_dictionary_size)
{
    const auto & data = indexes.getData();
    for (size_t i = 0; i < data.size(); ++i)
    {
        if (data[i] >= max_dictionary_size)
        {
            throw Exception("Found index " + toString(data[i]) + " at position " + toString(i)
                            + " which is greater than or equal to dictionary size " + toString(max_dictionary_size),
                            ErrorCodes::LOGICAL_ERROR);
        }
    }
}

312 313 314
/** Inserts rows [start, start + length) of `src` into the dictionary and writes, for each of those rows,
  * the position of its value into `positions_column`. Processing starts at row `num_added_rows`, so a
  * call can resume where a previous one stopped; earlier entries of `positions_column` are kept.
  *
  * `src` must be ColumnType or Nullable(ColumnType), otherwise ILLEGAL_COLUMN is thrown.
  * NULL rows map to getNullValueIndex(); rows equal to the default value map to getDefaultValueIndex().
  *
  * If `secondary_index` is given, a new key whose position would be >= max_dictionary_size is inserted into
  * secondary_index (and its column) instead of the dictionary.
  *
  * When the next position no longer fits into IndexType, the positions computed so far are copied into a
  * column of the next wider unsigned type and processing continues recursively with that type; the returned
  * column then has the wider type.
  */
template <typename ColumnType>
template <typename IndexType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeImpl(
    const IColumn & src,
    size_t start,
    size_t length,
    size_t num_added_rows,
    typename ColumnVector<IndexType>::MutablePtr && positions_column,
    ReverseIndex<UInt64, ColumnType> * secondary_index,
    size_t max_dictionary_size)
{
    const ColumnType * src_column;
    const NullMap * null_map = nullptr;
    auto & positions = positions_column->getData();

    /// Called after a new key was given position `next_position`: advances it and, if the following position
    /// does not fit into IndexType, continues with a wider index type and returns that call's result.
    /// Returns nullptr while IndexType is still sufficient.
    auto update_position = [&](UInt64 & next_position) -> MutableColumnPtr
    {
        constexpr auto next_size = NumberTraits::nextSize(sizeof(IndexType));
        using SuperiorIndexType = typename NumberTraits::Construct<false, false, next_size>::Type;

        ++next_position;

        if (next_position > std::numeric_limits<IndexType>::max())
        {
            if (sizeof(SuperiorIndexType) == sizeof(IndexType))
                throw Exception("Can't find superior index type for type " + demangle(typeid(IndexType).name()),
                                ErrorCodes::LOGICAL_ERROR);

            /// Copy positions of the rows finished so far. The current row is not copied: the wider call
            /// processes it again, and its key has already been added to the index by insert_key.
            auto expanded_column = ColumnVector<SuperiorIndexType>::create(length);
            auto & expanded_data = expanded_column->getData();
            for (size_t i = 0; i < num_added_rows; ++i)
                expanded_data[i] = positions[i];

            return uniqueInsertRangeImpl<SuperiorIndexType>(
                    src,
                    start,
                    length,
                    num_added_rows,
                    std::move(expanded_column),
                    secondary_index,
                    max_dictionary_size);
        }

        return nullptr;
    };

    if (auto nullable_column = typeid_cast<const ColumnNullable *>(&src))
    {
        src_column = typeid_cast<const ColumnType *>(&nullable_column->getNestedColumn());
        null_map = &nullable_column->getNullMapData();
    }
    else
        src_column = typeid_cast<const ColumnType *>(&src);

    if (src_column == nullptr)
        throw Exception("Invalid column type for ColumnUnique::insertRangeFrom. Expected " + column_holder->getName() +
                        ", got " + src.getName(), ErrorCodes::ILLEGAL_COLUMN);

    auto column = getRawColumnPtr();

    /// Position the next new key is expected to get (dictionary rows plus overflowed keys, if any).
    UInt64 next_position = column->size();
    if (secondary_index)
        next_position += secondary_index->size();

    /// NOTE(review): this requires the index to hand out exactly next_position for a new key, including the
    /// secondary index; confirm that holds when the dictionary already has more than max_dictionary_size rows.
    auto check_inserted_position = [&next_position](UInt64 inserted_position)
    {
        if (inserted_position != next_position)
            throw Exception("Inserted position " + toString(inserted_position)
                            + " is not equal with expected " + toString(next_position), ErrorCodes::LOGICAL_ERROR);
    };

    /// Appends a new key to cur_index's column, records its position for the current row and advances
    /// next_position (possibly switching to a wider index type, see update_position).
    auto insert_key = [&](const StringRef & ref, ReverseIndex<UInt64, ColumnType> * cur_index)
    {
        positions[num_added_rows] = next_position;
        cur_index->getColumn()->insertData(ref.data, ref.size);
        auto inserted_pos = cur_index->insertFromLastRow();
        check_inserted_position(inserted_pos);
        return update_position(next_position);
    };

    for (; num_added_rows < length; ++num_added_rows)
    {
        auto row = start + num_added_rows;

        if (null_map && (*null_map)[row])
            positions[num_added_rows] = getNullValueIndex();
        else if (column->compareAt(getDefaultValueIndex(), row, *src_column, 1) == 0)
            positions[num_added_rows] = getDefaultValueIndex();
        else
        {
            auto ref = src_column->getDataAt(row);
            auto cur_index = &index;
            bool inserted = false;

            /// Look the key up in the main index first; once the dictionary is full, fall back to the
            /// secondary index for keys that are not in the main one.
            while (!inserted)
            {
                auto insertion_point = cur_index->getInsertionPoint(ref);

                if (insertion_point == cur_index->lastInsertionPoint())
                {
                    if (secondary_index && cur_index != secondary_index && next_position >= max_dictionary_size)
                    {
                        cur_index = secondary_index;
                        continue;
                    }

                    if (auto res = insert_key(ref, cur_index))
                        return res;
                }
                else
                    positions[num_added_rows] = insertion_point;

                inserted = true;
            }
        }
    }

    /// Debug aid (disabled): checkIndexes(*positions_column, bound) can validate the result. With a
    /// secondary index, `bound` must also cover the positions handed out by secondary_index.
    return std::move(positions_column);
}

433 434
template <typename ColumnType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length)
{
    /// One attempt with IndexType positions. Gives up (returning nullptr, inserting nothing) when the
    /// current dictionary size is already beyond what IndexType can hold.
    auto try_with_index_type = [this, &src, start, length](auto type_tag) -> MutableColumnPtr
    {
        using IndexType = decltype(type_tag);

        const size_t dictionary_size = getRawColumnPtr()->size();
        if (dictionary_size > std::numeric_limits<IndexType>::max())
            return nullptr;

        auto positions = ColumnVector<IndexType>::create(length);
        return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions), nullptr, 0);
    };

    /// Use the narrowest unsigned index type that fits the current dictionary.
    MutableColumnPtr positions_column = try_with_index_type(UInt8());
    if (!positions_column)
        positions_column = try_with_index_type(UInt16());
    if (!positions_column)
        positions_column = try_with_index_type(UInt32());
    if (!positions_column)
        positions_column = try_with_index_type(UInt64());
    if (!positions_column)
        throw Exception("Can't find index type for ColumnUnique", ErrorCodes::LOGICAL_ERROR);

    return positions_column;
}

465 466
template <typename ColumnType>
IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWithOverflow(
    const IColumn & src,
    size_t start,
    size_t length,
    size_t max_dictionary_size)
{
    /// New keys that do not fit under max_dictionary_size are collected here instead of in the dictionary.
    auto overflowed_keys = column_holder->cloneEmpty();
    auto * overflowed_keys_ptr = typeid_cast<ColumnType *>(overflowed_keys.get());
    if (overflowed_keys_ptr == nullptr)
        throw Exception("Invalid keys type for ColumnUnique.", ErrorCodes::LOGICAL_ERROR);

    /// One attempt with IndexType positions; returns nullptr (inserting nothing) if the current
    /// dictionary size already exceeds what IndexType can hold.
    auto try_with_index_type = [this, &src, start, length, overflowed_keys_ptr, max_dictionary_size](auto type_tag)
        -> MutableColumnPtr
    {
        using IndexType = decltype(type_tag);

        const size_t dictionary_size = getRawColumnPtr()->size();
        if (dictionary_size > std::numeric_limits<IndexType>::max())
            return nullptr;

        /// Reverse index over overflowed_keys; it only lives for the duration of this attempt.
        ReverseIndex<UInt64, ColumnType> secondary_index(0, max_dictionary_size);
        secondary_index.setColumn(overflowed_keys_ptr);

        auto positions = ColumnVector<IndexType>::create(length);
        return this->uniqueInsertRangeImpl<IndexType>(src, start, length, 0, std::move(positions),
                                                      &secondary_index, max_dictionary_size);
    };

    /// Use the narrowest unsigned index type that fits the current dictionary.
    MutableColumnPtr indexes = try_with_index_type(UInt8());
    if (!indexes)
        indexes = try_with_index_type(UInt16());
    if (!indexes)
        indexes = try_with_index_type(UInt32());
    if (!indexes)
        indexes = try_with_index_type(UInt64());
    if (!indexes)
        throw Exception("Can't find index type for ColumnUnique", ErrorCodes::LOGICAL_ERROR);

    IColumnUnique::IndexesWithOverflow result;
    result.indexes = std::move(indexes);
    result.overflowed_keys = std::move(overflowed_keys);
    return result;
}

};