CacheDictionary.cpp 34.9 KB
Newer Older
1 2
#include "CacheDictionary.h"

3
#include <functional>
4 5
#include <memory>
#include <Columns/ColumnString.h>
6
#include <Common/BitHelpers.h>
P
proller 已提交
7
#include <Common/CurrentMetrics.h>
8
#include <Common/HashTable/Hash.h>
9
#include <Common/ProfileEvents.h>
P
proller 已提交
10 11
#include <Common/ProfilingScopedRWLock.h>
#include <Common/randomSeed.h>
12
#include <Common/typeid_cast.h>
P
proller 已提交
13 14
#include <ext/range.h>
#include <ext/size.h>
15
#include <Common/setThreadName.h>
16
#include "CacheDictionary.inc.h"
P
proller 已提交
17 18
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
19

20 21
namespace ProfileEvents
{
P
proller 已提交
22 23 24 25 26 27 28 29 30 31
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
32 33 34 35
}

namespace CurrentMetrics
{
P
proller 已提交
36
extern const Metric DictCacheRequests;
37 38 39
}


40 41 42 43
namespace DB
{
namespace ErrorCodes
{
44 45 46
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
A
Alexey Milovidov 已提交
47
    extern const int LOGICAL_ERROR;
48
    extern const int TOO_SMALL_BUFFER_SIZE;
49 50 51
}


P
proller 已提交
52
inline size_t CacheDictionary::getCellIdx(const Key id) const
53
{
54 55 56
    const auto hash = intHash64(id);
    const auto idx = hash & size_overlap_mask;
    return idx;
57 58 59
}


P
proller 已提交
60
CacheDictionary::CacheDictionary(
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    const DictionaryLifetime dict_lifetime_,
    const size_t size_,
    const bool allow_read_expired_keys_,
    const size_t max_update_queue_size_,
    const size_t update_queue_push_timeout_milliseconds_,
    const size_t each_update_finish_timeout_seconds_)
    : name{name_}
    , dict_struct(dict_struct_)
    , source_ptr{std::move(source_ptr_)}
    , dict_lifetime(dict_lifetime_)
    , allow_read_expired_keys(allow_read_expired_keys_)
    , max_update_queue_size(max_update_queue_size_)
    , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
    , each_update_finish_timeout_seconds(each_update_finish_timeout_seconds_)
    , log(&Logger::get("ExternalDictionaries"))
    , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
    , size_overlap_mask{this->size - 1}
    , cells{this->size}
    , rnd_engine(randomSeed())
    , update_queue(max_update_queue_size_)
{
    /// The cache fetches individual keys on demand, so the source has to support selective loading.
    const bool source_supports_selective_load = source_ptr->supportsSelectiveLoad();
    if (!source_supports_selective_load)
        throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();

    /// The background updater is started last, once all attributes have been created.
    update_thread = ThreadFromGlobalPool([this] { updateThreadFunction(); });
}

/// Stops the background update thread and waits for it to exit.
///
/// updateThreadFunction() loops on `while (!finished)` and starts each iteration with
/// update_queue.pop(), which waits until an element is available. Setting `finished` and
/// clearing the queue alone therefore does not wake a thread parked on an empty queue, and
/// join() would block forever. After dropping pending requests we push one empty UpdateUnit
/// solely to wake the thread so it can observe `finished` and leave its loop.
CacheDictionary::~CacheDictionary()
{
    finished = true;
    update_queue.clear();

    /// Wake-up sentinel: carries no keys.
    /// NOTE(review): updateThreadFunction still processes this unit (with zero requested ids)
    /// before re-checking `finished`; consider checking `finished` right after pop() there.
    auto wake_up_unit = std::make_shared<UpdateUnit>(std::vector<Key>());
    update_queue.push(wake_up_unit);

    update_thread.join();
}


A
Alexey Milovidov 已提交
100
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
101
{
102
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
103

104
    getItemsNumberImpl<UInt64, UInt64>(*hierarchical_attribute, ids, out, [&](const size_t) { return null_value; });
105 106 107
}


108
/// Uniform element access for isInImpl: lets the same template accept either an array of
/// ancestor keys (indexed per row) or a single ancestor key (same value for every row).
static inline CacheDictionary::Key getAt(const PaddedPODArray<CacheDictionary::Key> & arr, const size_t idx)
{
    const auto & element = arr[idx];
    return element;
}

/// Scalar overload: the index is ignored and the single key is returned for every position.
static inline CacheDictionary::Key getAt(const CacheDictionary::Key & value, const size_t /* idx */)
{
    return value;
}
117 118 119


template <typename AncestorType>
P
proller 已提交
120
void CacheDictionary::isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
121
{
122 123
    /// Transform all children to parents until ancestor id or null_value will be reached.

124
    size_t out_size = out.size();
P
proller 已提交
125
    memset(out.data(), 0xFF, out_size); /// 0xFF means "not calculated"
126 127 128

    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);

129
    PaddedPODArray<Key> children(out_size, 0);
130 131 132 133 134 135 136 137
    PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());

    while (true)
    {
        size_t out_idx = 0;
        size_t parents_idx = 0;
        size_t new_children_idx = 0;

138
        while (out_idx < out_size)
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
        {
            /// Already calculated
            if (out[out_idx] != 0xFF)
            {
                ++out_idx;
                continue;
            }

            /// No parent
            if (parents[parents_idx] == null_value)
            {
                out[out_idx] = 0;
            }
            /// Found ancestor
            else if (parents[parents_idx] == getAt(ancestor_ids, parents_idx))
            {
                out[out_idx] = 1;
            }
A
alexey-milovidov 已提交
157
            /// Loop detected
158 159
            else if (children[new_children_idx] == parents[parents_idx])
            {
P
proller 已提交
160
                out[out_idx] = 1;
161
            }
A
alexey-milovidov 已提交
162
            /// Found intermediate parent, add this value to search at next loop iteration
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
            else
            {
                children[new_children_idx] = parents[parents_idx];
                ++new_children_idx;
            }

            ++out_idx;
            ++parents_idx;
        }

        if (new_children_idx == 0)
            break;

        /// Transform all children to its parents.
        children.resize(new_children_idx);
        parents.resize(new_children_idx);

        toParent(children, parents);
    }
182 183 184
}

void CacheDictionary::isInVectorVector(
P
proller 已提交
185
    const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
186
{
187
    isInImpl(child_ids, ancestor_ids, out);
188
}
189

P
proller 已提交
190
void CacheDictionary::isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const
191
{
192
    isInImpl(child_ids, ancestor_id, out);
193
}
194

P
proller 已提交
195
void CacheDictionary::isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
196
{
197
    /// Special case with single child value.
198

199
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
200

201 202 203
    PaddedPODArray<Key> child(1, child_id);
    PaddedPODArray<Key> parent(1);
    std::vector<Key> ancestors(1, child_id);
204

205 206 207 208
    /// Iteratively find all ancestors for child.
    while (true)
    {
        toParent(child, parent);
209

210 211
        if (parent[0] == null_value)
            break;
212

213 214 215
        child[0] = parent[0];
        ancestors.push_back(parent[0]);
    }
216

217
    /// Assuming short hierarchy, so linear search is Ok.
218
    for (size_t i = 0, out_size = out.size(); i < out_size; ++i)
219
        out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
220
}
221

A
Alexey Milovidov 已提交
222
void CacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
223
{
224
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
225
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
226

227
    const auto null_value = StringRef{std::get<String>(attribute.null_values)};
228

P
proller 已提交
229
    getItemsString(attribute, ids, out, [&](const size_t) { return null_value; });
230 231 232
}

void CacheDictionary::getString(
P
proller 已提交
233
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
234
{
235
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
236
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
237

P
proller 已提交
238
    getItemsString(attribute, ids, out, [&](const size_t row) { return def->getDataAt(row); });
239 240 241
}

void CacheDictionary::getString(
P
proller 已提交
242
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
243
{
244
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
245
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
246

P
proller 已提交
247
    getItemsString(attribute, ids, out, [&](const size_t) { return StringRef{def}; });
248 249 250
}


251
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag
252 253 254 255 256 257 258 259
/// true  false   found and valid
/// false true    not found (something outdated, maybe our cell)
/// false false   not found (other id stored with valid data)
/// true  true    impossible
///
/// todo: split this func to two: find_for_get and find_for_set
CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const CellMetadata::time_point_t now) const
{
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
    auto pos = getCellIdx(id);
    auto oldest_id = pos;
    auto oldest_time = CellMetadata::time_point_t::max();
    const auto stop = pos + max_collision_length;
    for (; pos < stop; ++pos)
    {
        const auto cell_idx = pos & size_overlap_mask;
        const auto & cell = cells[cell_idx];

        if (cell.id != id)
        {
            /// maybe we already found nearest expired cell (try minimize collision_length on insert)
            if (oldest_time > now && oldest_time > cell.expiresAt())
            {
                oldest_time = cell.expiresAt();
                oldest_id = cell_idx;
            }
            continue;
        }

        if (cell.expiresAt() < now)
        {
            return {cell_idx, false, true};
        }

        return {cell_idx, true, false};
    }

    return {oldest_id, false, false};
289 290
}

A
Alexey Milovidov 已提交
291
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
292
{
N
mvc  
Nikita Mikhaylov 已提交
293 294 295 296 297
    /// There are three types of ids.
    /// - Valid ids. These ids are presented in local cache and their lifetime is not expired.
    /// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired).
    /// - CacheNotFound ids. We have to go to external storage to know its value.

298
    /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
N
mvc  
Nikita Mikhaylov 已提交
299 300
    std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
    std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
301

302
    size_t cache_hit = 0;
N
mvc  
Nikita Mikhaylov 已提交
303

304 305 306 307 308 309 310 311 312 313 314
    const auto rows = ext::size(ids);
    {
        const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

        const auto now = std::chrono::system_clock::now();
        /// fetch up-to-date values, decide which ones require update
        for (const auto row : ext::range(0, rows))
        {
            const auto id = ids[row];
            const auto find_result = findCellIdx(id, now);
            const auto & cell_idx = find_result.cell_idx;
315 316 317 318 319 320 321

            auto insert_to_answer_routine = [&] ()
            {
                const auto & cell = cells[cell_idx];
                out[row] = !cell.isDefault();
            };

322 323 324
            if (!find_result.valid)
            {
                if (find_result.outdated)
N
mvc  
Nikita Mikhaylov 已提交
325 326 327
                {
                    cache_expired_ids[id].push_back(row);

328 329
                    if (allow_read_expired_keys)
                        insert_to_answer_routine();
N
mvc  
Nikita Mikhaylov 已提交
330
                }
331
                else
N
mvc  
Nikita Mikhaylov 已提交
332 333 334
                {
                    cache_not_found_ids[id].push_back(row);
                }
335 336 337 338
            }
            else
            {
                ++cache_hit;
339
                insert_to_answer_routine();
340 341 342 343
            }
        }
    }

344 345
    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
346 347 348
    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);

    query_count.fetch_add(rows, std::memory_order_relaxed);
349
    hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
350

351
    if (cache_not_found_ids.empty())
N
mvc  
Nikita Mikhaylov 已提交
352
    {
353 354 355 356 357 358 359 360 361 362 363 364 365
        /// Nothing to update - return;
        if (cache_expired_ids.empty())
            return;

        if (allow_read_expired_keys)
        {
            std::vector<Key> required_expired_ids;
            required_expired_ids.reserve(cache_expired_ids.size());
            std::transform(
                    std::begin(cache_expired_ids), std::end(cache_expired_ids),
                    std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });

            /// Callbacks are empty because we don't want to receive them after an unknown period of time.
N
Nikita Mikhaylov 已提交
366
            auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids);
367 368 369 370 371

            tryPushToUpdateQueueOrThrow(update_unit_ptr);
            /// Update is async - no need to wait.
            return;
        }
N
mvc  
Nikita Mikhaylov 已提交
372
    }
373

374 375
    /// At this point we have two situations.
    /// There may be both types of keys: cache_expired_ids and cache_not_found_ids.
N
mvc  
Nikita Mikhaylov 已提交
376 377
    /// We will update them all synchronously.

N
better  
Nikita Mikhaylov 已提交
378
    std::vector<Key> required_ids;
379 380 381 382 383 384 385
    required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
    std::transform(
            std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
    std::transform(
            std::begin(cache_expired_ids), std::end(cache_expired_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
N
mvc  
Nikita Mikhaylov 已提交
386

N
Nikita Mikhaylov 已提交
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
    auto on_cell_updated = [&] (const Key id, const size_t)
    {
        for (const auto row : cache_not_found_ids[id])
            out[row] = true;
        for (const auto row : cache_expired_ids[id])
            out[row] = true;
    };

    auto on_id_not_found = [&] (const Key id, const size_t)
    {
        for (const auto row : cache_not_found_ids[id])
            out[row] = false;
        for (const auto row : cache_expired_ids[id])
            out[row] = true;
    };

    auto update_unit_ptr = std::make_shared<UpdateUnit>(required_ids);
N
mvc  
Nikita Mikhaylov 已提交
404

405 406
    tryPushToUpdateQueueOrThrow(update_unit_ptr);
    waitForCurrentUpdateFinish(update_unit_ptr);
N
Nikita Mikhaylov 已提交
407
    prepareAnswer(update_unit_ptr, on_cell_updated, on_id_not_found);
408 409 410 411 412
}


void CacheDictionary::createAttributes()
{
413 414
    const auto attributes_size = dict_struct.attributes.size();
    attributes.reserve(attributes_size);
415 416

    bytes_allocated += size * sizeof(CellMetadata);
417
    bytes_allocated += attributes_size * sizeof(attributes.front());
418 419 420 421 422 423 424 425

    for (const auto & attribute : dict_struct.attributes)
    {
        attribute_index_by_name.emplace(attribute.name, attributes.size());
        attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));

        if (attribute.hierarchical)
        {
P
proller 已提交
426
            hierarchical_attribute = &attributes.back();
427

K
kreuzerkrieg 已提交
428
            if (hierarchical_attribute->type != AttributeUnderlyingType::utUInt64)
429
                throw Exception{name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
430 431
        }
    }
432 433
}

A
Alexey Milovidov 已提交
434
/// Creates storage for one attribute: a per-cell array of `size` elements of the matching type and
/// the converted null value. Adds the array's size to bytes_allocated; for strings it also makes
/// sure the shared string arena exists.
CacheDictionary::Attribute CacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
    Attribute result{type, {}, {}};

    switch (type)
    {
#define CACHE_DICTIONARY_NUMERIC_CASE(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
        { \
            result.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
            result.arrays = std::make_unique<ContainerType<TYPE>>(size); \
            bytes_allocated += size * sizeof(TYPE); \
            break; \
        }

        CACHE_DICTIONARY_NUMERIC_CASE(UInt8)
        CACHE_DICTIONARY_NUMERIC_CASE(UInt16)
        CACHE_DICTIONARY_NUMERIC_CASE(UInt32)
        CACHE_DICTIONARY_NUMERIC_CASE(UInt64)
        CACHE_DICTIONARY_NUMERIC_CASE(UInt128)
        CACHE_DICTIONARY_NUMERIC_CASE(Int8)
        CACHE_DICTIONARY_NUMERIC_CASE(Int16)
        CACHE_DICTIONARY_NUMERIC_CASE(Int32)
        CACHE_DICTIONARY_NUMERIC_CASE(Int64)
        CACHE_DICTIONARY_NUMERIC_CASE(Decimal32)
        CACHE_DICTIONARY_NUMERIC_CASE(Decimal64)
        CACHE_DICTIONARY_NUMERIC_CASE(Decimal128)
        CACHE_DICTIONARY_NUMERIC_CASE(Float32)
        CACHE_DICTIONARY_NUMERIC_CASE(Float64)
#undef CACHE_DICTIONARY_NUMERIC_CASE

        case AttributeUnderlyingType::utString:
        {
            result.null_values = null_value.get<String>();
            result.arrays = std::make_unique<ContainerType<StringRef>>(size);
            bytes_allocated += size * sizeof(StringRef);

            /// One arena is shared by all String attributes of this dictionary.
            if (!string_arena)
                string_arena = std::make_unique<ArenaWithFreeLists>();
            break;
        }
    }

    return result;
}

A
Alexey Milovidov 已提交
473
/// Resets cell `idx` of `attribute` to the attribute's null value.
/// For strings, any arena-allocated copy held by the cell is freed and the cell is pointed at the
/// attribute's own null string (no copy is made).
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
{
    switch (attribute.type)
    {
#define CACHE_DICTIONARY_RESET_TO_NULL(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = std::get<TYPE>(attribute.null_values); \
            break;

        CACHE_DICTIONARY_RESET_TO_NULL(UInt8)
        CACHE_DICTIONARY_RESET_TO_NULL(UInt16)
        CACHE_DICTIONARY_RESET_TO_NULL(UInt32)
        CACHE_DICTIONARY_RESET_TO_NULL(UInt64)
        CACHE_DICTIONARY_RESET_TO_NULL(UInt128)
        CACHE_DICTIONARY_RESET_TO_NULL(Int8)
        CACHE_DICTIONARY_RESET_TO_NULL(Int16)
        CACHE_DICTIONARY_RESET_TO_NULL(Int32)
        CACHE_DICTIONARY_RESET_TO_NULL(Int64)
        CACHE_DICTIONARY_RESET_TO_NULL(Float32)
        CACHE_DICTIONARY_RESET_TO_NULL(Float64)
        CACHE_DICTIONARY_RESET_TO_NULL(Decimal32)
        CACHE_DICTIONARY_RESET_TO_NULL(Decimal64)
        CACHE_DICTIONARY_RESET_TO_NULL(Decimal128)
#undef CACHE_DICTIONARY_RESET_TO_NULL

        case AttributeUnderlyingType::utString:
        {
            const auto & null_string = std::get<String>(attribute.null_values);
            auto & cell_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];

            /// Already pointing at the shared null value: nothing to release or reassign.
            if (cell_ref.data == null_string.data())
                break;

            if (cell_ref.data)
                string_arena->free(const_cast<char *>(cell_ref.data), cell_ref.size);

            cell_ref = StringRef{null_string};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
539
/// Stores `value` into cell `idx` of `attribute`. Narrow integer and Float32 attributes are read from
/// the Field's widest matching type (UInt64 / Int64 / Float64) and converted on assignment.
/// For strings, the cell's previous arena copy is freed (unless it points at the null value) and a
/// fresh zero-terminated copy is allocated; an empty string is stored as an empty StringRef.
void CacheDictionary::setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const
{
    switch (attribute.type)
    {
#define CACHE_DICTIONARY_STORE(TYPE, FIELD_TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = value.get<FIELD_TYPE>(); \
            break;

        CACHE_DICTIONARY_STORE(UInt8, UInt64)
        CACHE_DICTIONARY_STORE(UInt16, UInt64)
        CACHE_DICTIONARY_STORE(UInt32, UInt64)
        CACHE_DICTIONARY_STORE(UInt64, UInt64)
        CACHE_DICTIONARY_STORE(UInt128, UInt128)
        CACHE_DICTIONARY_STORE(Int8, Int64)
        CACHE_DICTIONARY_STORE(Int16, Int64)
        CACHE_DICTIONARY_STORE(Int32, Int64)
        CACHE_DICTIONARY_STORE(Int64, Int64)
        CACHE_DICTIONARY_STORE(Float32, Float64)
        CACHE_DICTIONARY_STORE(Float64, Float64)
        CACHE_DICTIONARY_STORE(Decimal32, Decimal32)
        CACHE_DICTIONARY_STORE(Decimal64, Decimal64)
        CACHE_DICTIONARY_STORE(Decimal128, Decimal128)
#undef CACHE_DICTIONARY_STORE

        case AttributeUnderlyingType::utString:
        {
            const auto & new_value = value.get<String>();
            auto & cell_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];
            const auto & null_string = std::get<String>(attribute.null_values);

            /// The null value is not owned by the arena, so only other non-empty refs are released.
            const bool owns_arena_memory = cell_ref.data && cell_ref.data != null_string.data();
            if (owns_arena_memory)
                string_arena->free(const_cast<char *>(cell_ref.data), cell_ref.size);

            const auto length = new_value.size();
            if (length == 0)
            {
                cell_ref = {};
                break;
            }

            /// Copy including the terminating zero byte; the StringRef records only `length`.
            auto * buffer = string_arena->alloc(length + 1);
            std::copy(new_value.data(), new_value.data() + length + 1, buffer);
            cell_ref = StringRef{buffer, length};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
612
/// Returns the attribute named `attribute_name`; throws BAD_ARGUMENTS if there is no such attribute.
CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & attribute_name) const
{
    const auto found = attribute_index_by_name.find(attribute_name);
    if (found != attribute_index_by_name.end())
        return attributes[found->second];

    throw Exception{name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
}

621 622
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
P
proller 已提交
623 624
    return (idx != zero_cell_idx && cells[idx].id == 0)
        || (cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
625 626 627 628
}

/// Collects the ids of all non-empty, non-default cells, scanning the table under the read lock.
PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    PaddedPODArray<Key> cached_ids;
    const size_t cell_count = cells.size();
    for (size_t cell_idx = 0; cell_idx < cell_count; ++cell_idx)
    {
        if (isEmptyCell(cell_idx))
            continue;

        const auto & cell = cells[cell_idx];
        if (cell.isDefault())
            continue;

        cached_ids.push_back(cell.id);
    }
    return cached_ids;
}

643
/// Creates a stream over the requested columns for the ids currently present in the cache
/// (as returned by getCachedIds()).
BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    using CachedIdsStream = DictionaryBlockInputStream<CacheDictionary, Key>;
    return std::make_shared<CachedIdsStream>(shared_from_this(), max_block_size, getCachedIds(), column_names);
}

649 650 651 652 653 654
/// Returns a copy of last_exception, read while holding the dictionary's read lock.
std::exception_ptr CacheDictionary::getLastException() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
    std::exception_ptr stored = last_exception;
    return stored;
}

655 656
/// Registers the "cache" layout. The factory lambda validates the dictionary structure and reads
/// all layout settings from <layout><cache>...</cache></layout>, rejecting invalid values.
void registerDictionaryCache(DictionaryFactory & factory)
{
    auto create_layout = [=](const std::string & name,
                             const DictionaryStructure & dict_struct,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
            throw Exception{"'key' is not supported for dictionary of layout 'cache'",
                            ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{name
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};
        const auto & layout_prefix = config_prefix + ".layout";
        /// Every cache-specific setting lives under <layout><cache>.
        const auto cache_prefix = layout_prefix + ".cache";

        const size_t size = config.getUInt64(cache_prefix + ".size_in_cells");
        if (size == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have 0 cells",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
            throw Exception{name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
                            ErrorCodes::BAD_ARGUMENTS};

        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};

        const size_t max_update_queue_size =
                config.getUInt64(cache_prefix + ".max_update_queue_size", 100000);
        if (max_update_queue_size == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have empty update queue of size 0",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool allow_read_expired_keys =
                config.getBool(cache_prefix + ".allow_read_expired_keys", false);

        const size_t update_queue_push_timeout_milliseconds =
                config.getUInt64(cache_prefix + ".update_queue_push_timeout_milliseconds", 10);
        if (update_queue_push_timeout_milliseconds < 10)
            throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
                            ErrorCodes::BAD_ARGUMENTS};

        /// Read from <layout><cache> like every other cache setting. This key used to be read directly
        /// under <layout>; that location is still honoured as a fallback so existing configs keep working.
        const size_t each_update_finish_timeout_seconds = config.getUInt64(
                cache_prefix + ".each_update_finish_timeout_seconds",
                config.getUInt64(layout_prefix + ".each_update_finish_timeout_seconds", 600));
        if (each_update_finish_timeout_seconds == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have timeout equals to zero.",
                            ErrorCodes::BAD_ARGUMENTS};

        return std::make_unique<CacheDictionary>(
                name, dict_struct, std::move(source_ptr), dict_lifetime, size,
                allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds, each_update_finish_timeout_seconds);
    };
    factory.registerLayout("cache", create_layout, false);
}

N
mvc  
Nikita Mikhaylov 已提交
714 715
void CacheDictionary::updateThreadFunction()
{
716
    setThreadName("AsyncUpdater");
N
Nikita Mikhaylov 已提交
717
    while (!finished)
N
mvc  
Nikita Mikhaylov 已提交
718
    {
N
Nikita Mikhaylov 已提交
719 720 721
        UpdateUnitPtr first_popped;
        update_queue.pop(first_popped);

722 723 724 725
        /// Here we pop as many unit pointers from update queue as we can.
        /// We fix current size to avoid livelock (or too long waiting),
        /// when this thread pops from the queue and other threads push to the queue.
        const size_t current_queue_size = update_queue.size();
N
Nikita Mikhaylov 已提交
726 727 728 729 730 731 732 733 734

        /// Word "bunch" must present in this log message, because it is being checked in tests.
        if (current_queue_size > 0)
            LOG_DEBUG(log, "Performing a bunch of keys update in cache dictionary.");

        /// We use deque since there is first_popped pointer.
        /// And we have to add to the update_request without breaking order.
        std::deque<UpdateUnitPtr> update_request(current_queue_size);

735 736 737
        for (auto & unit_ptr: update_request)
            update_queue.pop(unit_ptr);

N
Nikita Mikhaylov 已提交
738 739
        update_request.push_front(first_popped);

740 741
        /// Here we prepare total count of all requested ids
        /// not to do useless allocations later.
N
Nikita Mikhaylov 已提交
742
        size_t total_requested_keys_count = 0;
743
        for (auto & unit_ptr: update_request)
N
Nikita Mikhaylov 已提交
744
            total_requested_keys_count += unit_ptr->requested_ids.size();
745 746

        std::vector<Key> concatenated_requested_ids;
N
Nikita Mikhaylov 已提交
747
        concatenated_requested_ids.reserve(total_requested_keys_count);
748 749 750 751
        for (auto & unit_ptr: update_request)
            std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids),
                    [&] (const Key & key) {concatenated_requested_ids.push_back(key);});

N
Nikita Mikhaylov 已提交
752
        try
N
mvc  
Nikita Mikhaylov 已提交
753
        {
754
            auto found_ids_mask_ptr = std::make_shared<std::unordered_map<Key, UInt8>>(concatenated_requested_ids.size());
N
Nikita Mikhaylov 已提交
755

756 757 758 759
            /// Copy shared_ptr to let this map be alive until other thread finish his stuff.
            /// It is thread safe because writing to the map happens before reading from multiple threads.
            for (auto & unit_ptr: update_request)
                unit_ptr->found_ids_mask_ptr = found_ids_mask_ptr;
N
Nikita Mikhaylov 已提交
760

761
            for (const auto id : concatenated_requested_ids)
N
Nikita Mikhaylov 已提交
762 763
                found_ids_mask_ptr->insert({id, 0});

764 765 766 767 768
            /// Update a bunch of ids.
            update(concatenated_requested_ids, *found_ids_mask_ptr);

            /// Notify all threads about finished updating the bunch of ids
            /// where their own ids were included.
769
            std::unique_lock<std::mutex> lock(update_mutex);
N
Nikita Mikhaylov 已提交
770

771 772
            for (auto & unit_ptr: update_request)
                unit_ptr->is_done = true;
N
Nikita Mikhaylov 已提交
773

774
            is_update_finished.notify_all();
N
mvc  
Nikita Mikhaylov 已提交
775
        }
N
Nikita Mikhaylov 已提交
776 777
        catch (...)
        {
778
            std::unique_lock<std::mutex> lock(update_mutex);
779 780 781 782 783
            /// It is a big trouble, because one bad query can make other threads fail with not relative exception.
            /// So at this point all threads (and queries) will receive the same exception.
            for (auto & unit_ptr: update_request)
                unit_ptr->current_exception = std::current_exception();

784
            is_update_finished.notify_all();
N
Nikita Mikhaylov 已提交
785
        }
N
mvc  
Nikita Mikhaylov 已提交
786 787 788
    }
}

789 790 791
/// Blocks the calling thread until the update thread has either marked `update_unit_ptr` as done
/// or stored an exception in it, waiting at most each_update_finish_timeout_seconds.
/// @throws DB::Exception (CACHE_DICTIONARY_UPDATE_FAIL) if the wait times out.
/// Rethrows the exception recorded by the update thread for this unit, if any.
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
{
    std::unique_lock<std::mutex> lock(update_mutex);

    /// The timeout setting is expressed in seconds, so it must be wrapped in std::chrono::seconds.
    /// Using std::chrono::minutes here made every wait 60 times longer than configured.
    const auto sleeping_result = is_update_finished.wait_for(
            lock,
            std::chrono::seconds(each_update_finish_timeout_seconds),
            [&] { return update_unit_ptr->is_done || update_unit_ptr->current_exception; });

    if (!sleeping_result)
        throw DB::Exception("Keys updating timed out", ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);

    if (update_unit_ptr->current_exception)
        std::rethrow_exception(update_unit_ptr->current_exception);
}

/// Hands an update unit over to the background update thread.
/// Calls update_queue.tryPush with update_queue_push_timeout_milliseconds as the timeout;
/// when the push does not succeed, throws CACHE_DICTIONARY_UPDATE_FAIL and reports
/// the current size of the queue in the message.
void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const
{
    const bool pushed = update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds);
    if (pushed)
        return;

    const std::string queue_size_text = std::to_string(update_queue.size());
    throw DB::Exception(
            "Cannot push to internal update queue. Current queue size is " + queue_size_text,
            ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
}

N
Nikita Mikhaylov 已提交
811 812 813 814 815 816 817 818

void CacheDictionary::update(const std::vector<Key> & requested_ids, std::unordered_map<Key, UInt8> & remaining_ids) const
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());

    const auto now = std::chrono::system_clock::now();

N
Nikita Mikhaylov 已提交
819 820
    size_t found_num = 0;

N
Nikita Mikhaylov 已提交
821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883
    if (now > backoff_end_time)
    {
        try
        {
            if (error_count)
            {
                /// Recover after error: we have to clone the source here because
                /// it could keep connections which should be reset after error.
                source_ptr = source_ptr->clone();
            }

            Stopwatch watch;
            /// Go to external storage. Might be very slow and blocking.
            auto stream = source_ptr->loadIds(requested_ids);

            const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

            stream->readPrefix();

            while (const auto block = stream->read())
            {
                const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
                if (!id_column)
                    throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};

                const auto & ids = id_column->getData();

                /// cache column pointers
                const auto column_ptrs = ext::map<std::vector>(
                        ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });

                for (const auto i : ext::range(0, ids.size()))
                {
                    const auto id = ids[i];

                    const auto find_result = findCellIdx(id, now);
                    const auto & cell_idx = find_result.cell_idx;

                    auto & cell = cells[cell_idx];

                    for (const auto attribute_idx : ext::range(0, attributes.size()))
                    {
                        const auto & attribute_column = *column_ptrs[attribute_idx];
                        auto & attribute = attributes[attribute_idx];

                        setAttributeValue(attribute, cell_idx, attribute_column[i]);
                    }

                    /// if cell id is zero and zero does not map to this cell, then the cell is unused
                    if (cell.id == 0 && cell_idx != zero_cell_idx)
                        element_count.fetch_add(1, std::memory_order_relaxed);

                    cell.id = id;
                    if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
                    {
                        std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
                        cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
                    }
                    else
                        cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());

                    /// mark corresponding id as found
                    remaining_ids[id] = 1;
N
Nikita Mikhaylov 已提交
884
                    ++found_num;
N
Nikita Mikhaylov 已提交
885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907
                }
            }

            stream->readSuffix();

            error_count = 0;
            last_exception = std::exception_ptr{};
            backoff_end_time = std::chrono::system_clock::time_point{};

            ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
        }
        catch (...)
        {
            ++error_count;
            last_exception = std::current_exception();
            backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));

            tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() +
                                                 "', next update is scheduled at " + ext::to_string(backoff_end_time));
        }
    }


N
Nikita Mikhaylov 已提交
908
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, remaining_ids.size() - found_num);
N
Nikita Mikhaylov 已提交
909 910 911 912
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}

913
}