CacheDictionary.cpp 38.2 KB
Newer Older
1 2
#include "CacheDictionary.h"

3
#include <functional>
4 5
#include <memory>
#include <Columns/ColumnString.h>
6
#include <Common/BitHelpers.h>
P
proller 已提交
7
#include <Common/CurrentMetrics.h>
8
#include <Common/HashTable/Hash.h>
9
#include <Common/ProfileEvents.h>
P
proller 已提交
10 11
#include <Common/ProfilingScopedRWLock.h>
#include <Common/randomSeed.h>
12
#include <Common/typeid_cast.h>
P
proller 已提交
13 14
#include <ext/range.h>
#include <ext/size.h>
15
#include <Common/setThreadName.h>
16
#include "CacheDictionary.inc.h"
P
proller 已提交
17 18
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
19

20 21
namespace ProfileEvents
{
P
proller 已提交
22 23 24 25 26 27 28 29 30 31
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
32 33 34 35
}

namespace CurrentMetrics
{
P
proller 已提交
36
extern const Metric DictCacheRequests;
37 38 39
}


40 41 42 43
namespace DB
{
namespace ErrorCodes
{
44 45 46
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
A
Alexey Milovidov 已提交
47
    extern const int LOGICAL_ERROR;
48
    extern const int TOO_SMALL_BUFFER_SIZE;
49 50 51
}


P
proller 已提交
52
inline size_t CacheDictionary::getCellIdx(const Key id) const
53
{
54 55 56
    const auto hash = intHash64(id);
    const auto idx = hash & size_overlap_mask;
    return idx;
57 58 59
}


P
proller 已提交
60
/// Builds a cache dictionary on top of `source_ptr_`.
///
/// The requested cell count is raised to at least `max_collision_length` and then rounded
/// with `roundUpToPowerOfTwoOrZero`, so that `size - 1` can be used as an index mask.
/// After the attribute storage is created, four update workers are scheduled on `update_pool`;
/// each of them runs `updateThreadFunction()`.
///
/// Throws UNSUPPORTED_METHOD if the source does not support selective load.
CacheDictionary::CacheDictionary(
    const std::string & database_,
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    const DictionaryLifetime dict_lifetime_,
    const size_t size_,
    const bool allow_read_expired_keys_,
    const size_t max_update_queue_size_,
    const size_t update_queue_push_timeout_milliseconds_,
    const size_t each_update_finish_timeout_seconds_)
    : database(database_)
    , name(name_)
    , full_name{database_.empty() ? name_ : (database_ + "." + name_)}
    , dict_struct(dict_struct_)
    , source_ptr{std::move(source_ptr_)}
    , dict_lifetime(dict_lifetime_)
    , allow_read_expired_keys(allow_read_expired_keys_)
    , max_update_queue_size(max_update_queue_size_)
    , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
    , each_update_finish_timeout_seconds(each_update_finish_timeout_seconds_)
    , log(&Logger::get("ExternalDictionaries"))
    , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
    , size_overlap_mask{this->size - 1}
    , cells{this->size}
    , rnd_engine(randomSeed())
    , update_queue(max_update_queue_size_)
    , update_pool(4)
{
    const bool selective_load_supported = this->source_ptr->supportsSelectiveLoad();
    if (!selective_load_supported)
        throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();

    /// One task per thread of `update_pool` (constructed with 4 above).
    for (size_t started = 0; started < 4; ++started)
        update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
}

/// Stops the update workers and waits for them.
///
/// `finished` is raised first, then pending work is dropped. Four empty update units are
/// pushed (the constructor schedules four workers) so that workers blocked in
/// `update_queue.pop()` wake up and can observe the flag.
CacheDictionary::~CacheDictionary()
{
    finished = true;
    update_queue.clear();

    size_t remaining_wakeups = 4;
    while (remaining_wakeups != 0)
    {
        auto wakeup_unit = std::make_shared<UpdateUnit>(std::vector<Key>());
        update_queue.push(wakeup_unit);
        --remaining_wakeups;
    }

    update_pool.wait();
}


A
Alexey Milovidov 已提交
111
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
112
{
113
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
114

115
    getItemsNumberImpl<UInt64, UInt64>(*hierarchical_attribute, ids, out, [&](const size_t) { return null_value; });
116 117 118
}


119
/// Uniform element access, so that a single constant key can be used the same way as an array of keys.
static inline CacheDictionary::Key getAt(const PaddedPODArray<CacheDictionary::Key> & arr, const size_t idx)
{
    const CacheDictionary::Key element = arr[idx];
    return element;
}
/// A constant key is the same at every position, so the index is ignored.
static inline CacheDictionary::Key getAt(const CacheDictionary::Key & value, const size_t /*idx*/)
{
    return value;
}
128 129 130


template <typename AncestorType>
P
proller 已提交
131
void CacheDictionary::isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
132
{
133 134
    /// Transform all children to parents until ancestor id or null_value will be reached.

135
    size_t out_size = out.size();
P
proller 已提交
136
    memset(out.data(), 0xFF, out_size); /// 0xFF means "not calculated"
137 138 139

    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);

140
    PaddedPODArray<Key> children(out_size, 0);
141 142 143 144 145 146 147 148
    PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());

    while (true)
    {
        size_t out_idx = 0;
        size_t parents_idx = 0;
        size_t new_children_idx = 0;

149
        while (out_idx < out_size)
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
        {
            /// Already calculated
            if (out[out_idx] != 0xFF)
            {
                ++out_idx;
                continue;
            }

            /// No parent
            if (parents[parents_idx] == null_value)
            {
                out[out_idx] = 0;
            }
            /// Found ancestor
            else if (parents[parents_idx] == getAt(ancestor_ids, parents_idx))
            {
                out[out_idx] = 1;
            }
A
alexey-milovidov 已提交
168
            /// Loop detected
169 170
            else if (children[new_children_idx] == parents[parents_idx])
            {
P
proller 已提交
171
                out[out_idx] = 1;
172
            }
A
alexey-milovidov 已提交
173
            /// Found intermediate parent, add this value to search at next loop iteration
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
            else
            {
                children[new_children_idx] = parents[parents_idx];
                ++new_children_idx;
            }

            ++out_idx;
            ++parents_idx;
        }

        if (new_children_idx == 0)
            break;

        /// Transform all children to its parents.
        children.resize(new_children_idx);
        parents.resize(new_children_idx);

        toParent(children, parents);
    }
193 194 195
}

void CacheDictionary::isInVectorVector(
P
proller 已提交
196
    const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
197
{
198
    isInImpl(child_ids, ancestor_ids, out);
199
}
200

P
proller 已提交
201
void CacheDictionary::isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const
202
{
203
    isInImpl(child_ids, ancestor_id, out);
204
}
205

P
proller 已提交
206
void CacheDictionary::isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
207
{
208
    /// Special case with single child value.
209

210
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
211

212 213 214
    PaddedPODArray<Key> child(1, child_id);
    PaddedPODArray<Key> parent(1);
    std::vector<Key> ancestors(1, child_id);
215

216 217 218 219
    /// Iteratively find all ancestors for child.
    while (true)
    {
        toParent(child, parent);
220

221 222
        if (parent[0] == null_value)
            break;
223

224 225 226
        child[0] = parent[0];
        ancestors.push_back(parent[0]);
    }
227

228
    /// Assuming short hierarchy, so linear search is Ok.
229
    for (size_t i = 0, out_size = out.size(); i < out_size; ++i)
230
        out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
231
}
232

A
Alexey Milovidov 已提交
233
void CacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
234
{
235
    auto & attribute = getAttribute(attribute_name);
236
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
237

238
    const auto null_value = StringRef{std::get<String>(attribute.null_values)};
239

P
proller 已提交
240
    getItemsString(attribute, ids, out, [&](const size_t) { return null_value; });
241 242 243
}

void CacheDictionary::getString(
P
proller 已提交
244
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
245
{
246
    auto & attribute = getAttribute(attribute_name);
247
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
248

P
proller 已提交
249
    getItemsString(attribute, ids, out, [&](const size_t row) { return def->getDataAt(row); });
250 251 252
}

void CacheDictionary::getString(
P
proller 已提交
253
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
254
{
255
    auto & attribute = getAttribute(attribute_name);
256
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
257

P
proller 已提交
258
    getItemsString(attribute, ids, out, [&](const size_t) { return StringRef{def}; });
259 260 261
}


262
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag
263 264 265 266 267 268 269 270
/// true  false   found and valid
/// false true    not found (something outdated, maybe our cell)
/// false false   not found (other id stored with valid data)
/// true  true    impossible
///
/// todo: split this func to two: find_for_get and find_for_set
CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const CellMetadata::time_point_t now) const
{
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
    auto pos = getCellIdx(id);
    auto oldest_id = pos;
    auto oldest_time = CellMetadata::time_point_t::max();
    const auto stop = pos + max_collision_length;
    for (; pos < stop; ++pos)
    {
        const auto cell_idx = pos & size_overlap_mask;
        const auto & cell = cells[cell_idx];

        if (cell.id != id)
        {
            /// maybe we already found nearest expired cell (try minimize collision_length on insert)
            if (oldest_time > now && oldest_time > cell.expiresAt())
            {
                oldest_time = cell.expiresAt();
                oldest_id = cell_idx;
            }
            continue;
        }

        if (cell.expiresAt() < now)
        {
            return {cell_idx, false, true};
        }

        return {cell_idx, true, false};
    }

    return {oldest_id, false, false};
300 301
}

A
Alexey Milovidov 已提交
302
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
303
{
N
mvc  
Nikita Mikhaylov 已提交
304 305 306 307 308
    /// There are three types of ids.
    /// - Valid ids. These ids are presented in local cache and their lifetime is not expired.
    /// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired).
    /// - CacheNotFound ids. We have to go to external storage to know its value.

309
    /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
N
mvc  
Nikita Mikhaylov 已提交
310 311
    std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
    std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
312

313
    size_t cache_hit = 0;
N
mvc  
Nikita Mikhaylov 已提交
314

315 316 317 318 319 320 321 322 323 324 325
    const auto rows = ext::size(ids);
    {
        const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

        const auto now = std::chrono::system_clock::now();
        /// fetch up-to-date values, decide which ones require update
        for (const auto row : ext::range(0, rows))
        {
            const auto id = ids[row];
            const auto find_result = findCellIdx(id, now);
            const auto & cell_idx = find_result.cell_idx;
326 327 328 329 330 331 332

            auto insert_to_answer_routine = [&] ()
            {
                const auto & cell = cells[cell_idx];
                out[row] = !cell.isDefault();
            };

333 334 335
            if (!find_result.valid)
            {
                if (find_result.outdated)
N
mvc  
Nikita Mikhaylov 已提交
336 337 338
                {
                    cache_expired_ids[id].push_back(row);

339 340
                    if (allow_read_expired_keys)
                        insert_to_answer_routine();
N
mvc  
Nikita Mikhaylov 已提交
341
                }
342
                else
N
mvc  
Nikita Mikhaylov 已提交
343 344 345
                {
                    cache_not_found_ids[id].push_back(row);
                }
346 347 348 349
            }
            else
            {
                ++cache_hit;
350
                insert_to_answer_routine();
351 352 353 354
            }
        }
    }

355 356
    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
357 358 359
    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);

    query_count.fetch_add(rows, std::memory_order_relaxed);
360
    hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
361

362
    if (cache_not_found_ids.empty())
N
mvc  
Nikita Mikhaylov 已提交
363
    {
364 365 366 367 368 369 370 371 372 373 374 375 376
        /// Nothing to update - return;
        if (cache_expired_ids.empty())
            return;

        if (allow_read_expired_keys)
        {
            std::vector<Key> required_expired_ids;
            required_expired_ids.reserve(cache_expired_ids.size());
            std::transform(
                    std::begin(cache_expired_ids), std::end(cache_expired_ids),
                    std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });

            /// Callbacks are empty because we don't want to receive them after an unknown period of time.
N
Nikita Mikhaylov 已提交
377
            auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids);
378 379 380 381 382

            tryPushToUpdateQueueOrThrow(update_unit_ptr);
            /// Update is async - no need to wait.
            return;
        }
N
mvc  
Nikita Mikhaylov 已提交
383
    }
384

385 386
    /// At this point we have two situations.
    /// There may be both types of keys: cache_expired_ids and cache_not_found_ids.
N
mvc  
Nikita Mikhaylov 已提交
387 388
    /// We will update them all synchronously.

N
better  
Nikita Mikhaylov 已提交
389
    std::vector<Key> required_ids;
390 391 392 393 394 395 396
    required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
    std::transform(
            std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
    std::transform(
            std::begin(cache_expired_ids), std::end(cache_expired_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
N
mvc  
Nikita Mikhaylov 已提交
397

N
Nikita Mikhaylov 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
    auto on_cell_updated = [&] (const Key id, const size_t)
    {
        for (const auto row : cache_not_found_ids[id])
            out[row] = true;
        for (const auto row : cache_expired_ids[id])
            out[row] = true;
    };

    auto on_id_not_found = [&] (const Key id, const size_t)
    {
        for (const auto row : cache_not_found_ids[id])
            out[row] = false;
        for (const auto row : cache_expired_ids[id])
            out[row] = true;
    };

    auto update_unit_ptr = std::make_shared<UpdateUnit>(required_ids);
N
mvc  
Nikita Mikhaylov 已提交
415

416 417
    tryPushToUpdateQueueOrThrow(update_unit_ptr);
    waitForCurrentUpdateFinish(update_unit_ptr);
N
Nikita Mikhaylov 已提交
418
    prepareAnswer(update_unit_ptr, on_cell_updated, on_id_not_found);
419 420 421 422 423
}


void CacheDictionary::createAttributes()
{
424 425
    const auto attributes_size = dict_struct.attributes.size();
    attributes.reserve(attributes_size);
426 427

    bytes_allocated += size * sizeof(CellMetadata);
428
    bytes_allocated += attributes_size * sizeof(attributes.front());
429 430 431 432 433 434 435 436

    for (const auto & attribute : dict_struct.attributes)
    {
        attribute_index_by_name.emplace(attribute.name, attributes.size());
        attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));

        if (attribute.hierarchical)
        {
P
proller 已提交
437
            hierarchical_attribute = &attributes.back();
438

K
kreuzerkrieg 已提交
439
            if (hierarchical_attribute->type != AttributeUnderlyingType::utUInt64)
440
                throw Exception{full_name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
441 442
        }
    }
443 444
}

A
Alexey Milovidov 已提交
445
/// Builds an attribute of the given underlying type: stores its null value, allocates a value
/// array with `size` elements and adds the array's size to `bytes_allocated`.
/// For String attributes the shared `string_arena` is created on first use.
CacheDictionary::Attribute CacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
    Attribute result{type, {}, {}};

    switch (type)
    {
    /// All non-string types are handled the same way; only the type name differs.
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
        { \
            result.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
            result.arrays = std::make_unique<ContainerType<TYPE>>(size); \
            bytes_allocated += size * sizeof(TYPE); \
            break; \
        }
        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
        DISPATCH(Float32)
        DISPATCH(Float64)
#undef DISPATCH
        case AttributeUnderlyingType::utString:
        {
            result.null_values = null_value.get<String>();
            result.arrays = std::make_unique<ContainerType<StringRef>>(size);
            bytes_allocated += size * sizeof(StringRef);

            if (!string_arena)
                string_arena = std::make_unique<ArenaWithFreeLists>();

            break;
        }
    }

    return result;
}

A
Alexey Milovidov 已提交
484
/// Resets the value stored at position `idx` of `attribute` to the attribute's null value.
///
/// For String attributes the cell is made to refer to the null value string itself; a
/// non-null buffer it referred to before is returned to `string_arena`.
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
{
    switch (attribute.type)
    {
    /// All non-string types are handled the same way; only the type name differs.
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = std::get<TYPE>(attribute.null_values); \
            break;
        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Float32)
        DISPATCH(Float64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
#undef DISPATCH

        case AttributeUnderlyingType::utString:
        {
            const auto & default_string = std::get<String>(attribute.null_values);
            auto & stored = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];

            /// Already refers to the null value - nothing to release or to change.
            if (stored.data == default_string.data())
                break;

            if (stored.data)
                string_arena->free(const_cast<char *>(stored.data), stored.size);

            stored = StringRef{default_string};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
550
/// Stores `value` at position `idx` of `attribute`.
///
/// Numeric values are read from the Field with the getter type listed next to each type below
/// and assigned to the array element. String values are copied, together with the terminating
/// zero byte, into a buffer taken from `string_arena`; an empty string is stored as an empty
/// StringRef.
void CacheDictionary::setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const
{
    switch (attribute.type)
    {
    /// TYPE is the element type of the array, FIELD_TYPE is the type requested from the Field.
#define DISPATCH(TYPE, FIELD_TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = value.get<FIELD_TYPE>(); \
            break;
        DISPATCH(UInt8, UInt64)
        DISPATCH(UInt16, UInt64)
        DISPATCH(UInt32, UInt64)
        DISPATCH(UInt64, UInt64)
        DISPATCH(UInt128, UInt128)
        DISPATCH(Int8, Int64)
        DISPATCH(Int16, Int64)
        DISPATCH(Int32, Int64)
        DISPATCH(Int64, Int64)
        DISPATCH(Float32, Float64)
        DISPATCH(Float64, Float64)
        DISPATCH(Decimal32, Decimal32)
        DISPATCH(Decimal64, Decimal64)
        DISPATCH(Decimal128, Decimal128)
#undef DISPATCH

        case AttributeUnderlyingType::utString:
        {
            const auto & new_string = value.get<String>();
            auto & stored = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];
            const auto & default_string = std::get<String>(attribute.null_values);

            /// free memory unless it points to a null_value
            const bool owns_buffer = stored.data && stored.data != default_string.data();
            if (owns_buffer)
                string_arena->free(const_cast<char *>(stored.data), stored.size);

            const auto new_size = new_string.size();
            if (new_size == 0)
            {
                stored = {};
                break;
            }

            /// One extra byte for the terminating zero, which is copied as well.
            auto buffer = string_arena->alloc(new_size + 1);
            std::copy(new_string.data(), new_string.data() + new_size + 1, buffer);
            stored = StringRef{buffer, new_size};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
623
/// Returns the attribute registered under `attribute_name`.
/// Throws BAD_ARGUMENTS if there is no attribute with this name.
CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & attribute_name) const
{
    const auto found = attribute_index_by_name.find(attribute_name);

    if (found != std::end(attribute_index_by_name))
        return attributes[found->second];

    throw Exception{full_name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
}

632 633
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
P
proller 已提交
634 635
    return (idx != zero_cell_idx && cells[idx].id == 0)
        || (cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
636 637 638 639
}

/// Collects, under a read lock, the ids of all cells that are neither empty nor default.
PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    PaddedPODArray<Key> cached_ids;

    const size_t cell_count = cells.size();
    for (size_t idx = 0; idx != cell_count; ++idx)
    {
        if (isEmptyCell(idx))
            continue;

        const auto & cell = cells[idx];
        if (cell.isDefault())
            continue;

        cached_ids.push_back(cell.id);
    }

    return cached_ids;
}

654
/// Creates a block input stream over the ids returned by `getCachedIds()`,
/// restricted to `column_names`.
BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    return std::make_shared<DictionaryBlockInputStream<CacheDictionary, Key>>(
        shared_from_this(), max_block_size, getCachedIds(), column_names);
}

660 661 662 663 664 665
/// Returns a copy of `last_exception`, read while holding the read lock.
std::exception_ptr CacheDictionary::getLastException() const
{
    const ProfilingScopedReadRWLock guard{rw_lock, ProfileEvents::DictCacheLockReadNs};

    std::exception_ptr stored_exception = last_exception;
    return stored_exception;
}

666 667
/// Registers the 'cache' layout in the dictionary factory.
///
/// The creator validates the dictionary structure and the layout settings and then
/// constructs a CacheDictionary. Settings read below `<config_prefix>.layout.cache`:
///  - size_in_cells (required, must not be 0);
///  - max_update_queue_size (default 100000, must not be 0);
///  - allow_read_expired_keys (default false);
///  - update_queue_push_timeout_milliseconds (default 10, must be at least 10);
///  - each_update_finish_timeout_seconds (default 600, must not be 0).
///
/// Throws UNSUPPORTED_METHOD for a complex key, BAD_ARGUMENTS for range_min/range_max,
/// 'require_nonempty' or invalid timeouts, TOO_SMALL_BUFFER_SIZE for a zero size.
void registerDictionaryCache(DictionaryFactory & factory)
{
    auto create_layout = [=](const std::string & full_name,
                             const DictionaryStructure & dict_struct,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
            throw Exception{"'key' is not supported for dictionary of layout 'cache'",
                            ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{full_name
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};
        const auto & layout_prefix = config_prefix + ".layout";

        const size_t size = config.getUInt64(layout_prefix + ".cache.size_in_cells");
        if (size == 0)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
                            ErrorCodes::BAD_ARGUMENTS};

        const String database = config.getString(config_prefix + ".database", "");
        const String name = config.getString(config_prefix + ".name");
        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};

        const size_t max_update_queue_size =
                config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
        if (max_update_queue_size == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have empty update queue of size 0",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool allow_read_expired_keys =
                config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);

        const size_t update_queue_push_timeout_milliseconds =
                config.getUInt64(layout_prefix + ".cache.update_queue_push_timeout_milliseconds", 10);
        if (update_queue_push_timeout_milliseconds < 10)
            throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
                            ErrorCodes::BAD_ARGUMENTS};

        /// Read from the `cache` subtree like every other layout setting. The former location
        /// directly under `layout` is still accepted as a fallback, so that configurations
        /// written for it keep their value.
        const size_t each_update_finish_timeout_seconds = config.getUInt64(
                layout_prefix + ".cache.each_update_finish_timeout_seconds",
                config.getUInt64(layout_prefix + ".each_update_finish_timeout_seconds", 600));
        if (each_update_finish_timeout_seconds == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have timeout equals to zero.",
                            ErrorCodes::BAD_ARGUMENTS};

        return std::make_unique<CacheDictionary>(
                database, name, dict_struct, std::move(source_ptr), dict_lifetime, size,
                allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds, each_update_finish_timeout_seconds);
    };
    factory.registerLayout("cache", create_layout, false);
}

N
mvc  
Nikita Mikhaylov 已提交
727 728
void CacheDictionary::updateThreadFunction()
{
729
    setThreadName("AsyncUpdater");
N
Nikita Mikhaylov 已提交
730
    while (!finished)
N
mvc  
Nikita Mikhaylov 已提交
731
    {
N
Nikita Mikhaylov 已提交
732

N
Nikita Mikhaylov 已提交
733

N
Nikita Mikhaylov 已提交
734 735 736
        UpdateUnitPtr first_popped;
        update_queue.pop(first_popped);

N
Nikita Mikhaylov 已提交
737 738 739
        if (finished)
            break;

N
Nikita Mikhaylov 已提交
740
        ///std::this_thread::sleep_for(std::chrono::milliseconds(10));
N
better  
Nikita Mikhaylov 已提交
741

742 743 744 745
        /// Here we pop as many unit pointers from update queue as we can.
        /// We fix current size to avoid livelock (or too long waiting),
        /// when this thread pops from the queue and other threads push to the queue.
        const size_t current_queue_size = update_queue.size();
N
Nikita Mikhaylov 已提交
746 747 748

        /// Word "bunch" must present in this log message, because it is being checked in tests.
        if (current_queue_size > 0)
N
Nikita Mikhaylov 已提交
749
            LOG_DEBUG(log, "Performing bunch of keys update in cache dictionary with " << current_queue_size + 1 << " keys"; );
N
Nikita Mikhaylov 已提交
750

N
better  
Nikita Mikhaylov 已提交
751
        std::vector<UpdateUnitPtr> update_request;
N
Nikita Mikhaylov 已提交
752

N
better  
Nikita Mikhaylov 已提交
753
        update_request.push_back(first_popped);
N
Nikita Mikhaylov 已提交
754

N
better  
Nikita Mikhaylov 已提交
755
        auto current_unit_ptr = UpdateUnitPtr();
N
Nikita Mikhaylov 已提交
756

N
better  
Nikita Mikhaylov 已提交
757
        while (update_queue.tryPop(current_unit_ptr))
N
Nikita Mikhaylov 已提交
758
        {
N
better  
Nikita Mikhaylov 已提交
759
            update_request.push_back(current_unit_ptr);
N
Nikita Mikhaylov 已提交
760
        }
N
Nikita Mikhaylov 已提交
761

762 763
        /// Here we prepare total count of all requested ids
        /// not to do useless allocations later.
N
Nikita Mikhaylov 已提交
764
        size_t total_requested_keys_count = 0;
765
        for (auto & unit_ptr: update_request)
N
Nikita Mikhaylov 已提交
766
            total_requested_keys_count += unit_ptr->requested_ids.size();
767 768

        std::vector<Key> concatenated_requested_ids;
N
Nikita Mikhaylov 已提交
769
        concatenated_requested_ids.reserve(total_requested_keys_count);
770 771 772 773
        for (auto & unit_ptr: update_request)
            std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids),
                    [&] (const Key & key) {concatenated_requested_ids.push_back(key);});

N
Nikita Mikhaylov 已提交
774
        try
N
mvc  
Nikita Mikhaylov 已提交
775
        {
776
            auto found_ids_mask_ptr = std::make_shared<std::unordered_map<Key, UInt8>>(concatenated_requested_ids.size());
N
Nikita Mikhaylov 已提交
777

778 779 780 781
            /// Copy shared_ptr to let this map be alive until other thread finish his stuff.
            /// It is thread safe because writing to the map happens before reading from multiple threads.
            for (auto & unit_ptr: update_request)
                unit_ptr->found_ids_mask_ptr = found_ids_mask_ptr;
N
Nikita Mikhaylov 已提交
782

783
            for (const auto id : concatenated_requested_ids)
N
Nikita Mikhaylov 已提交
784 785
                found_ids_mask_ptr->insert({id, 0});

786 787 788 789 790
            /// Update a bunch of ids.
            update(concatenated_requested_ids, *found_ids_mask_ptr);

            /// Notify all threads about finished updating the bunch of ids
            /// where their own ids were included.
791
            std::unique_lock<std::mutex> lock(update_mutex);
N
Nikita Mikhaylov 已提交
792

793 794
            for (auto & unit_ptr: update_request)
                unit_ptr->is_done = true;
N
Nikita Mikhaylov 已提交
795

796
            is_update_finished.notify_all();
N
mvc  
Nikita Mikhaylov 已提交
797
        }
N
Nikita Mikhaylov 已提交
798 799
        catch (...)
        {
800
            std::unique_lock<std::mutex> lock(update_mutex);
801 802 803 804 805
            /// It is a big trouble, because one bad query can make other threads fail with not relative exception.
            /// So at this point all threads (and queries) will receive the same exception.
            for (auto & unit_ptr: update_request)
                unit_ptr->current_exception = std::current_exception();

806
            is_update_finished.notify_all();
N
Nikita Mikhaylov 已提交
807
        }
N
mvc  
Nikita Mikhaylov 已提交
808 809 810
    }
}

N
Nikita Mikhaylov 已提交
811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
void CacheDictionary::updateMultiThreadFunction()
{
    setThreadName("AsyncUpdater");

    const size_t thread_number = global_update_thread_number.fetch_add(1);

    while (!finished)
    {
        UpdateUnitPtr first_popped;
        update_queue.pop(first_popped);

        if (finished)
            break;

        LOG_TRACE(log, "update with thread number " << thread_number);

N
better  
Nikita Mikhaylov 已提交
827 828
        auto start = std::chrono::system_clock::now();

N
Nikita Mikhaylov 已提交
829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856
        try
        {
            auto found_ids_mask_ptr = std::make_shared<std::unordered_map<Key, UInt8>>(first_popped->requested_ids.size());

            /// Copy shared_ptr to let this map be alive until other thread finish his stuff.
            /// It is thread safe because writing to the map happens before reading from multiple threads.
            first_popped->found_ids_mask_ptr = found_ids_mask_ptr;

            for (const auto id : first_popped->requested_ids)
                found_ids_mask_ptr->insert({id, 0});

            /// Update a bunch of ids.
            update(first_popped->requested_ids, *found_ids_mask_ptr);

            /// Notify all threads about finished updating the bunch of ids
            /// where their own ids were included.
            std::unique_lock<std::mutex> lock(update_mutex);

            first_popped->is_done = true;
            is_update_finished.notify_all();
        }
        catch (...)
        {
            std::unique_lock<std::mutex> lock(update_mutex);

            first_popped->current_exception = std::current_exception();
            is_update_finished.notify_all();
        }
N
better  
Nikita Mikhaylov 已提交
857 858 859 860 861 862

        auto end = std::chrono::system_clock::now();

        auto duration = end - start;

        LOG_FATAL(log, "full update  " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << " ms");
N
Nikita Mikhaylov 已提交
863 864 865
    }
}

866 867 868
/// Blocks the calling thread until the given update unit is marked as done or has an exception stored.
///
/// Waits on is_update_finished (under update_mutex) for at most each_update_finish_timeout_seconds seconds.
/// Throws CACHE_DICTIONARY_UPDATE_FAIL if the wait timed out; rethrows the exception stored in the unit, if any.
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
{
    std::unique_lock<std::mutex> lock(update_mutex);

    /// The timeout value is expressed in seconds, so it has to be converted with std::chrono::seconds.
    const bool finished_in_time = is_update_finished.wait_for(
        lock,
        std::chrono::seconds(each_update_finish_timeout_seconds),
        [&] { return update_unit_ptr->is_done || update_unit_ptr->current_exception; });

    if (!finished_in_time)
        throw DB::Exception("Keys updating timed out", ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);

    if (update_unit_ptr->current_exception)
        std::rethrow_exception(update_unit_ptr->current_exception);
}

/// Puts the update unit into update_queue via tryPush, passing update_queue_push_timeout_milliseconds to it.
/// Throws CACHE_DICTIONARY_UPDATE_FAIL (with the current queue size in the message) if tryPush reports failure.
void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const
{
    const bool is_pushed = update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds);
    if (is_pushed)
        return;

    throw DB::Exception(
        "Cannot push to internal update queue. Current queue size is " + std::to_string(update_queue.size()),
        ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
}

N
Nikita Mikhaylov 已提交
888 889 890 891 892 893 894 895

void CacheDictionary::update(const std::vector<Key> & requested_ids, std::unordered_map<Key, UInt8> & remaining_ids) const
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());

    const auto now = std::chrono::system_clock::now();

N
Nikita Mikhaylov 已提交
896 897
    size_t found_num = 0;

N
Nikita Mikhaylov 已提交
898 899 900 901 902 903 904 905 906 907 908 909 910
    if (now > backoff_end_time)
    {
        try
        {
            if (error_count)
            {
                /// Recover after error: we have to clone the source here because
                /// it could keep connections which should be reset after error.
                source_ptr = source_ptr->clone();
            }

            Stopwatch watch;
            /// Go to external storage. Might be very slow and blocking.
N
Nikita Mikhaylov 已提交
911 912
            auto start = std::chrono::system_clock::now();

N
Nikita Mikhaylov 已提交
913 914
            auto stream = source_ptr->loadIds(requested_ids);

N
Nikita Mikhaylov 已提交
915 916 917 918 919
            auto end = std::chrono::system_clock::now();

            std::chrono::duration<double> diff = end-start;

            LOG_FATAL(log, "load ids  " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms");
N
Nikita Mikhaylov 已提交
920 921 922

            stream->readPrefix();

N
Nikita Mikhaylov 已提交
923
            while (true)
N
Nikita Mikhaylov 已提交
924
            {
N
Nikita Mikhaylov 已提交
925 926 927 928 929
                start = std::chrono::system_clock::now();
                if (const auto block = stream->read()) {
                    end = std::chrono::system_clock::now();
                    diff = end - start;
                    LOG_FATAL(log, "read  " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms");
N
Nikita Mikhaylov 已提交
930

N
Nikita Mikhaylov 已提交
931 932 933 934
                    const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
                    if (!id_column)
                        throw Exception{name + ": id column has type different from UInt64.",
                                        ErrorCodes::TYPE_MISMATCH};
N
Nikita Mikhaylov 已提交
935

N
Nikita Mikhaylov 已提交
936
                    const auto &ids = id_column->getData();
N
Nikita Mikhaylov 已提交
937

N
Nikita Mikhaylov 已提交
938 939 940 941
                    /// cache column pointers
                    const auto column_ptrs = ext::map<std::vector>(
                            ext::range(0, attributes.size()),
                            [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
N
Nikita Mikhaylov 已提交
942

N
Nikita Mikhaylov 已提交
943
                    const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
N
Nikita Mikhaylov 已提交
944

N
Nikita Mikhaylov 已提交
945 946
                    for (const auto i : ext::range(0, ids.size())) {
                        const auto id = ids[i];
N
Nikita Mikhaylov 已提交
947

N
Nikita Mikhaylov 已提交
948 949
                        const auto find_result = findCellIdx(id, now);
                        const auto &cell_idx = find_result.cell_idx;
N
Nikita Mikhaylov 已提交
950

N
Nikita Mikhaylov 已提交
951
                        auto &cell = cells[cell_idx];
N
Nikita Mikhaylov 已提交
952

N
Nikita Mikhaylov 已提交
953 954 955
                        for (const auto attribute_idx : ext::range(0, attributes.size())) {
                            const auto &attribute_column = *column_ptrs[attribute_idx];
                            auto &attribute = attributes[attribute_idx];
N
Nikita Mikhaylov 已提交
956

N
Nikita Mikhaylov 已提交
957 958
                            setAttributeValue(attribute, cell_idx, attribute_column[i]);
                        }
N
Nikita Mikhaylov 已提交
959

N
Nikita Mikhaylov 已提交
960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977
                        /// if cell id is zero and zero does not map to this cell, then the cell is unused
                        if (cell.id == 0 && cell_idx != zero_cell_idx)
                            element_count.fetch_add(1, std::memory_order_relaxed);

                        cell.id = id;
                        if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) {
                            std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec,
                                                                               dict_lifetime.max_sec};
                            cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
                        } else
                            cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());

                        /// mark corresponding id as found
                        remaining_ids[id] = 1;
                        ++found_num;
                    }
                } else {
                    break;
N
Nikita Mikhaylov 已提交
978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000
                }
            }

            stream->readSuffix();

            error_count = 0;
            last_exception = std::exception_ptr{};
            backoff_end_time = std::chrono::system_clock::time_point{};

            ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
        }
        catch (...)
        {
            ++error_count;
            last_exception = std::current_exception();
            backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));

            tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() +
                                                 "', next update is scheduled at " + ext::to_string(backoff_end_time));
        }
    }


N
Nikita Mikhaylov 已提交
1001
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, remaining_ids.size() - found_num);
N
Nikita Mikhaylov 已提交
1002 1003 1004 1005
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}

1006
}