ColumnUnique.h 18.5 KB
Newer Older
1
#pragma once
2 3
#include <Columns/IColumnUnique.h>
#include <Common/HashTable/HashMap.h>
4 5 6 7
#include <ext/range.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
8
#include <Columns/ColumnString.h>
N
Nikolai Kochetov 已提交
9
#include <DataTypes/DataTypeNullable.h>
10
#include <DataTypes/NumberTraits.h>
11

12
class NullMap;
13 14 15 16 17 18 19 20 21 22 23 24 25


/** Hash table key that identifies a value in one of two ways:
  *  - by position: (column, row) - the bytes are fetched lazily through column->getDataAt(row);
  *  - by content: a detached StringRef, used when `column` is nullptr.
  *
  * The "empty" state (compared with / assigned from int) is the one where both
  * `column` and `ref.data` are nullptr.
  */
template <typename ColumnType>
struct StringRefWrapper
{
    const ColumnType * column = nullptr;
    size_t row = 0;

    /// Used only when `column` is nullptr.
    StringRef ref;

    StringRefWrapper() {}
    StringRefWrapper(const ColumnType * column, size_t row) : column(column), row(row) {}
    StringRefWrapper(StringRef ref) : ref(ref) {}
    StringRefWrapper(const StringRefWrapper & other) = default;
    StringRefWrapper & operator=(const StringRefWrapper & other) = default;

    /// Assignment from / comparison with an integer resets / tests the empty state.
    /// The integer value itself is ignored.
    StringRefWrapper & operator =(int) { column = nullptr; ref.data = nullptr; return *this; }
    bool operator ==(int) const { return nullptr == column && nullptr == ref.data; }

    /// Resolves the key to the bytes it denotes.
    operator StringRef() const { return column ? column->getDataAt(row) : ref; }

    /// Two keys are equal if they point to the same row of the same column
    /// or if the bytes they resolve to are equal.
    bool operator==(const StringRefWrapper<ColumnType> & other) const
    {
        if (column && column == other.column && row == other.row)
            return true;

        return StringRef(*this) == StringRef(other);
    }
};

/// Zero-key customisation for StringRefWrapper keys.
/// A key is "zero" only when it refers to nothing at all: neither a column row nor detached bytes.
/// This matches StringRefWrapper::operator==(int) and StringRefWrapper::operator=(int).
/// Checking `column` alone would classify every key built from a bare StringRef as the zero key.
namespace ZeroTraits
{
    template <typename ColumnType>
    bool check(const StringRefWrapper<ColumnType> x) { return nullptr == x.column && nullptr == x.ref.data; }

    template <typename ColumnType>
    void set(StringRefWrapper<ColumnType> & x) { x.column = nullptr; x.ref.data = nullptr; }
}


49 50 51
namespace DB
{

52 53
/// Error codes referenced by the exceptions thrown from ColumnUnique.
namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;
    extern const int LOGICAL_ERROR;
}

/** Column that keeps every distinct value once, in a nested "holder" column.
  *
  * The first numSpecialValues() rows of the holder are reserved:
  *  - for a nullable dictionary row 0 stands for NULL and row 1 for the default value;
  *  - otherwise row 0 is the default value.
  * The holder itself is never nullable; nullability is emulated by getNestedColumn().
  *
  * A hash index from value to row number is built lazily on the first insertion.
  */
template <typename ColumnType>
class ColumnUnique final : public COWPtrHelper<IColumnUnique, ColumnUnique<ColumnType>>
{
    friend class COWPtrHelper<IColumnUnique, ColumnUnique<ColumnType>>;

private:
    explicit ColumnUnique(MutableColumnPtr && holder, bool is_nullable);
    explicit ColumnUnique(const IDataType & type);

    /// Copies the holder pointer and the nullability flag only;
    /// the index and the cached null mask are rebuilt lazily by the copy.
    ColumnUnique(const ColumnUnique & other) : column_holder(other.column_holder), is_nullable(other.is_nullable) {}

public:
    /// Holder column, wrapped into ColumnNullable if the dictionary is nullable.
    ColumnPtr getNestedColumn() const override;
    const ColumnPtr & getNestedNotNullableColumn() const override { return column_holder; }

    size_t uniqueInsert(const Field & x) override;
    size_t uniqueInsertFrom(const IColumn & src, size_t n) override;
    MutableColumnPtr uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length) override;
    IColumnUnique::IndexesWithOverflow uniqueInsertRangeWithOverflow(const IColumn & src, size_t start, size_t length,
                                                                     size_t max_dictionary_size) override;
    size_t uniqueInsertData(const char * pos, size_t length) override;
    size_t uniqueInsertDataWithTerminatingZero(const char * pos, size_t length) override;
    size_t uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos) override;

    IColumnUnique::SerializableState getSerializableState() const override;

    size_t getDefaultValueIndex() const override { return is_nullable ? 1 : 0; }
    size_t getNullValueIndex() const override;
    bool canContainNulls() const override { return is_nullable; }

    Field operator[](size_t n) const override { return (*getNestedColumn())[n]; }
    void get(size_t n, Field & res) const override { getNestedColumn()->get(n, res); }
    StringRef getDataAt(size_t n) const override { return getNestedColumn()->getDataAt(n); }
    StringRef getDataAtWithTerminatingZero(size_t n) const override
    {
        return getNestedColumn()->getDataAtWithTerminatingZero(n);
    }
    UInt64 get64(size_t n) const override { return getNestedColumn()->get64(n); }
    UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); }
    Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); }
    bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }

    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override
    {
        return column_holder->serializeValueIntoArena(n, arena, begin);
    }
    void updateHashWithValue(size_t n, SipHash & hash) const override
    {
        getNestedColumn()->updateHashWithValue(n, hash);
    }

    /// `rhs` is assumed to be an IColumnUnique as well; the cast is unchecked.
    int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override
    {
        auto & column_unique = static_cast<const IColumnUnique &>(rhs);
        return getNestedColumn()->compareAt(n, m, *column_unique.getNestedColumn(), nan_direction_hint);
    }

    void getExtremes(Field & min, Field & max) const override { column_holder->getExtremes(min, max); }
    bool valuesHaveFixedSize() const override { return column_holder->valuesHaveFixedSize(); }
    bool isFixedAndContiguous() const override { return column_holder->isFixedAndContiguous(); }
    size_t sizeOfValueIfFixed() const override { return column_holder->sizeOfValueIfFixed(); }
    bool isNumeric() const override { return column_holder->isNumeric(); }

    size_t byteSize() const override { return column_holder->byteSize(); }

    /// Holder plus the lazily created index and null mask, if they exist.
    size_t allocatedBytes() const override
    {
        return column_holder->allocatedBytes()
               + (index ? index->getBufferSizeInBytes() : 0)
               + (cached_null_mask ? cached_null_mask->allocatedBytes() : 0);
    }
    void forEachSubcolumn(IColumn::ColumnCallback callback) override
    {
        callback(column_holder);
    }

private:

    using IndexMapType = HashMap<StringRefWrapper<ColumnType>, UInt64, StringRefHash>;

    /// Distinct values; never nullable.
    ColumnPtr column_holder;

    /// For DataTypeNullable, stores null map.
    mutable ColumnPtr cached_null_mask;

    /// Lazy initialized.
    std::unique_ptr<IndexMapType> index;

    bool is_nullable;

    /// Number of reserved rows at the beginning of the holder (NULL and/or default value).
    size_t numSpecialValues() const { return is_nullable ? 2 : 1; }

    void buildIndex();
    ColumnType * getRawColumnPtr() { return static_cast<ColumnType *>(column_holder->assumeMutable().get()); }
    const ColumnType * getRawColumnPtr() const { return static_cast<const ColumnType *>(column_holder.get()); }
    UInt64 insertIntoMap(const StringRefWrapper<ColumnType> & ref, UInt64 value);

    template <typename IndexType>
    MutableColumnPtr uniqueInsertRangeImpl(
        const IColumn & src,
        size_t start,
        size_t length,
        typename ColumnVector<IndexType>::MutablePtr && positions_column,
        ColumnType * overflowed_keys,
        size_t max_dictionary_size);
};

155 156
/// Creates an empty dictionary for values of `type`.
/// For a nullable type the holder is created from the nested type.
/// The holder starts with numSpecialValues() rows produced by cloneResized().
template <typename ColumnType>
ColumnUnique<ColumnType>::ColumnUnique(const IDataType & type) : is_nullable(type.isNullable())
{
    const auto & holder_type = is_nullable ? *static_cast<const DataTypeNullable &>(type).getNestedType() : type;
    column_holder = holder_type.createColumn()->cloneResized(numSpecialValues());
}

162 163
/// Wraps an existing holder column.
/// Throws ILLEGAL_COLUMN if the holder has fewer rows than the number of reserved special values
/// or if the holder itself is nullable.
template <typename ColumnType>
ColumnUnique<ColumnType>::ColumnUnique(MutableColumnPtr && holder, bool is_nullable)
    : column_holder(std::move(holder)), is_nullable(is_nullable)
{
    if (column_holder->size() < numSpecialValues())
        throw Exception("Too small holder column for ColumnUnique.", ErrorCodes::ILLEGAL_COLUMN);

    if (column_holder->isColumnNullable())
        throw Exception("Holder column for ColumnUnique can't be nullable.", ErrorCodes::ILLEGAL_COLUMN);
}

172 173
/// Returns the holder column.
/// For a nullable dictionary the holder is wrapped into ColumnNullable together with a cached
/// null mask in which only the row getNullValueIndex() is set.
template <typename ColumnType>
ColumnPtr ColumnUnique<ColumnType>::getNestedColumn() const
{
    if (!is_nullable)
        return column_holder;

    const size_t size = getRawColumnPtr()->size();

    /// Create the mask on first use.
    if (!cached_null_mask)
    {
        ColumnUInt8::MutablePtr null_mask = ColumnUInt8::create(size, UInt8(0));
        null_mask->getData()[getNullValueIndex()] = 1;
        cached_null_mask = std::move(null_mask);
    }

    /// The holder may have changed size since the mask was cached; bring the mask to the same size.
    if (cached_null_mask->size() != size)
    {
        MutableColumnPtr null_mask = (*std::move(cached_null_mask)).mutate();
        static_cast<ColumnUInt8 &>(*null_mask).getData().resize_fill(size);
        cached_null_mask = std::move(null_mask);
    }

    return ColumnNullable::create(column_holder, cached_null_mask);
}

197 198
/// Row of the holder that stands for NULL (always 0).
/// Throws LOGICAL_ERROR if the dictionary is not nullable.
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::getNullValueIndex() const
{
    if (!is_nullable)
        throw Exception("ColumnUnique can't contain null values.", ErrorCodes::LOGICAL_ERROR);

    return 0;
}

206 207
/// Builds the value -> row hash index over all non-special rows of the holder.
/// Does nothing if the index already exists.
template <typename ColumnType>
void ColumnUnique<ColumnType>::buildIndex()
{
    if (index)
        return;

    auto column = getRawColumnPtr();
    index = std::make_unique<IndexMapType>();

    /// The reserved rows (NULL / default value) are intentionally not indexed.
    for (auto row : ext::range(numSpecialValues(), column->size()))
        (*index)[StringRefWrapper<ColumnType>(column, row)] = row;
}

221 222
template <typename ColumnType>
IndexType ColumnUnique<ColumnType>::insertIntoMap(const StringRefWrapper<ColumnType> & ref, IndexType value)
223 224 225 226
{
    if (!index)
        buildIndex();

227 228
    using IteratorType = typename IndexMapType::iterator;
    IteratorType it;
229 230 231 232 233 234 235 236 237
    bool inserted;
    index->emplace(ref, it, inserted);

    if (inserted)
        it->second = value;

    return it->second;
}

238 239
/// Inserts the value of `x` into the dictionary if it is not there yet and returns its position.
/// A Null field maps to getNullValueIndex() (which throws for a non-nullable dictionary);
/// a value equal to the reserved default row maps to getDefaultValueIndex().
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x)
{
    if (x.getType() == Field::Types::Null)
        return getNullValueIndex();

    auto column = getRawColumnPtr();
    const UInt64 prev_size = column->size();

    if ((*column)[getDefaultValueIndex()] == x)
        return getDefaultValueIndex();

    /// Append first so that the index key can refer to the new row, then roll back on a duplicate.
    column->insert(x);
    const UInt64 pos = insertIntoMap(StringRefWrapper<ColumnType>(column, prev_size), prev_size);
    if (pos != prev_size)
        column->popBack(1);

    return pos;
}

258 259
/// Inserts row `n` of `src` into the dictionary and returns its position.
/// For a nullable dictionary a NULL row of `src` maps to getNullValueIndex().
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsertFrom(const IColumn & src, size_t n)
{
    if (is_nullable && src.isNullAt(n))
        return getNullValueIndex();

    const auto ref = src.getDataAt(n);
    return uniqueInsertData(ref.data, ref.size);
}

268 269
/// Inserts the value given as raw bytes [pos, pos + length) and returns its position.
/// A value equal to the reserved default row is not inserted; getDefaultValueIndex() is returned.
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsertData(const char * pos, size_t length)
{
    if (!index)
        buildIndex();

    auto column = getRawColumnPtr();
    const StringRef value(pos, length);

    if (column->getDataAt(getDefaultValueIndex()) == value)
        return getDefaultValueIndex();

    /// Look the bytes up before touching the holder, so nothing has to be rolled back on a hit.
    auto iter = index->find(StringRefWrapper<ColumnType>(value));
    if (iter != index->end())
        return iter->second;

    const UInt64 size = column->size();
    column->insertData(pos, length);
    return insertIntoMap(StringRefWrapper<ColumnType>(column, size), size);
}

291 292
/// Same as uniqueInsertData, but `length` may include a terminating zero byte.
///  - For ColumnString the last byte is dropped and the value is inserted as plain data.
///  - For fixed-size values the data is inserted as is.
///  - Otherwise the value is appended to the holder first and removed again if it is a duplicate.
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueInsertDataWithTerminatingZero(const char * pos, size_t length)
{
    if (std::is_same<ColumnType, ColumnString>::value)
        return uniqueInsertData(pos, length - 1);

    if (column_holder->valuesHaveFixedSize())
        return uniqueInsertData(pos, length);

    /// Don't know if data actually has terminating zero. So, insert it firstly.

    auto column = getRawColumnPtr();
    const size_t prev_size = column->size();
    column->insertDataWithTerminatingZero(pos, length);

    if (column->compareAt(getDefaultValueIndex(), prev_size, *column, 1) == 0)
    {
        column->popBack(1);
        return getDefaultValueIndex();
    }

    const auto position = insertIntoMap(StringRefWrapper<ColumnType>(column, prev_size), prev_size);
    if (position != prev_size)
        column->popBack(1);

    return static_cast<size_t>(position);
}

319 320
/// Deserializes one value starting at `pos` into the dictionary and returns its position.
/// `new_pos` receives the pointer returned by the holder's deserializeAndInsertFromArena.
/// The value is appended to the holder first and removed again if it equals the default value
/// or is already present in the index.
template <typename ColumnType>
size_t ColumnUnique<ColumnType>::uniqueDeserializeAndInsertFromArena(const char * pos, const char *& new_pos)
{
    auto column = getRawColumnPtr();
    const size_t prev_size = column->size();
    new_pos = column->deserializeAndInsertFromArena(pos);

    if (column->compareAt(getDefaultValueIndex(), prev_size, *column, 1) == 0)
    {
        column->popBack(1);
        return getDefaultValueIndex();
    }

    const auto index_pos = insertIntoMap(StringRefWrapper<ColumnType>(column, prev_size), prev_size);
    if (index_pos != prev_size)
        column->popBack(1);

    return static_cast<size_t>(index_pos);
}

339 340 341
/** Inserts rows [start, start + length) of `src` into the dictionary and writes the position of
  * every row into `positions_column`, which is returned (type-erased) as the result.
  *
  * `positions_column` must hold at least `length` elements. If it holds more, the leading
  * (size - length) elements are treated as already filled and the positions of the processed rows
  * are written after them. This is how the function resumes after switching to a wider index type.
  *
  * If `overflowed_keys` is not nullptr, then once the dictionary holds `max_dictionary_size`
  * non-special values, new keys are appended to `overflowed_keys` instead of the dictionary.
  * Their positions continue the numbering after the last dictionary row.
  *
  * When the next position no longer fits into IndexType, the positions written so far are copied
  * into a column of the next wider unsigned type and the remaining rows are processed by the
  * corresponding instantiation. Throws LOGICAL_ERROR if there is no wider type.
  */
template <typename ColumnType>
template <typename IndexType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeImpl(
    const IColumn & src,
    size_t start,
    size_t length,
    typename ColumnVector<IndexType>::MutablePtr && positions_column,
    ColumnType * overflowed_keys,
    size_t max_dictionary_size)
{
    if (!index)
        buildIndex();

    auto & positions = positions_column->getData();

    if (positions.size() < length)
        throw Exception("Too small positions column for ColumnUnique.", ErrorCodes::LOGICAL_ERROR);

    /// Elements written by a narrower instantiation before the index type was widened.
    const size_t num_prefilled_rows = positions.size() - length;

    const ColumnType * src_column;
    const NullMap * null_map = nullptr;

    if (src.isColumnNullable())
    {
        auto nullable_column = static_cast<const ColumnNullable *>(&src);
        src_column = static_cast<const ColumnType *>(&nullable_column->getNestedColumn());
        null_map = &nullable_column->getNullMapData();
    }
    else
        src_column = static_cast<const ColumnType *>(&src);

    using SuperiorIndexType = typename NumberTraits::Construct<false, false, NumberTraits::nextSize(sizeof(IndexType))>::Type;

    /// Advances `next_position`. If it no longer fits into IndexType, finishes the work with the
    /// wider index type and returns the resulting column; otherwise returns nullptr.
    /// `num_processed_rows` is the number of rows of this call whose positions are already written.
    auto updatePosition = [&](UInt64 & next_position, size_t num_processed_rows) -> MutableColumnPtr
    {
        ++next_position;

        if (next_position > static_cast<UInt64>(std::numeric_limits<IndexType>::max()))
        {
            if (sizeof(SuperiorIndexType) == sizeof(IndexType))
                throw Exception("Can't find superior index type for type " + demangle(typeid(IndexType).name()),
                                ErrorCodes::LOGICAL_ERROR);

            auto expanded_column = ColumnVector<SuperiorIndexType>::create(positions.size());
            auto & expanded_data = expanded_column->getData();

            const size_t num_filled_rows = num_prefilled_rows + num_processed_rows;
            for (size_t i = 0; i < num_filled_rows; ++i)
                expanded_data[i] = positions[i];

            /// The widened column keeps its full size while `length` shrinks, so the callee
            /// recognises the copied prefix and continues writing right after it.
            return uniqueInsertRangeImpl<SuperiorIndexType>(
                    src,
                    start + num_processed_rows,
                    length - num_processed_rows,
                    std::move(expanded_column),
                    overflowed_keys,
                    max_dictionary_size);
        }

        return nullptr;
    };

    auto column = getRawColumnPtr();

    /// Positions are numbered through the dictionary first and then through the overflowed keys.
    UInt64 next_position = column->size();

    std::unique_ptr<IndexMapType> secondary_index;
    if (overflowed_keys)
    {
        secondary_index = std::make_unique<IndexMapType>();

        /// Keys that overflowed before a widening step must keep their positions.
        /// The dictionary stops growing once keys start to overflow, so the k-th
        /// overflowed key has position column->size() + k.
        const size_t num_overflowed_keys = overflowed_keys->size();
        for (size_t key_row = 0; key_row < num_overflowed_keys; ++key_row)
            (*secondary_index)[StringRefWrapper<ColumnType>(overflowed_keys, key_row)] = next_position + key_row;

        next_position += num_overflowed_keys;
    }

    for (size_t i = 0; i < length; ++i)
    {
        const size_t row = start + i;
        const size_t out = num_prefilled_rows + i;

        if (null_map && (*null_map)[row])
        {
            positions[out] = getNullValueIndex();
            continue;
        }

        if (column->compareAt(getDefaultValueIndex(), row, *src_column, 1) == 0)
        {
            positions[out] = getDefaultValueIndex();
            continue;
        }

        const StringRefWrapper<ColumnType> src_key(src_column, row);

        auto it = index->find(src_key);
        if (it != index->end())
        {
            positions[out] = it->second;
            continue;
        }

        if (overflowed_keys && next_position >= max_dictionary_size + numSpecialValues())
        {
            /// The dictionary is full: the key goes to (or is already in) the overflow column.
            auto jt = secondary_index->find(src_key);
            if (jt != secondary_index->end())
            {
                positions[out] = jt->second;
                continue;
            }

            positions[out] = next_position;
            auto ref = src_column->getDataAt(row);
            overflowed_keys->insertData(ref.data, ref.size);
            (*secondary_index)[src_key] = next_position;

            if (auto res = updatePosition(next_position, i + 1))
                return res;
        }
        else
        {
            /// New key: append it to the dictionary and register it in the index.
            positions[out] = next_position;
            auto ref = src_column->getDataAt(row);
            column->insertData(ref.data, ref.size);
            (*index)[StringRefWrapper<ColumnType>(column, next_position)] = next_position;

            if (auto res = updatePosition(next_position, i + 1))
                return res;
        }
    }

    /// `positions_column` is a named rvalue reference, so it has to be moved explicitly.
    return std::move(positions_column);
}

449 450
/// Inserts rows [start, start + length) of `src` into the dictionary.
/// Returns a column with the position of every inserted row. The column uses the narrowest
/// unsigned type able to hold the current dictionary size and is widened by
/// uniqueInsertRangeImpl if the dictionary outgrows that type during insertion.
template <typename ColumnType>
MutableColumnPtr ColumnUnique<ColumnType>::uniqueInsertRangeFrom(const IColumn & src, size_t start, size_t length)
{
    const size_t size = getRawColumnPtr()->size();

    /// Runs the insertion with the index type of `x` if the current size fits, otherwise returns nullptr.
    /// The explicit return type is required: the branches return different types.
    auto callForType = [&](auto x) -> MutableColumnPtr
    {
        using IndexType = decltype(x);
        if (size <= std::numeric_limits<IndexType>::max())
        {
            auto positions_column = ColumnVector<IndexType>::create(length);

            /// IndexType cannot be deduced from the nested MutablePtr type, so it is given explicitly.
            return uniqueInsertRangeImpl<IndexType>(src, start, length, std::move(positions_column), nullptr, 0);
        }

        return nullptr;
    };

    MutableColumnPtr positions_column;
    if (!positions_column)
        positions_column = callForType(UInt8());
    if (!positions_column)
        positions_column = callForType(UInt16());
    if (!positions_column)
        positions_column = callForType(UInt32());
    if (!positions_column)
        positions_column = callForType(UInt64());
    if (!positions_column)
        throw Exception("Can't find index type for ColumnUnique", ErrorCodes::LOGICAL_ERROR);

    return positions_column;
}

483 484
/// Same as uniqueInsertRangeFrom, but `max_dictionary_size` is passed on to uniqueInsertRangeImpl
/// as the limit on the dictionary size. Keys that do not fit are collected in a separate column
/// of the same type as the holder.
/// Returns the positions together with that column of overflowed keys.
template <typename ColumnType>
IColumnUnique::IndexesWithOverflow ColumnUnique<ColumnType>::uniqueInsertRangeWithOverflow(
    const IColumn & src,
    size_t start,
    size_t length,
    size_t max_dictionary_size)
{
    const size_t size = getRawColumnPtr()->size();

    auto overflowed_keys = column_holder->cloneEmpty();
    auto overflowed_keys_ptr = typeid_cast<ColumnType *>(overflowed_keys.get());
    if (!overflowed_keys_ptr)
        throw Exception("Invalid keys type for ColumnUnique.", ErrorCodes::LOGICAL_ERROR);

    /// Runs the insertion with the index type of `x` if the current size fits, otherwise returns nullptr.
    /// The explicit return type is required: the branches return different types.
    auto callForType = [&](auto x) -> MutableColumnPtr
    {
        using IndexType = decltype(x);
        if (size <= std::numeric_limits<IndexType>::max())
        {
            auto positions_column = ColumnVector<IndexType>::create(length);

            /// IndexType cannot be deduced from the nested MutablePtr type, so it is given explicitly.
            return uniqueInsertRangeImpl<IndexType>(
                src, start, length, std::move(positions_column), overflowed_keys_ptr, max_dictionary_size);
        }

        return nullptr;
    };

    MutableColumnPtr positions_column;
    if (!positions_column)
        positions_column = callForType(UInt8());
    if (!positions_column)
        positions_column = callForType(UInt16());
    if (!positions_column)
        positions_column = callForType(UInt32());
    if (!positions_column)
        positions_column = callForType(UInt64());
    if (!positions_column)
        throw Exception("Can't find index type for ColumnUnique", ErrorCodes::LOGICAL_ERROR);

    IColumnUnique::IndexesWithOverflow indexes_with_overflow;
    indexes_with_overflow.indexes = std::move(positions_column);
    indexes_with_overflow.overflowed_keys = std::move(overflowed_keys);
    return indexes_with_overflow;
}

529 530
/// Describes the part of the holder that contains real values:
/// the rows after the reserved special values, up to the end of the holder.
template <typename ColumnType>
IColumnUnique::SerializableState ColumnUnique<ColumnType>::getSerializableState() const
{
    IColumnUnique::SerializableState state;
    state.column = column_holder;
    state.offset = numSpecialValues();
    state.limit = column_holder->size() - state.offset;

    return state;
}
539 540

};