CacheDictionary.cpp 37.1 KB
Newer Older
1 2
#include "CacheDictionary.h"

3
#include <functional>
4 5
#include <memory>
#include <Columns/ColumnString.h>
6
#include <Common/BitHelpers.h>
P
proller 已提交
7
#include <Common/CurrentMetrics.h>
8
#include <Common/HashTable/Hash.h>
9
#include <Common/ProfileEvents.h>
P
proller 已提交
10 11
#include <Common/ProfilingScopedRWLock.h>
#include <Common/randomSeed.h>
12
#include <Common/typeid_cast.h>
P
proller 已提交
13 14
#include <ext/range.h>
#include <ext/size.h>
15
#include <Common/setThreadName.h>
16
#include "CacheDictionary.inc.h"
P
proller 已提交
17 18
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
19

20 21
namespace ProfileEvents
{
    /// Profile event counters used by the cache dictionary; they are only declared here.
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;
    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheRequests;
    extern const Event DictCacheLockWriteNs;
    extern const Event DictCacheLockReadNs;
}

namespace CurrentMetrics
{
P
proller 已提交
36
extern const Metric DictCacheRequests;
37 38 39
}


40 41 42 43
namespace DB
{
namespace ErrorCodes
{
    /// Error codes thrown from this file (declared here, defined elsewhere).
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int TOO_SMALL_BUFFER_SIZE;
    extern const int TYPE_MISMATCH;
    extern const int UNSUPPORTED_METHOD;
}


P
proller 已提交
52
inline size_t CacheDictionary::getCellIdx(const Key id) const
53
{
54 55 56
    const auto hash = intHash64(id);
    const auto idx = hash & size_overlap_mask;
    return idx;
57 58 59
}


P
proller 已提交
60
/// Creates a cache dictionary and starts its background update workers.
///
/// The cell count is `size_` rounded up to a power of two, and never smaller than
/// max_collision_length. The constructor starts `max_threads_for_updates_` worker jobs in
/// update_pool; each runs updateThreadFunction().
///
/// Throws UNSUPPORTED_METHOD if the source cannot load keys selectively.
CacheDictionary::CacheDictionary(
    const std::string & database_,
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    DictionaryLifetime dict_lifetime_,
    size_t size_,
    bool allow_read_expired_keys_,
    size_t max_update_queue_size_,
    size_t update_queue_push_timeout_milliseconds_,
    size_t each_update_finish_timeout_seconds_,
    size_t max_threads_for_updates_)
    : database(database_)
    , name(name_)
    , full_name{database_.empty() ? name_ : (database_ + "." + name_)}
    , dict_struct(dict_struct_)
    , source_ptr{std::move(source_ptr_)}
    , dict_lifetime(dict_lifetime_)
    , allow_read_expired_keys(allow_read_expired_keys_)
    , max_update_queue_size(max_update_queue_size_)
    , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
    , each_update_finish_timeout_seconds(each_update_finish_timeout_seconds_)
    , max_threads_for_updates(max_threads_for_updates_)
    , log(&Logger::get("ExternalDictionaries"))
    , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
    , size_overlap_mask{this->size - 1}
    , cells{this->size}
    , rnd_engine(randomSeed())
    , update_queue(max_update_queue_size_)
    , update_pool(max_threads_for_updates)
{
    if (!this->source_ptr->supportsSelectiveLoad())
        throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();

    /// If scheduling fails part-way, the object is not fully constructed and its destructor will
    /// NOT run. The workers that did start would still hold `this` and stay blocked in
    /// update_queue.pop(). Stop them here the same way the destructor does, then rethrow.
    size_t started_workers = 0;
    try
    {
        for (; started_workers < max_threads_for_updates; ++started_workers)
            update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
    }
    catch (...)
    {
        finished = true;
        update_queue.clear();
        for (size_t i = 0; i < started_workers; ++i)
        {
            auto empty_finishing_ptr = std::make_shared<UpdateUnit>(std::vector<Key>());
            update_queue.push(empty_finishing_ptr);
        }
        update_pool.wait();
        throw;
    }
}

/// Shuts down the background updaters:
/// 1. raise `finished`;
/// 2. drop pending update requests;
/// 3. push one empty update unit per worker so that each blocked pop() returns;
/// 4. wait for the pool.
CacheDictionary::~CacheDictionary()
{
    finished = true;
    update_queue.clear();

    size_t wake_ups_left = max_threads_for_updates;
    while (wake_ups_left-- > 0)
    {
        auto wake_up_unit = std::make_shared<UpdateUnit>(std::vector<Key>());
        update_queue.push(wake_up_unit);
    }

    update_pool.wait();
}


A
Alexey Milovidov 已提交
112
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
113
{
114
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
115

116
    getItemsNumberImpl<UInt64, UInt64>(*hierarchical_attribute, ids, out, [&](const size_t) { return null_value; });
117 118 119
}


120
/// Uniform element access, so templated code can treat a single key like an array of keys.
/// The array overload returns element `idx`; the scalar overload ignores `idx` and returns the value.
static inline CacheDictionary::Key getAt(const PaddedPODArray<CacheDictionary::Key> & arr, const size_t idx)
{
    const auto element = arr[idx];
    return element;
}

static inline CacheDictionary::Key getAt(const CacheDictionary::Key & value, const size_t /* idx, unused */)
{
    const auto element = value;
    return element;
}
129 130 131


template <typename AncestorType>
P
proller 已提交
132
void CacheDictionary::isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
133
{
134 135
    /// Transform all children to parents until ancestor id or null_value will be reached.

136
    size_t out_size = out.size();
P
proller 已提交
137
    memset(out.data(), 0xFF, out_size); /// 0xFF means "not calculated"
138 139 140

    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);

141
    PaddedPODArray<Key> children(out_size, 0);
142 143 144 145 146 147 148 149
    PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());

    while (true)
    {
        size_t out_idx = 0;
        size_t parents_idx = 0;
        size_t new_children_idx = 0;

150
        while (out_idx < out_size)
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
        {
            /// Already calculated
            if (out[out_idx] != 0xFF)
            {
                ++out_idx;
                continue;
            }

            /// No parent
            if (parents[parents_idx] == null_value)
            {
                out[out_idx] = 0;
            }
            /// Found ancestor
            else if (parents[parents_idx] == getAt(ancestor_ids, parents_idx))
            {
                out[out_idx] = 1;
            }
A
alexey-milovidov 已提交
169
            /// Loop detected
170 171
            else if (children[new_children_idx] == parents[parents_idx])
            {
P
proller 已提交
172
                out[out_idx] = 1;
173
            }
A
alexey-milovidov 已提交
174
            /// Found intermediate parent, add this value to search at next loop iteration
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
            else
            {
                children[new_children_idx] = parents[parents_idx];
                ++new_children_idx;
            }

            ++out_idx;
            ++parents_idx;
        }

        if (new_children_idx == 0)
            break;

        /// Transform all children to its parents.
        children.resize(new_children_idx);
        parents.resize(new_children_idx);

        toParent(children, parents);
    }
194 195 196
}

void CacheDictionary::isInVectorVector(
P
proller 已提交
197
    const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
198
{
199
    isInImpl(child_ids, ancestor_ids, out);
200
}
201

P
proller 已提交
202
void CacheDictionary::isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const
203
{
204
    isInImpl(child_ids, ancestor_id, out);
205
}
206

P
proller 已提交
207
void CacheDictionary::isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
208
{
209
    /// Special case with single child value.
210

211
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
212

213 214 215
    PaddedPODArray<Key> child(1, child_id);
    PaddedPODArray<Key> parent(1);
    std::vector<Key> ancestors(1, child_id);
216

217 218 219 220
    /// Iteratively find all ancestors for child.
    while (true)
    {
        toParent(child, parent);
221

222 223
        if (parent[0] == null_value)
            break;
224

225 226 227
        child[0] = parent[0];
        ancestors.push_back(parent[0]);
    }
228

229
    /// Assuming short hierarchy, so linear search is Ok.
230
    for (size_t i = 0, out_size = out.size(); i < out_size; ++i)
231
        out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
232
}
233

A
Alexey Milovidov 已提交
234
void CacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
235
{
236
    auto & attribute = getAttribute(attribute_name);
237
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
238

239
    const auto null_value = StringRef{std::get<String>(attribute.null_values)};
240

P
proller 已提交
241
    getItemsString(attribute, ids, out, [&](const size_t) { return null_value; });
242 243 244
}

void CacheDictionary::getString(
P
proller 已提交
245
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
246
{
247
    auto & attribute = getAttribute(attribute_name);
248
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
249

P
proller 已提交
250
    getItemsString(attribute, ids, out, [&](const size_t row) { return def->getDataAt(row); });
251 252 253
}

void CacheDictionary::getString(
P
proller 已提交
254
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
255
{
256
    auto & attribute = getAttribute(attribute_name);
257
    checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
258

P
proller 已提交
259
    getItemsString(attribute, ids, out, [&](const size_t) { return StringRef{def}; });
260 261 262
}


263
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag
264 265 266 267 268 269 270 271
/// true  false   found and valid
/// false true    not found (something outdated, maybe our cell)
/// false false   not found (other id stored with valid data)
/// true  true    impossible
///
/// todo: split this func to two: find_for_get and find_for_set
CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const CellMetadata::time_point_t now) const
{
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
    auto pos = getCellIdx(id);
    auto oldest_id = pos;
    auto oldest_time = CellMetadata::time_point_t::max();
    const auto stop = pos + max_collision_length;
    for (; pos < stop; ++pos)
    {
        const auto cell_idx = pos & size_overlap_mask;
        const auto & cell = cells[cell_idx];

        if (cell.id != id)
        {
            /// maybe we already found nearest expired cell (try minimize collision_length on insert)
            if (oldest_time > now && oldest_time > cell.expiresAt())
            {
                oldest_time = cell.expiresAt();
                oldest_id = cell_idx;
            }
            continue;
        }

        if (cell.expiresAt() < now)
        {
            return {cell_idx, false, true};
        }

        return {cell_idx, true, false};
    }

    return {oldest_id, false, false};
301 302
}

A
Alexey Milovidov 已提交
303
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
304
{
N
mvc  
Nikita Mikhaylov 已提交
305 306 307 308 309
    /// There are three types of ids.
    /// - Valid ids. These ids are presented in local cache and their lifetime is not expired.
    /// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired).
    /// - CacheNotFound ids. We have to go to external storage to know its value.

310
    /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
N
mvc  
Nikita Mikhaylov 已提交
311 312
    std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
    std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
313

314
    size_t cache_hit = 0;
N
mvc  
Nikita Mikhaylov 已提交
315

316 317 318 319 320 321 322 323 324 325 326
    const auto rows = ext::size(ids);
    {
        const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

        const auto now = std::chrono::system_clock::now();
        /// fetch up-to-date values, decide which ones require update
        for (const auto row : ext::range(0, rows))
        {
            const auto id = ids[row];
            const auto find_result = findCellIdx(id, now);
            const auto & cell_idx = find_result.cell_idx;
327 328 329 330 331 332 333

            auto insert_to_answer_routine = [&] ()
            {
                const auto & cell = cells[cell_idx];
                out[row] = !cell.isDefault();
            };

334 335 336
            if (!find_result.valid)
            {
                if (find_result.outdated)
N
mvc  
Nikita Mikhaylov 已提交
337 338 339
                {
                    cache_expired_ids[id].push_back(row);

340 341
                    if (allow_read_expired_keys)
                        insert_to_answer_routine();
N
mvc  
Nikita Mikhaylov 已提交
342
                }
343
                else
N
mvc  
Nikita Mikhaylov 已提交
344 345 346
                {
                    cache_not_found_ids[id].push_back(row);
                }
347 348 349 350
            }
            else
            {
                ++cache_hit;
351
                insert_to_answer_routine();
352 353 354 355
            }
        }
    }

356 357
    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
358 359 360
    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);

    query_count.fetch_add(rows, std::memory_order_relaxed);
361
    hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
362

363
    if (cache_not_found_ids.empty())
N
mvc  
Nikita Mikhaylov 已提交
364
    {
365 366 367 368 369 370 371 372 373 374 375 376 377
        /// Nothing to update - return;
        if (cache_expired_ids.empty())
            return;

        if (allow_read_expired_keys)
        {
            std::vector<Key> required_expired_ids;
            required_expired_ids.reserve(cache_expired_ids.size());
            std::transform(
                    std::begin(cache_expired_ids), std::end(cache_expired_ids),
                    std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });

            /// Callbacks are empty because we don't want to receive them after an unknown period of time.
N
Nikita Mikhaylov 已提交
378
            auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids);
379 380 381 382 383

            tryPushToUpdateQueueOrThrow(update_unit_ptr);
            /// Update is async - no need to wait.
            return;
        }
N
mvc  
Nikita Mikhaylov 已提交
384
    }
385

386 387
    /// At this point we have two situations.
    /// There may be both types of keys: cache_expired_ids and cache_not_found_ids.
N
mvc  
Nikita Mikhaylov 已提交
388 389
    /// We will update them all synchronously.

N
better  
Nikita Mikhaylov 已提交
390
    std::vector<Key> required_ids;
391 392 393 394 395 396 397
    required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
    std::transform(
            std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
    std::transform(
            std::begin(cache_expired_ids), std::end(cache_expired_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
N
mvc  
Nikita Mikhaylov 已提交
398

N
Nikita Mikhaylov 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
    auto on_cell_updated = [&] (const Key id, const size_t)
    {
        for (const auto row : cache_not_found_ids[id])
            out[row] = true;
        for (const auto row : cache_expired_ids[id])
            out[row] = true;
    };

    auto on_id_not_found = [&] (const Key id, const size_t)
    {
        for (const auto row : cache_not_found_ids[id])
            out[row] = false;
        for (const auto row : cache_expired_ids[id])
            out[row] = true;
    };

N
Nikita Mikhaylov 已提交
415
    auto update_unit_ptr = std::make_shared<UpdateUnit>(required_ids, on_cell_updated, on_id_not_found);
N
mvc  
Nikita Mikhaylov 已提交
416

417 418
    tryPushToUpdateQueueOrThrow(update_unit_ptr);
    waitForCurrentUpdateFinish(update_unit_ptr);
419 420 421 422 423
}


void CacheDictionary::createAttributes()
{
424 425
    const auto attributes_size = dict_struct.attributes.size();
    attributes.reserve(attributes_size);
426 427

    bytes_allocated += size * sizeof(CellMetadata);
428
    bytes_allocated += attributes_size * sizeof(attributes.front());
429 430 431 432 433 434 435 436

    for (const auto & attribute : dict_struct.attributes)
    {
        attribute_index_by_name.emplace(attribute.name, attributes.size());
        attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));

        if (attribute.hierarchical)
        {
P
proller 已提交
437
            hierarchical_attribute = &attributes.back();
438

K
kreuzerkrieg 已提交
439
            if (hierarchical_attribute->type != AttributeUnderlyingType::utUInt64)
440
                throw Exception{full_name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
441 442
        }
    }
443 444
}

A
Alexey Milovidov 已提交
445
/// Creates storage for one attribute: `size` slots of its underlying type, plus its null value
/// converted from `null_value`. Adds the slot memory to bytes_allocated.
/// String attributes share one string_arena, created on first use.
CacheDictionary::Attribute CacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
    Attribute attr{type, {}, {}};

    switch (type)
    {
        case AttributeUnderlyingType::utString:
        {
            attr.null_values = null_value.get<String>();
            attr.arrays = std::make_unique<ContainerType<StringRef>>(size);
            bytes_allocated += size * sizeof(StringRef);
            if (!string_arena)
                string_arena = std::make_unique<ArenaWithFreeLists>();
            break;
        }

#define CREATE_NUMERIC_ATTRIBUTE(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
        { \
            attr.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
            attr.arrays = std::make_unique<ContainerType<TYPE>>(size); \
            bytes_allocated += size * sizeof(TYPE); \
            break; \
        }
        CREATE_NUMERIC_ATTRIBUTE(UInt8)
        CREATE_NUMERIC_ATTRIBUTE(UInt16)
        CREATE_NUMERIC_ATTRIBUTE(UInt32)
        CREATE_NUMERIC_ATTRIBUTE(UInt64)
        CREATE_NUMERIC_ATTRIBUTE(UInt128)
        CREATE_NUMERIC_ATTRIBUTE(Int8)
        CREATE_NUMERIC_ATTRIBUTE(Int16)
        CREATE_NUMERIC_ATTRIBUTE(Int32)
        CREATE_NUMERIC_ATTRIBUTE(Int64)
        CREATE_NUMERIC_ATTRIBUTE(Decimal32)
        CREATE_NUMERIC_ATTRIBUTE(Decimal64)
        CREATE_NUMERIC_ATTRIBUTE(Decimal128)
        CREATE_NUMERIC_ATTRIBUTE(Float32)
        CREATE_NUMERIC_ATTRIBUTE(Float64)
#undef CREATE_NUMERIC_ATTRIBUTE
    }

    return attr;
}

A
Alexey Milovidov 已提交
484
/// Resets slot `idx` of `attribute` to the attribute's null value.
/// For strings, the slot's arena memory is returned to string_arena first, unless the slot
/// already points at the null value's own buffer.
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
{
    switch (attribute.type)
    {
#define RESET_TO_NULL_VALUE(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = std::get<TYPE>(attribute.null_values); \
            break;
        RESET_TO_NULL_VALUE(UInt8)
        RESET_TO_NULL_VALUE(UInt16)
        RESET_TO_NULL_VALUE(UInt32)
        RESET_TO_NULL_VALUE(UInt64)
        RESET_TO_NULL_VALUE(UInt128)
        RESET_TO_NULL_VALUE(Int8)
        RESET_TO_NULL_VALUE(Int16)
        RESET_TO_NULL_VALUE(Int32)
        RESET_TO_NULL_VALUE(Int64)
        RESET_TO_NULL_VALUE(Float32)
        RESET_TO_NULL_VALUE(Float64)
        RESET_TO_NULL_VALUE(Decimal32)
        RESET_TO_NULL_VALUE(Decimal64)
        RESET_TO_NULL_VALUE(Decimal128)
#undef RESET_TO_NULL_VALUE

        case AttributeUnderlyingType::utString:
        {
            const auto & null_value_ref = std::get<String>(attribute.null_values);
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];

            /// Already pointing at the null value: nothing to free, nothing to change.
            if (string_ref.data == null_value_ref.data())
                break;

            if (string_ref.data)
                string_arena->free(const_cast<char *>(string_ref.data), string_ref.size);

            string_ref = StringRef{null_value_ref};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
550
/// Stores `value` into slot `idx` of `attribute`.
/// Numeric values are read from the Field as their wide storage type (UInt64, Int64, Float64,
/// or the exact type for UInt128/Decimal) and narrowed on assignment.
/// Non-empty strings are copied, including the terminating zero, into string_arena. An empty
/// string is stored as an empty StringRef.
void CacheDictionary::setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const
{
    switch (attribute.type)
    {
#define ASSIGN_FROM_FIELD(TYPE, FIELD_TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = value.get<FIELD_TYPE>(); \
            break;
        ASSIGN_FROM_FIELD(UInt8, UInt64)
        ASSIGN_FROM_FIELD(UInt16, UInt64)
        ASSIGN_FROM_FIELD(UInt32, UInt64)
        ASSIGN_FROM_FIELD(UInt64, UInt64)
        ASSIGN_FROM_FIELD(UInt128, UInt128)
        ASSIGN_FROM_FIELD(Int8, Int64)
        ASSIGN_FROM_FIELD(Int16, Int64)
        ASSIGN_FROM_FIELD(Int32, Int64)
        ASSIGN_FROM_FIELD(Int64, Int64)
        ASSIGN_FROM_FIELD(Float32, Float64)
        ASSIGN_FROM_FIELD(Float64, Float64)
        ASSIGN_FROM_FIELD(Decimal32, Decimal32)
        ASSIGN_FROM_FIELD(Decimal64, Decimal64)
        ASSIGN_FROM_FIELD(Decimal128, Decimal128)
#undef ASSIGN_FROM_FIELD

        case AttributeUnderlyingType::utString:
        {
            const auto & new_value = value.get<String>();
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];
            const auto & null_value_ref = std::get<String>(attribute.null_values);

            /// Release the previous arena allocation; the null value's buffer is not arena-owned.
            const bool owns_arena_memory = string_ref.data && string_ref.data != null_value_ref.data();
            if (owns_arena_memory)
                string_arena->free(const_cast<char *>(string_ref.data), string_ref.size);

            const auto new_size = new_value.size();
            if (new_size == 0)
            {
                string_ref = {};
                break;
            }

            auto * const buffer = string_arena->alloc(new_size + 1);
            std::copy(new_value.data(), new_value.data() + new_size + 1, buffer);
            string_ref = StringRef{buffer, new_size};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
623
/// Returns the attribute registered under `attribute_name`; throws BAD_ARGUMENTS if there is none.
CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & attribute_name) const
{
    if (const auto it = attribute_index_by_name.find(attribute_name); it != attribute_index_by_name.end())
        return attributes[it->second];

    throw Exception{full_name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
}

632 633
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
P
proller 已提交
634 635
    return (idx != zero_cell_idx && cells[idx].id == 0)
        || (cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
636 637 638 639
}

/// Collects, under the read lock, the ids of all cells that are neither empty nor default.
PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    PaddedPODArray<Key> array;
    const size_t cells_count = cells.size();
    for (size_t idx = 0; idx < cells_count; ++idx)
    {
        const auto & cell = cells[idx];
        if (isEmptyCell(idx) || cell.isDefault())
            continue;

        array.push_back(cell.id);
    }
    return array;
}

654
/// Streams the requested columns for the ids cached at call time (see getCachedIds()).
BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    auto cached_ids = getCachedIds();
    return std::make_shared<DictionaryBlockInputStream<CacheDictionary, Key>>(
        shared_from_this(), max_block_size, std::move(cached_ids), column_names);
}

660 661 662 663 664 665
/// Returns a copy of last_exception, read under the dictionary's read lock.
std::exception_ptr CacheDictionary::getLastException() const
{
    const ProfilingScopedReadRWLock guard{rw_lock, ProfileEvents::DictCacheLockReadNs};
    std::exception_ptr result = last_exception;
    return result;
}

666 667
/// Registers the "cache" layout with `factory`.
///
/// The layout reads its settings from <layout><cache>...</cache></layout>, validates them, and
/// throws before constructing the dictionary if any value is unusable.
void registerDictionaryCache(DictionaryFactory & factory)
{
    auto create_layout = [=](const std::string & full_name,
                             const DictionaryStructure & dict_struct,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
            throw Exception{full_name + ": 'key' is not supported for dictionary of layout 'cache'",
                            ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{full_name
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};

        const auto & layout_prefix = config_prefix + ".layout";
        const std::string cache_prefix = layout_prefix + ".cache";

        const size_t size = config.getUInt64(cache_prefix + ".size_in_cells");
        if (size == 0)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
                            ErrorCodes::BAD_ARGUMENTS};

        const String database = config.getString(config_prefix + ".database", "");
        const String name = config.getString(config_prefix + ".name");
        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};

        const size_t max_update_queue_size =
                config.getUInt64(cache_prefix + ".max_update_queue_size", 100000);
        if (max_update_queue_size == 0)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have empty update queue of size 0",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool allow_read_expired_keys =
                config.getBool(cache_prefix + ".allow_read_expired_keys", false);

        const size_t update_queue_push_timeout_milliseconds =
                config.getUInt64(cache_prefix + ".update_queue_push_timeout_milliseconds", 10);
        if (update_queue_push_timeout_milliseconds < 10)
            throw Exception{full_name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
                            ErrorCodes::BAD_ARGUMENTS};

        /// The next two settings used to be read from <layout> directly instead of from <cache>
        /// like every other cache setting, so values placed inside <cache> were silently ignored.
        /// The old location is still honoured as a fallback.
        const size_t each_update_finish_timeout_seconds = config.getUInt64(
                cache_prefix + ".each_update_finish_timeout_seconds",
                config.getUInt64(layout_prefix + ".each_update_finish_timeout_seconds", 600));
        if (each_update_finish_timeout_seconds == 0)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have timeout equals to zero.",
                            ErrorCodes::BAD_ARGUMENTS};

        const size_t max_threads_for_updates = config.getUInt64(
                cache_prefix + ".max_threads_for_updates",
                config.getUInt64(layout_prefix + ".max_threads_for_updates", 4));
        if (max_threads_for_updates == 0)
            throw Exception{full_name + ": dictionary of layout 'cache' cannot have zero threads for updates.",
                            ErrorCodes::BAD_ARGUMENTS};

        return std::make_unique<CacheDictionary>(
                database, name, dict_struct, std::move(source_ptr), dict_lifetime, size,
                allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds,
                each_update_finish_timeout_seconds, max_threads_for_updates);
    };
    factory.registerLayout("cache", create_layout, false);
}

N
mvc  
Nikita Mikhaylov 已提交
734 735
void CacheDictionary::updateThreadFunction()
{
736
    setThreadName("AsyncUpdater");
N
Nikita Mikhaylov 已提交
737
    while (!finished)
N
mvc  
Nikita Mikhaylov 已提交
738
    {
N
Nikita Mikhaylov 已提交
739 740 741
        UpdateUnitPtr first_popped;
        update_queue.pop(first_popped);

N
Nikita Mikhaylov 已提交
742 743 744
        if (finished)
            break;

745 746 747 748
        /// Here we pop as many unit pointers from update queue as we can.
        /// We fix current size to avoid livelock (or too long waiting),
        /// when this thread pops from the queue and other threads push to the queue.
        const size_t current_queue_size = update_queue.size();
N
Nikita Mikhaylov 已提交
749 750

        if (current_queue_size > 0)
N
Nikita Mikhaylov 已提交
751
            LOG_TRACE(log, "Performing bunch of keys update in cache dictionary with "
N
cleanup  
Nikita Mikhaylov 已提交
752
                            << current_queue_size+1 << " keys" );
N
Nikita Mikhaylov 已提交
753

N
better  
Nikita Mikhaylov 已提交
754
        std::vector<UpdateUnitPtr> update_request;
N
Nikita Mikhaylov 已提交
755 756
        update_request.reserve(current_queue_size + 1);
        update_request.emplace_back(first_popped);
N
Nikita Mikhaylov 已提交
757

N
Nikita Mikhaylov 已提交
758
        UpdateUnitPtr current_unit_ptr;
N
Nikita Mikhaylov 已提交
759

N
better  
Nikita Mikhaylov 已提交
760
        while (update_queue.tryPop(current_unit_ptr))
N
Nikita Mikhaylov 已提交
761
            update_request.emplace_back(std::move(current_unit_ptr));
N
Nikita Mikhaylov 已提交
762

N
Nikita Mikhaylov 已提交
763
        BunchUpdateUnit bunch_update_unit(update_request);
764

N
Nikita Mikhaylov 已提交
765
        try
N
mvc  
Nikita Mikhaylov 已提交
766
        {
767
            /// Update a bunch of ids.
N
Nikita Mikhaylov 已提交
768
            update(bunch_update_unit);
769 770 771

            /// Notify all threads about finished updating the bunch of ids
            /// where their own ids were included.
772
            std::unique_lock<std::mutex> lock(update_mutex);
N
Nikita Mikhaylov 已提交
773

774 775
            for (auto & unit_ptr: update_request)
                unit_ptr->is_done = true;
N
Nikita Mikhaylov 已提交
776

777
            is_update_finished.notify_all();
N
mvc  
Nikita Mikhaylov 已提交
778
        }
N
Nikita Mikhaylov 已提交
779 780
        catch (...)
        {
781
            std::unique_lock<std::mutex> lock(update_mutex);
782 783 784 785 786
            /// It is a big trouble, because one bad query can make other threads fail with not relative exception.
            /// So at this point all threads (and queries) will receive the same exception.
            for (auto & unit_ptr: update_request)
                unit_ptr->current_exception = std::current_exception();

787
            is_update_finished.notify_all();
N
Nikita Mikhaylov 已提交
788
        }
N
mvc  
Nikita Mikhaylov 已提交
789 790 791
    }
}

792 793 794
/// Blocks the calling thread until `update_unit_ptr` has been processed by an update thread,
/// i.e. until its `is_done` flag is set or an exception has been stored in it.
/// Throws CACHE_DICTIONARY_UPDATE_FAIL if neither happens within `each_update_finish_timeout_seconds`,
/// and rethrows the stored exception if the update of this unit failed.
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
{
    std::unique_lock<std::mutex> lock(update_mutex);

    /// The timeout is configured in seconds, so it must be converted with std::chrono::seconds
    /// (converting with std::chrono::minutes made the wait 60 times longer than configured).
    const auto sleeping_result = is_update_finished.wait_for(
            lock,
            std::chrono::seconds(each_update_finish_timeout_seconds),
            [&] { return update_unit_ptr->is_done || update_unit_ptr->current_exception; });

    /// wait_for with a predicate returns the predicate's value: false means we timed out.
    if (!sleeping_result)
        throw DB::Exception("Keys updating timed out", ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);

    if (update_unit_ptr->current_exception)
        std::rethrow_exception(update_unit_ptr->current_exception);
}

/// Hands `update_unit_ptr` over to the background update threads through `update_queue`.
/// `tryPush` is given `update_queue_push_timeout_milliseconds` as its timeout; if it reports
/// failure, a CACHE_DICTIONARY_UPDATE_FAIL exception carrying the current queue size is thrown.
void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const
{
    const bool pushed = update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds);
    if (pushed)
        return;

    const std::string message = "Cannot push to internal update queue. Current queue size is "
        + std::to_string(update_queue.size());
    throw DB::Exception(message, ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
}

N
Nikita Mikhaylov 已提交
814
/// Loads the ids requested by `bunch_update_unit` from the dictionary source into the cache
/// and reports every requested id back to the callers as present or absent.
///
/// * The source is only queried when `now` is past `backoff_end_time`. On failure, `error_count`,
///   `last_exception` and `backoff_end_time` are updated and the error is logged; on success they are reset.
/// * Every id returned by the source gets its attribute values written into its cell and a new
///   expiration time, and is reported as present.
/// * Every requested id that was not returned by the source:
///     - while `error_count` is non-zero: if the cell holds outdated data for it, that data keeps
///       being served until `backoff_end_time`; otherwise `last_exception` is rethrown;
///     - otherwise: the cell is filled with default attribute values and the id is reported as absent.
void CacheDictionary::update(BunchUpdateUnit bunch_update_unit) const
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};

    const auto & requested_ids = bunch_update_unit.getRequestedIds();
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());

    /// id -> 1 once the source has returned it, 0 while it is still missing.
    std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
    for (const auto id : requested_ids)
        remaining_ids.insert({id, 0});

    const auto now = std::chrono::system_clock::now();

    /// Gives a cell a new expiration time: a random moment within the dictionary lifetime,
    /// or the maximal time point when either lifetime bound is zero.
    const auto assign_expiration = [&](auto & target_cell)
    {
        if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
        {
            std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
            target_cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
        }
        else
            target_cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
    };

    /// NOTE(review): error_count, last_exception, backoff_end_time and source_ptr are modified below
    /// partly without rw_lock held; if several update threads can run update() concurrently,
    /// confirm this is safe.
    if (now > backoff_end_time)
    {
        try
        {
            if (error_count)
            {
                /// Recover after error: we have to clone the source here because
                /// it could keep connections which should be reset after error.
                source_ptr = source_ptr->clone();
            }

            Stopwatch watch;
            auto stream = source_ptr->loadIds(requested_ids);

            const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

            stream->readPrefix();
            while (const auto block = stream->read())
            {
                const auto * id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
                if (!id_column)
                    throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};

                const auto & ids = id_column->getData();

                /// Attribute columns follow the id column (position 0) in the block.
                const auto column_ptrs = ext::map<std::vector>(
                        ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });

                for (const auto row : ext::range(0, ids.size()))
                {
                    const auto id = ids[row];
                    const auto cell_idx = findCellIdx(id, now).cell_idx;
                    auto & cell = cells[cell_idx];

                    for (const auto attribute_idx : ext::range(0, attributes.size()))
                        setAttributeValue(attributes[attribute_idx], cell_idx, (*column_ptrs[attribute_idx])[row]);

                    /// if cell id is zero and zero does not map to this cell, then the cell is unused
                    if (cell.id == 0 && cell_idx != zero_cell_idx)
                        element_count.fetch_add(1, std::memory_order_relaxed);

                    cell.id = id;
                    assign_expiration(cell);

                    bunch_update_unit.informCallersAboutPresentId(id, cell_idx);

                    /// mark corresponding id as found
                    remaining_ids[id] = 1;
                }
            }

            stream->readSuffix();

            error_count = 0;
            last_exception = std::exception_ptr{};
            backoff_end_time = std::chrono::system_clock::time_point{};

            ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
        }
        catch (...)
        {
            ++error_count;
            last_exception = std::current_exception();
            backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));

            tryLogException(last_exception, log, "Could not update cache dictionary '" + getFullName() +
                                                 "', next update is scheduled at " + ext::to_string(backoff_end_time));
        }
    }

    size_t not_found_num = 0;
    size_t found_num = 0;

    const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};

    /// Check which ids have not been found and require setting null_value
    for (const auto & [id, was_found] : remaining_ids)
    {
        if (was_found)
        {
            ++found_num;
            continue;
        }
        ++not_found_num;

        const auto find_result = findCellIdx(id, now);
        const auto cell_idx = find_result.cell_idx;
        auto & cell = cells[cell_idx];

        if (error_count)
        {
            /// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
            if (!find_result.outdated)
                std::rethrow_exception(last_exception);

            /// We have expired data for that `id` so we can continue using it.
            /// The default mark is read before setExpiresAt() and restored afterwards
            /// (presumably setExpiresAt() overwrites it).
            const bool was_default = cell.isDefault();
            cell.setExpiresAt(backoff_end_time);
            if (was_default)
            {
                cell.setDefault();
                bunch_update_unit.informCallersAboutAbsentId(id, cell_idx);
            }
            else
                bunch_update_unit.informCallersAboutPresentId(id, cell_idx);
            continue;
        }

        /// Check if cell had not been occupied before and increment element counter if it hadn't
        if (cell.id == 0 && cell_idx != zero_cell_idx)
            element_count.fetch_add(1, std::memory_order_relaxed);

        cell.id = id;
        assign_expiration(cell);

        /// Set null_value for each attribute
        cell.setDefault();
        for (auto & attribute : attributes)
            setDefaultAttributeValue(attribute, cell_idx);

        /// inform caller that the cell has not been found
        bunch_update_unit.informCallersAboutAbsentId(id, cell_idx);
    }

    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
    ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}

977
}