CacheDictionary.cpp 37.0 KB
Newer Older
1 2
#include "CacheDictionary.h"

3
#include <functional>
4 5
#include <memory>
#include <Columns/ColumnString.h>
6
#include <Common/BitHelpers.h>
P
proller 已提交
7
#include <Common/CurrentMetrics.h>
8
#include <Common/HashTable/Hash.h>
9
#include <Common/ProfileEvents.h>
P
proller 已提交
10 11
#include <Common/ProfilingScopedRWLock.h>
#include <Common/randomSeed.h>
12
#include <Common/typeid_cast.h>
P
proller 已提交
13 14
#include <ext/range.h>
#include <ext/size.h>
15
#include <Common/setThreadName.h>
16
#include "CacheDictionary.inc.h"
P
proller 已提交
17 18
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
19

20 21
namespace ProfileEvents
{
P
proller 已提交
22 23 24 25 26 27 28 29 30 31
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
32 33 34 35
}

namespace CurrentMetrics
{
P
proller 已提交
36
extern const Metric DictCacheRequests;
37 38 39
}


40 41 42 43
namespace DB
{
/// Error codes thrown from this file; declared here, defined elsewhere.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int TOO_SMALL_BUFFER_SIZE;
    extern const int TYPE_MISMATCH;
    extern const int UNSUPPORTED_METHOD;
}


P
proller 已提交
52
inline size_t CacheDictionary::getCellIdx(const Key id) const
53
{
54 55 56
    const auto hash = intHash64(id);
    const auto idx = hash & size_overlap_mask;
    return idx;
57 58 59
}


P
proller 已提交
60
CacheDictionary::CacheDictionary(
61
    const std::string & database_,
K
kreuzerkrieg 已提交
62 63 64
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
N
cleanup  
Nikita Mikhaylov 已提交
65 66 67 68 69 70
    DictionaryLifetime dict_lifetime_,
    size_t size_,
    bool allow_read_expired_keys_,
    size_t max_update_queue_size_,
    size_t update_queue_push_timeout_milliseconds_,
    size_t max_threads_for_updates_)
71 72 73
    : database(database_)
    , name(name_)
    , full_name{database_.empty() ? name_ : (database_ + "." + name_)}
K
kreuzerkrieg 已提交
74 75 76
    , dict_struct(dict_struct_)
    , source_ptr{std::move(source_ptr_)}
    , dict_lifetime(dict_lifetime_)
77 78 79
    , allow_read_expired_keys(allow_read_expired_keys_)
    , max_update_queue_size(max_update_queue_size_)
    , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
N
Nikita Mikhaylov 已提交
80
    , max_threads_for_updates(max_threads_for_updates_)
81
    , log(&Logger::get("ExternalDictionaries"))
K
kreuzerkrieg 已提交
82
    , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
P
proller 已提交
83 84 85
    , size_overlap_mask{this->size - 1}
    , cells{this->size}
    , rnd_engine(randomSeed())
86
    , update_queue(max_update_queue_size_)
N
Nikita Mikhaylov 已提交
87
    , update_pool(max_threads_for_updates)
88
{
89
    if (!this->source_ptr->supportsSelectiveLoad())
90
        throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
91

92
    createAttributes();
N
Nikita Mikhaylov 已提交
93
    for (size_t i = 0; i < max_threads_for_updates; ++i)
N
better  
Nikita Mikhaylov 已提交
94
        update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
N
mvc  
Nikita Mikhaylov 已提交
95 96 97 98 99 100
}

/// Stops the background update workers: raise the `finished` flag, drop queued requests,
/// then wake every worker with an empty unit and wait for the pool to drain.
CacheDictionary::~CacheDictionary()
{
    finished = true;
    update_queue.clear();

    size_t remaining_workers = max_threads_for_updates;
    while (remaining_workers > 0)
    {
        auto wake_up_unit = std::make_shared<UpdateUnit>(std::vector<Key>());
        update_queue.push(wake_up_unit);
        --remaining_workers;
    }

    update_pool.wait();
}


A
Alexey Milovidov 已提交
110
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
111
{
112
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
113

114
    getItemsNumberImpl<UInt64, UInt64>(*hierarchical_attribute, ids, out, [&](const size_t) { return null_value; });
115 116 117
}


/// Uniform element access, so that isInImpl can take either a column of ancestors or a single constant
/// ancestor. The array overload indexes into the array; the scalar overload ignores the index.
static inline CacheDictionary::Key getAt(const PaddedPODArray<CacheDictionary::Key> & values, const size_t position)
{
    const CacheDictionary::Key element = values[position];
    return element;
}

static inline CacheDictionary::Key getAt(const CacheDictionary::Key & constant, const size_t /* position */)
{
    return constant;
}
127 128 129


template <typename AncestorType>
P
proller 已提交
130
void CacheDictionary::isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
131
{
132 133
    /// Transform all children to parents until ancestor id or null_value will be reached.

134
    size_t out_size = out.size();
P
proller 已提交
135
    memset(out.data(), 0xFF, out_size); /// 0xFF means "not calculated"
136 137 138

    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);

139
    PaddedPODArray<Key> children(out_size, 0);
140 141 142 143 144 145 146 147
    PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());

    while (true)
    {
        size_t out_idx = 0;
        size_t parents_idx = 0;
        size_t new_children_idx = 0;

148
        while (out_idx < out_size)
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
        {
            /// Already calculated
            if (out[out_idx] != 0xFF)
            {
                ++out_idx;
                continue;
            }

            /// No parent
            if (parents[parents_idx] == null_value)
            {
                out[out_idx] = 0;
            }
            /// Found ancestor
            else if (parents[parents_idx] == getAt(ancestor_ids, parents_idx))
            {
                out[out_idx] = 1;
            }
A
alexey-milovidov 已提交
167
            /// Loop detected
168 169
            else if (children[new_children_idx] == parents[parents_idx])
            {
P
proller 已提交
170
                out[out_idx] = 1;
171
            }
A
alexey-milovidov 已提交
172
            /// Found intermediate parent, add this value to search at next loop iteration
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
            else
            {
                children[new_children_idx] = parents[parents_idx];
                ++new_children_idx;
            }

            ++out_idx;
            ++parents_idx;
        }

        if (new_children_idx == 0)
            break;

        /// Transform all children to its parents.
        children.resize(new_children_idx);
        parents.resize(new_children_idx);

        toParent(children, parents);
    }
192 193 194
}

void CacheDictionary::isInVectorVector(
P
proller 已提交
195
    const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
196
{
197
    isInImpl(child_ids, ancestor_ids, out);
198
}
199

P
proller 已提交
200
void CacheDictionary::isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const
201
{
202
    isInImpl(child_ids, ancestor_id, out);
203
}
204

P
proller 已提交
205
void CacheDictionary::isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
206
{
207
    /// Special case with single child value.
208

209
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
210

211 212 213
    PaddedPODArray<Key> child(1, child_id);
    PaddedPODArray<Key> parent(1);
    std::vector<Key> ancestors(1, child_id);
214

215 216 217 218
    /// Iteratively find all ancestors for child.
    while (true)
    {
        toParent(child, parent);
219

220 221
        if (parent[0] == null_value)
            break;
222

223 224 225
        child[0] = parent[0];
        ancestors.push_back(parent[0]);
    }
226

227
    /// Assuming short hierarchy, so linear search is Ok.
228
    for (size_t i = 0, out_size = out.size(); i < out_size; ++i)
229
        out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
230
}
231

A
Alexey Milovidov 已提交
232
void CacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
233
{
234
    auto & attribute = getAttribute(attribute_name);
235
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
236

237
    const auto null_value = StringRef{std::get<String>(attribute.null_values)};
238

P
proller 已提交
239
    getItemsString(attribute, ids, out, [&](const size_t) { return null_value; });
240 241 242
}

void CacheDictionary::getString(
P
proller 已提交
243
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
244
{
245
    auto & attribute = getAttribute(attribute_name);
246
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
247

P
proller 已提交
248
    getItemsString(attribute, ids, out, [&](const size_t row) { return def->getDataAt(row); });
249 250 251
}

void CacheDictionary::getString(
P
proller 已提交
252
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
253
{
254
    auto & attribute = getAttribute(attribute_name);
255
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
256

P
proller 已提交
257
    getItemsString(attribute, ids, out, [&](const size_t) { return StringRef{def}; });
258 259 260
}


261
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag
262 263 264 265 266 267 268 269
/// true  false   found and valid
/// false true    not found (something outdated, maybe our cell)
/// false false   not found (other id stored with valid data)
/// true  true    impossible
///
/// todo: split this func to two: find_for_get and find_for_set
CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const CellMetadata::time_point_t now) const
{
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
    auto pos = getCellIdx(id);
    auto oldest_id = pos;
    auto oldest_time = CellMetadata::time_point_t::max();
    const auto stop = pos + max_collision_length;
    for (; pos < stop; ++pos)
    {
        const auto cell_idx = pos & size_overlap_mask;
        const auto & cell = cells[cell_idx];

        if (cell.id != id)
        {
            /// maybe we already found nearest expired cell (try minimize collision_length on insert)
            if (oldest_time > now && oldest_time > cell.expiresAt())
            {
                oldest_time = cell.expiresAt();
                oldest_id = cell_idx;
            }
            continue;
        }

        if (cell.expiresAt() < now)
        {
            return {cell_idx, false, true};
        }

        return {cell_idx, true, false};
    }

    return {oldest_id, false, false};
299 300
}

A
Alexey Milovidov 已提交
301
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
302
{
N
mvc  
Nikita Mikhaylov 已提交
303 304 305 306 307
    /// There are three types of ids.
    /// - Valid ids. These ids are presented in local cache and their lifetime is not expired.
    /// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired).
    /// - CacheNotFound ids. We have to go to external storage to know its value.

308
    /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
N
mvc  
Nikita Mikhaylov 已提交
309 310
    std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
    std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
311

312
    size_t cache_hit = 0;
N
mvc  
Nikita Mikhaylov 已提交
313

314 315 316 317 318 319 320 321 322 323 324
    const auto rows = ext::size(ids);
    {
        const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

        const auto now = std::chrono::system_clock::now();
        /// fetch up-to-date values, decide which ones require update
        for (const auto row : ext::range(0, rows))
        {
            const auto id = ids[row];
            const auto find_result = findCellIdx(id, now);
            const auto & cell_idx = find_result.cell_idx;
325 326 327

            auto insert_to_answer_routine = [&] ()
            {
N
Nikita Mikhaylov 已提交
328
                out[row] = !cells[cell_idx].isDefault();
329 330
            };

331 332 333
            if (!find_result.valid)
            {
                if (find_result.outdated)
N
mvc  
Nikita Mikhaylov 已提交
334 335 336
                {
                    cache_expired_ids[id].push_back(row);

337 338
                    if (allow_read_expired_keys)
                        insert_to_answer_routine();
N
mvc  
Nikita Mikhaylov 已提交
339
                }
340
                else
N
mvc  
Nikita Mikhaylov 已提交
341 342 343
                {
                    cache_not_found_ids[id].push_back(row);
                }
344 345 346 347
            }
            else
            {
                ++cache_hit;
348
                insert_to_answer_routine();
349 350 351 352
            }
        }
    }

353 354
    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
355 356 357
    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);

    query_count.fetch_add(rows, std::memory_order_relaxed);
358
    hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
359

360
    if (cache_not_found_ids.empty())
N
mvc  
Nikita Mikhaylov 已提交
361
    {
362 363 364 365 366 367 368 369 370 371 372 373 374
        /// Nothing to update - return;
        if (cache_expired_ids.empty())
            return;

        if (allow_read_expired_keys)
        {
            std::vector<Key> required_expired_ids;
            required_expired_ids.reserve(cache_expired_ids.size());
            std::transform(
                    std::begin(cache_expired_ids), std::end(cache_expired_ids),
                    std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });

            /// Callbacks are empty because we don't want to receive them after an unknown period of time.
N
Nikita Mikhaylov 已提交
375
            auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids);
376 377 378 379 380

            tryPushToUpdateQueueOrThrow(update_unit_ptr);
            /// Update is async - no need to wait.
            return;
        }
N
mvc  
Nikita Mikhaylov 已提交
381
    }
382

383 384
    /// At this point we have two situations.
    /// There may be both types of keys: cache_expired_ids and cache_not_found_ids.
N
mvc  
Nikita Mikhaylov 已提交
385 386
    /// We will update them all synchronously.

N
better  
Nikita Mikhaylov 已提交
387
    std::vector<Key> required_ids;
388 389 390 391 392 393 394
    required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
    std::transform(
            std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
    std::transform(
            std::begin(cache_expired_ids), std::end(cache_expired_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
N
mvc  
Nikita Mikhaylov 已提交
395

N
Nikita Mikhaylov 已提交
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
    auto on_cell_updated = [&] (const Key id, const size_t)
    {
        for (const auto row : cache_not_found_ids[id])
            out[row] = true;
        for (const auto row : cache_expired_ids[id])
            out[row] = true;
    };

    auto on_id_not_found = [&] (const Key id, const size_t)
    {
        for (const auto row : cache_not_found_ids[id])
            out[row] = false;
        for (const auto row : cache_expired_ids[id])
            out[row] = true;
    };

N
Nikita Mikhaylov 已提交
412
    auto update_unit_ptr = std::make_shared<UpdateUnit>(required_ids, on_cell_updated, on_id_not_found);
N
mvc  
Nikita Mikhaylov 已提交
413

414 415
    tryPushToUpdateQueueOrThrow(update_unit_ptr);
    waitForCurrentUpdateFinish(update_unit_ptr);
416 417 418 419 420
}


void CacheDictionary::createAttributes()
{
421 422
    const auto attributes_size = dict_struct.attributes.size();
    attributes.reserve(attributes_size);
423 424

    bytes_allocated += size * sizeof(CellMetadata);
425
    bytes_allocated += attributes_size * sizeof(attributes.front());
426 427 428 429 430 431 432 433

    for (const auto & attribute : dict_struct.attributes)
    {
        attribute_index_by_name.emplace(attribute.name, attributes.size());
        attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));

        if (attribute.hierarchical)
        {
P
proller 已提交
434
            hierarchical_attribute = &attributes.back();
435

K
kreuzerkrieg 已提交
436
            if (hierarchical_attribute->type != AttributeUnderlyingType::utUInt64)
437
                throw Exception{full_name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
438 439
        }
    }
440 441
}

A
Alexey Milovidov 已提交
442
CacheDictionary::Attribute CacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
443
{
A
Alexey Milovidov 已提交
444
    Attribute attr{type, {}, {}};
445 446 447

    switch (type)
    {
P
proller 已提交
448
#define DISPATCH(TYPE) \
K
kreuzerkrieg 已提交
449
    case AttributeUnderlyingType::ut##TYPE: \
P
proller 已提交
450
        attr.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
P
proller 已提交
451 452
        attr.arrays = std::make_unique<ContainerType<TYPE>>(size); \
        bytes_allocated += size * sizeof(TYPE); \
P
proller 已提交
453
        break;
A
Amos Bird 已提交
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
        DISPATCH(Float32)
        DISPATCH(Float64)
#undef DISPATCH
K
kreuzerkrieg 已提交
469
        case AttributeUnderlyingType::utString:
A
Alexey Milovidov 已提交
470 471
            attr.null_values = null_value.get<String>();
            attr.arrays = std::make_unique<ContainerType<StringRef>>(size);
472 473 474 475 476 477 478
            bytes_allocated += size * sizeof(StringRef);
            if (!string_arena)
                string_arena = std::make_unique<ArenaWithFreeLists>();
            break;
    }

    return attr;
479 480
}

A
Alexey Milovidov 已提交
481
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
482
{
483 484
    switch (attribute.type)
    {
K
kreuzerkrieg 已提交
485
        case AttributeUnderlyingType::utUInt8:
P
proller 已提交
486 487
            std::get<ContainerPtrType<UInt8>>(attribute.arrays)[idx] = std::get<UInt8>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
488
        case AttributeUnderlyingType::utUInt16:
P
proller 已提交
489 490
            std::get<ContainerPtrType<UInt16>>(attribute.arrays)[idx] = std::get<UInt16>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
491
        case AttributeUnderlyingType::utUInt32:
P
proller 已提交
492 493
            std::get<ContainerPtrType<UInt32>>(attribute.arrays)[idx] = std::get<UInt32>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
494
        case AttributeUnderlyingType::utUInt64:
P
proller 已提交
495 496
            std::get<ContainerPtrType<UInt64>>(attribute.arrays)[idx] = std::get<UInt64>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
497
        case AttributeUnderlyingType::utUInt128:
P
proller 已提交
498 499
            std::get<ContainerPtrType<UInt128>>(attribute.arrays)[idx] = std::get<UInt128>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
500
        case AttributeUnderlyingType::utInt8:
P
proller 已提交
501 502
            std::get<ContainerPtrType<Int8>>(attribute.arrays)[idx] = std::get<Int8>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
503
        case AttributeUnderlyingType::utInt16:
P
proller 已提交
504 505
            std::get<ContainerPtrType<Int16>>(attribute.arrays)[idx] = std::get<Int16>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
506
        case AttributeUnderlyingType::utInt32:
P
proller 已提交
507 508
            std::get<ContainerPtrType<Int32>>(attribute.arrays)[idx] = std::get<Int32>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
509
        case AttributeUnderlyingType::utInt64:
P
proller 已提交
510 511
            std::get<ContainerPtrType<Int64>>(attribute.arrays)[idx] = std::get<Int64>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
512
        case AttributeUnderlyingType::utFloat32:
P
proller 已提交
513 514
            std::get<ContainerPtrType<Float32>>(attribute.arrays)[idx] = std::get<Float32>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
515
        case AttributeUnderlyingType::utFloat64:
P
proller 已提交
516 517
            std::get<ContainerPtrType<Float64>>(attribute.arrays)[idx] = std::get<Float64>(attribute.null_values);
            break;
518

K
kreuzerkrieg 已提交
519
        case AttributeUnderlyingType::utDecimal32:
520 521
            std::get<ContainerPtrType<Decimal32>>(attribute.arrays)[idx] = std::get<Decimal32>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
522
        case AttributeUnderlyingType::utDecimal64:
523 524
            std::get<ContainerPtrType<Decimal64>>(attribute.arrays)[idx] = std::get<Decimal64>(attribute.null_values);
            break;
K
kreuzerkrieg 已提交
525
        case AttributeUnderlyingType::utDecimal128:
526 527 528
            std::get<ContainerPtrType<Decimal128>>(attribute.arrays)[idx] = std::get<Decimal128>(attribute.null_values);
            break;

K
kreuzerkrieg 已提交
529
        case AttributeUnderlyingType::utString:
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544
        {
            const auto & null_value_ref = std::get<String>(attribute.null_values);
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];

            if (string_ref.data != null_value_ref.data())
            {
                if (string_ref.data)
                    string_arena->free(const_cast<char *>(string_ref.data), string_ref.size);

                string_ref = StringRef{null_value_ref};
            }

            break;
        }
    }
545 546
}

A
Alexey Milovidov 已提交
547
void CacheDictionary::setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const
548
{
549 550
    switch (attribute.type)
    {
K
kreuzerkrieg 已提交
551
        case AttributeUnderlyingType::utUInt8:
P
proller 已提交
552 553
            std::get<ContainerPtrType<UInt8>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
K
kreuzerkrieg 已提交
554
        case AttributeUnderlyingType::utUInt16:
P
proller 已提交
555 556
            std::get<ContainerPtrType<UInt16>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
K
kreuzerkrieg 已提交
557
        case AttributeUnderlyingType::utUInt32:
P
proller 已提交
558 559
            std::get<ContainerPtrType<UInt32>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
K
kreuzerkrieg 已提交
560
        case AttributeUnderlyingType::utUInt64:
P
proller 已提交
561 562
            std::get<ContainerPtrType<UInt64>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
K
kreuzerkrieg 已提交
563
        case AttributeUnderlyingType::utUInt128:
P
proller 已提交
564 565
            std::get<ContainerPtrType<UInt128>>(attribute.arrays)[idx] = value.get<UInt128>();
            break;
K
kreuzerkrieg 已提交
566
        case AttributeUnderlyingType::utInt8:
P
proller 已提交
567 568
            std::get<ContainerPtrType<Int8>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
K
kreuzerkrieg 已提交
569
        case AttributeUnderlyingType::utInt16:
P
proller 已提交
570 571
            std::get<ContainerPtrType<Int16>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
K
kreuzerkrieg 已提交
572
        case AttributeUnderlyingType::utInt32:
P
proller 已提交
573 574
            std::get<ContainerPtrType<Int32>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
K
kreuzerkrieg 已提交
575
        case AttributeUnderlyingType::utInt64:
P
proller 已提交
576 577
            std::get<ContainerPtrType<Int64>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
K
kreuzerkrieg 已提交
578
        case AttributeUnderlyingType::utFloat32:
P
proller 已提交
579 580
            std::get<ContainerPtrType<Float32>>(attribute.arrays)[idx] = value.get<Float64>();
            break;
K
kreuzerkrieg 已提交
581
        case AttributeUnderlyingType::utFloat64:
P
proller 已提交
582 583 584
            std::get<ContainerPtrType<Float64>>(attribute.arrays)[idx] = value.get<Float64>();
            break;

K
kreuzerkrieg 已提交
585
        case AttributeUnderlyingType::utDecimal32:
P
proller 已提交
586 587
            std::get<ContainerPtrType<Decimal32>>(attribute.arrays)[idx] = value.get<Decimal32>();
            break;
K
kreuzerkrieg 已提交
588
        case AttributeUnderlyingType::utDecimal64:
P
proller 已提交
589 590
            std::get<ContainerPtrType<Decimal64>>(attribute.arrays)[idx] = value.get<Decimal64>();
            break;
K
kreuzerkrieg 已提交
591
        case AttributeUnderlyingType::utDecimal128:
P
proller 已提交
592 593
            std::get<ContainerPtrType<Decimal128>>(attribute.arrays)[idx] = value.get<Decimal128>();
            break;
594

K
kreuzerkrieg 已提交
595
        case AttributeUnderlyingType::utString:
596 597 598 599 600 601 602 603 604
        {
            const auto & string = value.get<String>();
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];
            const auto & null_value_ref = std::get<String>(attribute.null_values);

            /// free memory unless it points to a null_value
            if (string_ref.data && string_ref.data != null_value_ref.data())
                string_arena->free(const_cast<char *>(string_ref.data), string_ref.size);

605 606
            const auto str_size = string.size();
            if (str_size != 0)
607
            {
608 609 610
                auto string_ptr = string_arena->alloc(str_size + 1);
                std::copy(string.data(), string.data() + str_size + 1, string_ptr);
                string_ref = StringRef{string_ptr, str_size};
611 612 613 614 615 616 617
            }
            else
                string_ref = {};

            break;
        }
    }
618 619
}

A
Alexey Milovidov 已提交
620
CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & attribute_name) const
621
{
622 623
    const auto it = attribute_index_by_name.find(attribute_name);
    if (it == std::end(attribute_index_by_name))
624
        throw Exception{full_name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
625 626

    return attributes[it->second];
627 628
}

629 630
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
P
proller 已提交
631 632
    return (idx != zero_cell_idx && cells[idx].id == 0)
        || (cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
633 634 635 636
}

/// Snapshot, taken under the read lock, of the ids held in non-empty cells.
/// Cells marked default are skipped (has() reports those ids as absent).
PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    PaddedPODArray<Key> cached_ids;
    const size_t cells_count = cells.size();
    for (size_t idx = 0; idx < cells_count; ++idx)
    {
        auto & cell = cells[idx];
        if (isEmptyCell(idx) || cell.isDefault())
            continue;

        cached_ids.push_back(cell.id);
    }
    return cached_ids;
}

651
BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
652
{
653
    using BlockInputStreamType = DictionaryBlockInputStream<CacheDictionary, Key>;
654
    return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, getCachedIds(), column_names);
655 656
}

657 658 659 660 661 662
std::exception_ptr CacheDictionary::getLastException() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
    return last_exception;
}

663 664
void registerDictionaryCache(DictionaryFactory & factory)
{
665
    auto create_layout = [=](const std::string & full_name,
P
proller 已提交
666 667 668
                             const DictionaryStructure & dict_struct,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
P
proller 已提交
669 670
                             DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
671
        if (dict_struct.key)
672 673
            throw Exception{"'key' is not supported for dictionary of layout 'cache'",
                            ErrorCodes::UNSUPPORTED_METHOD};
674 675

        if (dict_struct.range_min || dict_struct.range_max)
676
            throw Exception{full_name
P
proller 已提交
677 678 679
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};
680
        const auto & layout_prefix = config_prefix + ".layout";
681

N
Nikita Mikhaylov 已提交
682
        const size_t size = config.getUInt64(layout_prefix + ".cache.size_in_cells");
683
        if (size == 0)
N
better  
Nikita Mikhaylov 已提交
684
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells",
685
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};
686 687 688

        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
689
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
P
proller 已提交
690
                            ErrorCodes::BAD_ARGUMENTS};
691

692 693
        const String database = config.getString(config_prefix + ".database", "");
        const String name = config.getString(config_prefix + ".name");
P
proller 已提交
694
        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
695

N
Nikita Mikhaylov 已提交
696
        const size_t max_update_queue_size =
697 698 699 700 701 702 703 704
                config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
        if (max_update_queue_size == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have empty update queue of size 0",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool allow_read_expired_keys =
                config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);

N
Nikita Mikhaylov 已提交
705
        const size_t update_queue_push_timeout_milliseconds =
706 707 708 709 710
                config.getUInt64(layout_prefix + ".cache.update_queue_push_timeout_milliseconds", 10);
        if (update_queue_push_timeout_milliseconds < 10)
            throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
                            ErrorCodes::BAD_ARGUMENTS};

N
Nikita Mikhaylov 已提交
711 712 713 714 715 716
        const size_t max_threads_for_updates =
                config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
        if (max_threads_for_updates == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have zero threads for updates.",
                            ErrorCodes::BAD_ARGUMENTS};

717
        return std::make_unique<CacheDictionary>(
718
                database, name, dict_struct, std::move(source_ptr), dict_lifetime, size,
N
Nikita Mikhaylov 已提交
719
                allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds,
720
                max_threads_for_updates);
721
    };
722
    factory.registerLayout("cache", create_layout, false);
723 724
}

N
mvc  
Nikita Mikhaylov 已提交
725 726
void CacheDictionary::updateThreadFunction()
{
727
    setThreadName("AsyncUpdater");
N
Nikita Mikhaylov 已提交
728
    while (!finished)
N
mvc  
Nikita Mikhaylov 已提交
729
    {
N
Nikita Mikhaylov 已提交
730 731 732
        UpdateUnitPtr first_popped;
        update_queue.pop(first_popped);

N
Nikita Mikhaylov 已提交
733 734 735
        if (finished)
            break;

736 737 738 739
        /// Here we pop as many unit pointers from update queue as we can.
        /// We fix current size to avoid livelock (or too long waiting),
        /// when this thread pops from the queue and other threads push to the queue.
        const size_t current_queue_size = update_queue.size();
N
Nikita Mikhaylov 已提交
740 741

        if (current_queue_size > 0)
N
Nikita Mikhaylov 已提交
742
            LOG_TRACE(log, "Performing bunch of keys update in cache dictionary with "
N
Nikita Mikhaylov 已提交
743
                            << current_queue_size + 1 << " keys");
N
Nikita Mikhaylov 已提交
744

N
better  
Nikita Mikhaylov 已提交
745
        std::vector<UpdateUnitPtr> update_request;
N
Nikita Mikhaylov 已提交
746 747
        update_request.reserve(current_queue_size + 1);
        update_request.emplace_back(first_popped);
N
Nikita Mikhaylov 已提交
748

N
Nikita Mikhaylov 已提交
749
        UpdateUnitPtr current_unit_ptr;
N
Nikita Mikhaylov 已提交
750

N
Nikita Mikhaylov 已提交
751
        while (update_request.size() && update_queue.tryPop(current_unit_ptr))
N
Nikita Mikhaylov 已提交
752
            update_request.emplace_back(std::move(current_unit_ptr));
N
Nikita Mikhaylov 已提交
753

N
Nikita Mikhaylov 已提交
754
        BunchUpdateUnit bunch_update_unit(update_request);
755

N
Nikita Mikhaylov 已提交
756
        try
N
mvc  
Nikita Mikhaylov 已提交
757
        {
758
            /// Update a bunch of ids.
N
Nikita Mikhaylov 已提交
759
            update(bunch_update_unit);
760 761 762

            /// Notify all threads about finished updating the bunch of ids
            /// where their own ids were included.
763
            std::unique_lock<std::mutex> lock(update_mutex);
N
Nikita Mikhaylov 已提交
764

765 766
            for (auto & unit_ptr: update_request)
                unit_ptr->is_done = true;
N
Nikita Mikhaylov 已提交
767

768
            is_update_finished.notify_all();
N
mvc  
Nikita Mikhaylov 已提交
769
        }
N
Nikita Mikhaylov 已提交
770 771
        catch (...)
        {
772
            std::unique_lock<std::mutex> lock(update_mutex);
773 774 775 776 777
            /// It is a big trouble, because one bad query can make other threads fail with not relative exception.
            /// So at this point all threads (and queries) will receive the same exception.
            for (auto & unit_ptr: update_request)
                unit_ptr->current_exception = std::current_exception();

778
            is_update_finished.notify_all();
N
Nikita Mikhaylov 已提交
779
        }
N
mvc  
Nikita Mikhaylov 已提交
780 781 782
    }
}

N
Nikita Mikhaylov 已提交
783
/// Blocks the calling thread until the updater thread has processed `update_unit_ptr`,
/// i.e. until the unit is marked done or carries an exception. If the unit carries an
/// exception, that exception is rethrown in the calling thread.
///
/// The wait deliberately has no timeout. If the caller gave up (e.g. the query ended with
/// an error), the UpdateUnit would stay in the queue with callbacks that capture state from
/// the caller's stack. The updater thread would later invoke them and touch freed memory.
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const
{
    std::unique_lock<std::mutex> lock(update_mutex);

    const auto unit_is_processed = [&update_unit_ptr]
    {
        return update_unit_ptr->is_done || update_unit_ptr->current_exception;
    };
    is_update_finished.wait(lock, unit_is_processed);

    if (auto failure = update_unit_ptr->current_exception)
        std::rethrow_exception(failure);
}

N
Nikita Mikhaylov 已提交
803
/// Hands `update_unit_ptr` to the updater threads through `update_queue`.
/// Waits at most `update_queue_push_timeout_milliseconds` for free space. On timeout it throws
/// a CACHE_DICTIONARY_UPDATE_FAIL exception that reports the timeout and the current queue size.
void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_ptr) const
{
    const bool pushed = update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds);
    if (pushed)
        return;

    const std::string message
        = "Cannot push to internal update queue in dictionary " + getFullName() + ". Timelimit of "
        + std::to_string(update_queue_push_timeout_milliseconds) + " ms. exceeded. Current queue size is "
        + std::to_string(update_queue.size());

    throw DB::Exception(message, ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
}

N
Nikita Mikhaylov 已提交
812
/// Loads the ids requested by every caller in `bunch_update_unit` from the source and writes them into the cache.
///
/// Phase 1 runs only when `now` is past `backoff_end_time`. It queries the source for all
/// requested ids under the write lock and fills one cell per returned row, informing callers
/// via `informCallersAboutPresentId`. On failure it bumps `error_count`, stores `last_exception`,
/// schedules a backoff and logs the error instead of propagating it.
///
/// Phase 2 re-takes the write lock and handles every requested id the source did not return:
///   * if there is an outstanding error (`error_count != 0`):
///       - an id whose cell holds expired data (`find_result.outdated`) gets its expiry extended
///         to `backoff_end_time` and is reported with its previous present/absent status;
///       - for any other id, `last_exception` is rethrown;
///   * otherwise the cell is filled with default attribute values and reported as absent.
///
/// Throws whatever `last_exception` holds in the case above. Exceptions from the source are caught and logged in phase 1.
void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, bunch_update_unit.getRequestedIds().size());

    /// Requested id -> 1 if the source returned a row for it, 0 otherwise.
    std::unordered_map<Key, UInt8> remaining_ids{bunch_update_unit.getRequestedIds().size()};
    for (const auto id : bunch_update_unit.getRequestedIds())
        remaining_ids.insert({id, 0});

    const auto now = std::chrono::system_clock::now();

    /// Sets the expiration time of a cell that has just been (re)written.
    /// With a configured lifetime it is a uniformly random number of seconds in
    /// [min_sec, max_sec] after `now`; otherwise it is the maximal time point.
    const auto set_expiration_for_fresh_cell = [&](auto & cell)
    {
        if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
        {
            std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
            cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
        }
        else
            cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
    };

    /// Phase 1: query the source unless we are still inside the backoff window of a previous failure.
    if (now > backoff_end_time.load())
    {
        try
        {
            if (error_count)
            {
                /// Recover after error: we have to clone the source here because
                /// it could keep connections which should be reset after error.
                source_ptr = source_ptr->clone();
            }

            Stopwatch watch;
            auto stream = source_ptr->loadIds(bunch_update_unit.getRequestedIds());

            /// Held until the end of this try block (released on exception as well).
            const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

            stream->readPrefix();
            while (const auto block = stream->read())
            {
                /// Column 0 holds the ids; column i + 1 holds the values of attribute i.
                const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
                if (!id_column)
                    throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};

                const auto & ids = id_column->getData();

                /// cache column pointers
                const auto column_ptrs = ext::map<std::vector>(
                        ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });

                for (const auto i : ext::range(0, ids.size()))
                {
                    const auto id = ids[i];

                    const auto find_result = findCellIdx(id, now);
                    const auto & cell_idx = find_result.cell_idx;
                    auto & cell = cells[cell_idx];

                    for (const auto attribute_idx : ext::range(0, attributes.size()))
                    {
                        const auto & attribute_column = *column_ptrs[attribute_idx];
                        auto & attribute = attributes[attribute_idx];
                        setAttributeValue(attribute, cell_idx, attribute_column[i]);
                    }

                    /// if cell id is zero and zero does not map to this cell, then the cell is unused
                    if (cell.id == 0 && cell_idx != zero_cell_idx)
                        element_count.fetch_add(1, std::memory_order_relaxed);

                    cell.id = id;
                    set_expiration_for_fresh_cell(cell);

                    bunch_update_unit.informCallersAboutPresentId(id, cell_idx);

                    /// Mark the id as found. find() rather than operator[]: operator[] would insert
                    /// an id the source returned but nobody requested, and phase 2 would then count
                    /// it in found_num (DictCacheKeysRequestedFound).
                    auto found_it = remaining_ids.find(id);
                    if (found_it != remaining_ids.end())
                        found_it->second = 1;
                }
            }

            stream->readSuffix();

            error_count = 0;
            last_exception = std::exception_ptr{};
            backoff_end_time = std::chrono::system_clock::time_point{};

            ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
        }
        catch (...)
        {
            ++error_count;
            last_exception = std::current_exception();
            backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));

            tryLogException(last_exception, log, "Could not update cache dictionary '" + getFullName() +
                                                 "', next update is scheduled at " + ext::to_string(backoff_end_time.load()));
        }
    }

    /// Phase 2: deal with requested ids the source did not return.
    size_t not_found_num = 0, found_num = 0;

    const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

    /// Check which ids have not been found and require setting null_value
    for (const auto & id_found_pair : remaining_ids)
    {
        if (id_found_pair.second)
        {
            ++found_num;
            continue;
        }
        ++not_found_num;

        const auto id = id_found_pair.first;

        const auto find_result = findCellIdx(id, now);
        const auto & cell_idx = find_result.cell_idx;
        auto & cell = cells[cell_idx];

        if (error_count)
        {
            if (find_result.outdated)
            {
                /// We have expired data for that `id` so we can continue using it.
                /// The default flag is re-applied after setExpiresAt(), keeping the original
                /// order (NOTE(review): presumably setExpiresAt() resets it — confirm).
                const bool was_default = cell.isDefault();
                cell.setExpiresAt(backoff_end_time);
                if (was_default)
                {
                    cell.setDefault();
                    bunch_update_unit.informCallersAboutAbsentId(id, cell_idx);
                }
                else
                    bunch_update_unit.informCallersAboutPresentId(id, cell_idx);
                continue;
            }
            /// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
            std::rethrow_exception(last_exception);
        }

        /// Check if cell had not been occupied before and increment element counter if it hadn't
        if (cell.id == 0 && cell_idx != zero_cell_idx)
            element_count.fetch_add(1, std::memory_order_relaxed);

        cell.id = id;
        set_expiration_for_fresh_cell(cell);

        /// Set null_value for each attribute
        cell.setDefault();
        for (auto & attribute : attributes)
            setDefaultAttributeValue(attribute, cell_idx);

        /// inform caller that the cell has not been found
        bunch_update_unit.informCallersAboutAbsentId(id, cell_idx);
    }

    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}

975
}