CacheDictionary.cpp 27.6 KB
Newer Older
1 2
#include "CacheDictionary.h"

3
#include <functional>
4 5
#include <memory>
#include <Columns/ColumnString.h>
6
#include <Common/BitHelpers.h>
P
proller 已提交
7
#include <Common/CurrentMetrics.h>
8
#include <Common/HashTable/Hash.h>
9
#include <Common/ProfileEvents.h>
P
proller 已提交
10 11
#include <Common/ProfilingScopedRWLock.h>
#include <Common/randomSeed.h>
12
#include <Common/typeid_cast.h>
P
proller 已提交
13 14
#include <ext/range.h>
#include <ext/size.h>
15
#include <Common/setThreadName.h>
16
#include "CacheDictionary.inc.h"
P
proller 已提交
17 18
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
19

20 21
/// Profile events that are defined elsewhere and referenced by the cache dictionary.
namespace ProfileEvents
{
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;

    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheRequests;

    extern const Event DictCacheLockWriteNs;
    extern const Event DictCacheLockReadNs;
}

namespace CurrentMetrics
{
P
proller 已提交
36
extern const Metric DictCacheRequests;
37 38 39
}


40 41 42 43
namespace DB
{
namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
    extern const int LOGICAL_ERROR;
    extern const int TOO_SMALL_BUFFER_SIZE;
    /// Used by tryPushToUpdateQueueOrThrow when the update queue cannot accept a unit within the push timeout.
    extern const int CACHE_DICTIONARY_UPDATE_FAIL;
}


P
proller 已提交
52
inline size_t CacheDictionary::getCellIdx(const Key id) const
53
{
54 55 56
    const auto hash = intHash64(id);
    const auto idx = hash & size_overlap_mask;
    return idx;
57 58 59
}


P
proller 已提交
60
CacheDictionary::CacheDictionary(
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    const DictionaryLifetime dict_lifetime_,
    const size_t size_,
    const bool allow_read_expired_keys_,
    const size_t max_update_queue_size_,
    const size_t update_queue_push_timeout_milliseconds_)
    : name{name_}
    , dict_struct(dict_struct_)
    , source_ptr{std::move(source_ptr_)}
    , dict_lifetime(dict_lifetime_)
    , allow_read_expired_keys(allow_read_expired_keys_)
    , max_update_queue_size(max_update_queue_size_)
    , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
    , log(&Logger::get("ExternalDictionaries"))
    /// At least max_collision_length cells, rounded up to a power of two so a bit mask can replace modulo.
    , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
    , size_overlap_mask{this->size - 1}
    , cells{this->size}
    , rnd_engine(randomSeed())
    , update_queue(max_update_queue_size_)
{
    /// The cache fetches individual ids on demand, so the source must support loading a chosen id subset.
    const bool source_supports_selective_load = source_ptr->supportsSelectiveLoad();
    if (!source_supports_selective_load)
        throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();

    /// Start the background worker that consumes update_queue; the destructor joins it.
    update_thread = ThreadFromGlobalPool([this] { updateThreadFunction(); });
}

CacheDictionary::~CacheDictionary()
{
    finished = true;
    /// Drop pending work: no caller can be waiting on it any more once the dictionary is being destroyed.
    update_queue.clear();

    /// The update thread may be blocked inside update_queue.pop() on an empty queue, and clear() does not
    /// wake it. Push one empty unit so that pop() returns and the loop observes `finished`; without this,
    /// join() below can wait forever. The queue was just cleared, so this push has room and does not block.
    auto empty_finishing_ptr = std::make_shared<UpdateUnit>(
            std::vector<Key>(),
            [](const auto, const auto) {},
            [](const auto, const auto) {});
    update_queue.push(empty_finishing_ptr);

    update_thread.join();
}


A
Alexey Milovidov 已提交
98
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
99
{
100
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
101

102
    getItemsNumberImpl<UInt64, UInt64>(*hierarchical_attribute, ids, out, [&](const size_t) { return null_value; });
103 104 105
}


106
/// Uniform element access, so the same template code can take either a column of values or a single constant.

/// Column form: the value stored at row `row`.
static inline CacheDictionary::Key getAt(const PaddedPODArray<CacheDictionary::Key> & values, const size_t row)
{
    return values[row];
}

/// Constant form: the same value for every row, so the row number is ignored.
static inline CacheDictionary::Key getAt(const CacheDictionary::Key & constant, const size_t /* row */)
{
    return constant;
}
115 116 117


template <typename AncestorType>
P
proller 已提交
118
void CacheDictionary::isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
119
{
120 121
    /// Transform all children to parents until ancestor id or null_value will be reached.

122
    size_t out_size = out.size();
P
proller 已提交
123
    memset(out.data(), 0xFF, out_size); /// 0xFF means "not calculated"
124 125 126

    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);

127
    PaddedPODArray<Key> children(out_size, 0);
128 129 130 131 132 133 134 135
    PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());

    while (true)
    {
        size_t out_idx = 0;
        size_t parents_idx = 0;
        size_t new_children_idx = 0;

136
        while (out_idx < out_size)
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
        {
            /// Already calculated
            if (out[out_idx] != 0xFF)
            {
                ++out_idx;
                continue;
            }

            /// No parent
            if (parents[parents_idx] == null_value)
            {
                out[out_idx] = 0;
            }
            /// Found ancestor
            else if (parents[parents_idx] == getAt(ancestor_ids, parents_idx))
            {
                out[out_idx] = 1;
            }
A
alexey-milovidov 已提交
155
            /// Loop detected
156 157
            else if (children[new_children_idx] == parents[parents_idx])
            {
P
proller 已提交
158
                out[out_idx] = 1;
159
            }
A
alexey-milovidov 已提交
160
            /// Found intermediate parent, add this value to search at next loop iteration
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
            else
            {
                children[new_children_idx] = parents[parents_idx];
                ++new_children_idx;
            }

            ++out_idx;
            ++parents_idx;
        }

        if (new_children_idx == 0)
            break;

        /// Transform all children to its parents.
        children.resize(new_children_idx);
        parents.resize(new_children_idx);

        toParent(children, parents);
    }
180 181 182
}

void CacheDictionary::isInVectorVector(
P
proller 已提交
183
    const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
184
{
185
    isInImpl(child_ids, ancestor_ids, out);
186
}
187

P
proller 已提交
188
void CacheDictionary::isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const
189
{
190
    isInImpl(child_ids, ancestor_id, out);
191
}
192

P
proller 已提交
193
void CacheDictionary::isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
194
{
195
    /// Special case with single child value.
196

197
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
198

199 200 201
    PaddedPODArray<Key> child(1, child_id);
    PaddedPODArray<Key> parent(1);
    std::vector<Key> ancestors(1, child_id);
202

203 204 205 206
    /// Iteratively find all ancestors for child.
    while (true)
    {
        toParent(child, parent);
207

208 209
        if (parent[0] == null_value)
            break;
210

211 212 213
        child[0] = parent[0];
        ancestors.push_back(parent[0]);
    }
214

215
    /// Assuming short hierarchy, so linear search is Ok.
216
    for (size_t i = 0, out_size = out.size(); i < out_size; ++i)
217
        out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
218
}
219

A
Alexey Milovidov 已提交
220
void CacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
221
{
222
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
223
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
224

225
    const auto null_value = StringRef{std::get<String>(attribute.null_values)};
226

P
proller 已提交
227
    getItemsString(attribute, ids, out, [&](const size_t) { return null_value; });
228 229 230
}

void CacheDictionary::getString(
P
proller 已提交
231
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
232
{
233
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
234
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
235

P
proller 已提交
236
    getItemsString(attribute, ids, out, [&](const size_t row) { return def->getDataAt(row); });
237 238 239
}

void CacheDictionary::getString(
P
proller 已提交
240
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
241
{
242
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
243
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
244

P
proller 已提交
245
    getItemsString(attribute, ids, out, [&](const size_t) { return StringRef{def}; });
246 247 248
}


249
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag
250 251 252 253 254 255 256 257
/// true  false   found and valid
/// false true    not found (something outdated, maybe our cell)
/// false false   not found (other id stored with valid data)
/// true  true    impossible
///
/// todo: split this func to two: find_for_get and find_for_set
CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const CellMetadata::time_point_t now) const
{
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
    auto pos = getCellIdx(id);
    auto oldest_id = pos;
    auto oldest_time = CellMetadata::time_point_t::max();
    const auto stop = pos + max_collision_length;
    for (; pos < stop; ++pos)
    {
        const auto cell_idx = pos & size_overlap_mask;
        const auto & cell = cells[cell_idx];

        if (cell.id != id)
        {
            /// maybe we already found nearest expired cell (try minimize collision_length on insert)
            if (oldest_time > now && oldest_time > cell.expiresAt())
            {
                oldest_time = cell.expiresAt();
                oldest_id = cell_idx;
            }
            continue;
        }

        if (cell.expiresAt() < now)
        {
            return {cell_idx, false, true};
        }

        return {cell_idx, true, false};
    }

    return {oldest_id, false, false};
287 288
}

A
Alexey Milovidov 已提交
289
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
290
{
N
mvc  
Nikita Mikhaylov 已提交
291 292 293 294 295
    /// There are three types of ids.
    /// - Valid ids. These ids are presented in local cache and their lifetime is not expired.
    /// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired).
    /// - CacheNotFound ids. We have to go to external storage to know its value.

296
    /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
N
mvc  
Nikita Mikhaylov 已提交
297 298
    std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
    std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
299

300
    size_t cache_hit = 0;
N
mvc  
Nikita Mikhaylov 已提交
301

302 303 304 305 306 307 308 309 310 311 312
    const auto rows = ext::size(ids);
    {
        const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

        const auto now = std::chrono::system_clock::now();
        /// fetch up-to-date values, decide which ones require update
        for (const auto row : ext::range(0, rows))
        {
            const auto id = ids[row];
            const auto find_result = findCellIdx(id, now);
            const auto & cell_idx = find_result.cell_idx;
313 314 315 316 317 318 319

            auto insert_to_answer_routine = [&] ()
            {
                const auto & cell = cells[cell_idx];
                out[row] = !cell.isDefault();
            };

320 321 322
            if (!find_result.valid)
            {
                if (find_result.outdated)
N
mvc  
Nikita Mikhaylov 已提交
323 324 325
                {
                    cache_expired_ids[id].push_back(row);

326 327
                    if (allow_read_expired_keys)
                        insert_to_answer_routine();
N
mvc  
Nikita Mikhaylov 已提交
328
                }
329
                else
N
mvc  
Nikita Mikhaylov 已提交
330 331 332
                {
                    cache_not_found_ids[id].push_back(row);
                }
333 334 335 336
            }
            else
            {
                ++cache_hit;
337
                insert_to_answer_routine();
338 339 340 341
            }
        }
    }

342 343
    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
344 345 346
    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);

    query_count.fetch_add(rows, std::memory_order_relaxed);
347
    hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
348

349
    if (cache_not_found_ids.empty())
N
mvc  
Nikita Mikhaylov 已提交
350
    {
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372
        /// Nothing to update - return;
        if (cache_expired_ids.empty())
            return;

        if (allow_read_expired_keys)
        {
            std::vector<Key> required_expired_ids;
            required_expired_ids.reserve(cache_expired_ids.size());
            std::transform(
                    std::begin(cache_expired_ids), std::end(cache_expired_ids),
                    std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });

            /// Callbacks are empty because we don't want to receive them after an unknown period of time.
            auto update_unit_ptr = std::make_shared<UpdateUnit>(
                    required_expired_ids,
                    [&](const auto, const auto) {},
                    [&](const auto, const auto) {} );

            tryPushToUpdateQueueOrThrow(update_unit_ptr);
            /// Update is async - no need to wait.
            return;
        }
N
mvc  
Nikita Mikhaylov 已提交
373
    }
374

375 376
    /// At this point we have two situations.
    /// There may be both types of keys: cache_expired_ids and cache_not_found_ids.
N
mvc  
Nikita Mikhaylov 已提交
377 378
    /// We will update them all synchronously.

N
better  
Nikita Mikhaylov 已提交
379
    std::vector<Key> required_ids;
380 381 382 383 384 385 386
    required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
    std::transform(
            std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
    std::transform(
            std::begin(cache_expired_ids), std::end(cache_expired_ids),
            std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
N
mvc  
Nikita Mikhaylov 已提交
387

N
better  
Nikita Mikhaylov 已提交
388
    auto update_unit_ptr = std::make_shared<UpdateUnit>(
N
mvc  
Nikita Mikhaylov 已提交
389
            std::move(required_ids),
390 391
            [&](const Key id, const size_t)
            {
N
mvc  
Nikita Mikhaylov 已提交
392 393
                for (const auto row : cache_not_found_ids[id])
                    out[row] = true;
N
better  
Nikita Mikhaylov 已提交
394 395
                for (const auto row : cache_expired_ids[id])
                    out[row] = true;
N
mvc  
Nikita Mikhaylov 已提交
396
            },
397 398
            [&](const Key id, const size_t)
            {
N
mvc  
Nikita Mikhaylov 已提交
399 400
                for (const auto row : cache_not_found_ids[id])
                    out[row] = false;
N
better  
Nikita Mikhaylov 已提交
401 402
                for (const auto row : cache_expired_ids[id])
                    out[row] = true;
N
mvc  
Nikita Mikhaylov 已提交
403
            }
N
better  
Nikita Mikhaylov 已提交
404
    );
N
mvc  
Nikita Mikhaylov 已提交
405

406 407
    tryPushToUpdateQueueOrThrow(update_unit_ptr);
    waitForCurrentUpdateFinish(update_unit_ptr);
408 409 410 411 412
}


void CacheDictionary::createAttributes()
{
413 414
    const auto attributes_size = dict_struct.attributes.size();
    attributes.reserve(attributes_size);
415 416

    bytes_allocated += size * sizeof(CellMetadata);
417
    bytes_allocated += attributes_size * sizeof(attributes.front());
418 419 420 421 422 423 424 425

    for (const auto & attribute : dict_struct.attributes)
    {
        attribute_index_by_name.emplace(attribute.name, attributes.size());
        attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));

        if (attribute.hierarchical)
        {
P
proller 已提交
426
            hierarchical_attribute = &attributes.back();
427

K
kreuzerkrieg 已提交
428
            if (hierarchical_attribute->type != AttributeUnderlyingType::utUInt64)
429
                throw Exception{name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
430 431
        }
    }
432 433
}

A
Alexey Milovidov 已提交
434
CacheDictionary::Attribute CacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
    /// Builds an attribute with its null value and a per-cell value array of `size` elements,
    /// and accounts the array in bytes_allocated.
    Attribute result{type, {}, {}};

    switch (type)
    {
#define CREATE_NUMERIC_ATTRIBUTE(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
        { \
            result.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
            result.arrays = std::make_unique<ContainerType<TYPE>>(size); \
            bytes_allocated += size * sizeof(TYPE); \
            break; \
        }
        CREATE_NUMERIC_ATTRIBUTE(UInt8)
        CREATE_NUMERIC_ATTRIBUTE(UInt16)
        CREATE_NUMERIC_ATTRIBUTE(UInt32)
        CREATE_NUMERIC_ATTRIBUTE(UInt64)
        CREATE_NUMERIC_ATTRIBUTE(UInt128)
        CREATE_NUMERIC_ATTRIBUTE(Int8)
        CREATE_NUMERIC_ATTRIBUTE(Int16)
        CREATE_NUMERIC_ATTRIBUTE(Int32)
        CREATE_NUMERIC_ATTRIBUTE(Int64)
        CREATE_NUMERIC_ATTRIBUTE(Decimal32)
        CREATE_NUMERIC_ATTRIBUTE(Decimal64)
        CREATE_NUMERIC_ATTRIBUTE(Decimal128)
        CREATE_NUMERIC_ATTRIBUTE(Float32)
        CREATE_NUMERIC_ATTRIBUTE(Float64)
#undef CREATE_NUMERIC_ATTRIBUTE

        case AttributeUnderlyingType::utString:
        {
            result.null_values = null_value.get<String>();
            result.arrays = std::make_unique<ContainerType<StringRef>>(size);
            bytes_allocated += size * sizeof(StringRef);

            /// One arena is shared by all string attributes; create it lazily on the first one.
            if (!string_arena)
                string_arena = std::make_unique<ArenaWithFreeLists>();
            break;
        }
    }

    return result;
}

A
Alexey Milovidov 已提交
473
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
{
    /// Resets cell `idx` of `attribute` to the attribute's null value.
    switch (attribute.type)
    {
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = std::get<TYPE>(attribute.null_values); \
            break;
        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Float32)
        DISPATCH(Float64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
#undef DISPATCH

        case AttributeUnderlyingType::utString:
        {
            const auto & null_value_ref = std::get<String>(attribute.null_values);
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];

            if (string_ref.data != null_value_ref.data())
            {
                /// Any other non-null data is an arena block allocated by setAttributeValue with
                /// string_ref.size + 1 bytes (the value plus its terminating zero); free it with that same size.
                if (string_ref.data)
                    string_arena->free(const_cast<char *>(string_ref.data), string_ref.size + 1);

                /// Cells in the default state share the attribute's null_value storage instead of a copy.
                string_ref = StringRef{null_value_ref};
            }

            break;
        }
    }
}

A
Alexey Milovidov 已提交
539
void CacheDictionary::setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const
{
    /// Stores `value` into cell `idx` of `attribute`, converting from the Field's storage type.
    switch (attribute.type)
    {
        case AttributeUnderlyingType::utUInt8:
            std::get<ContainerPtrType<UInt8>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
        case AttributeUnderlyingType::utUInt16:
            std::get<ContainerPtrType<UInt16>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
        case AttributeUnderlyingType::utUInt32:
            std::get<ContainerPtrType<UInt32>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
        case AttributeUnderlyingType::utUInt64:
            std::get<ContainerPtrType<UInt64>>(attribute.arrays)[idx] = value.get<UInt64>();
            break;
        case AttributeUnderlyingType::utUInt128:
            std::get<ContainerPtrType<UInt128>>(attribute.arrays)[idx] = value.get<UInt128>();
            break;
        case AttributeUnderlyingType::utInt8:
            std::get<ContainerPtrType<Int8>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
        case AttributeUnderlyingType::utInt16:
            std::get<ContainerPtrType<Int16>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
        case AttributeUnderlyingType::utInt32:
            std::get<ContainerPtrType<Int32>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
        case AttributeUnderlyingType::utInt64:
            std::get<ContainerPtrType<Int64>>(attribute.arrays)[idx] = value.get<Int64>();
            break;
        case AttributeUnderlyingType::utFloat32:
            std::get<ContainerPtrType<Float32>>(attribute.arrays)[idx] = value.get<Float64>();
            break;
        case AttributeUnderlyingType::utFloat64:
            std::get<ContainerPtrType<Float64>>(attribute.arrays)[idx] = value.get<Float64>();
            break;

        case AttributeUnderlyingType::utDecimal32:
            std::get<ContainerPtrType<Decimal32>>(attribute.arrays)[idx] = value.get<Decimal32>();
            break;
        case AttributeUnderlyingType::utDecimal64:
            std::get<ContainerPtrType<Decimal64>>(attribute.arrays)[idx] = value.get<Decimal64>();
            break;
        case AttributeUnderlyingType::utDecimal128:
            std::get<ContainerPtrType<Decimal128>>(attribute.arrays)[idx] = value.get<Decimal128>();
            break;

        case AttributeUnderlyingType::utString:
        {
            const auto & string = value.get<String>();
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];
            const auto & null_value_ref = std::get<String>(attribute.null_values);

            /// free memory unless it points to a null_value.
            /// Arena blocks are allocated below with size + 1 bytes (value plus terminating zero),
            /// so they must be returned with that same size.
            if (string_ref.data && string_ref.data != null_value_ref.data())
                string_arena->free(const_cast<char *>(string_ref.data), string_ref.size + 1);

            const auto str_size = string.size();
            if (str_size != 0)
            {
                /// Copy the terminating zero of the std::string as well.
                auto string_ptr = string_arena->alloc(str_size + 1);
                std::copy(string.data(), string.data() + str_size + 1, string_ptr);
                string_ref = StringRef{string_ptr, str_size};
            }
            else
                string_ref = {};

            break;
        }
    }
}

A
Alexey Milovidov 已提交
612
CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & attribute_name) const
{
    /// Looks up an attribute by name; unknown names are reported as BAD_ARGUMENTS.
    if (const auto it = attribute_index_by_name.find(attribute_name); it != std::end(attribute_index_by_name))
        return attributes[it->second];

    throw Exception{name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
}

621 622
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
P
proller 已提交
623 624
    return (idx != zero_cell_idx && cells[idx].id == 0)
        || (cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
625 626 627 628
}

PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
{
    /// Snapshot of the ids currently held in non-empty, non-default cells, taken under the read lock.
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    PaddedPODArray<Key> cached_ids;
    for (size_t idx = 0, cells_count = cells.size(); idx < cells_count; ++idx)
    {
        const auto & cell = cells[idx];
        if (isEmptyCell(idx) || cell.isDefault())
            continue;

        cached_ids.push_back(cell.id);
    }
    return cached_ids;
}

643
BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    /// Streams only what is currently cached; ids that were never loaded are not included.
    using CachedIdsStream = DictionaryBlockInputStream<CacheDictionary, Key>;

    auto cached_ids = getCachedIds();
    return std::make_shared<CachedIdsStream>(shared_from_this(), max_block_size, std::move(cached_ids), column_names);
}

649 650 651 652 653 654
std::exception_ptr CacheDictionary::getLastException() const
{
    /// Copy last_exception while holding the dictionary's read lock.
    const ProfilingScopedReadRWLock guard{rw_lock, ProfileEvents::DictCacheLockReadNs};
    std::exception_ptr snapshot = last_exception;
    return snapshot;
}

655 656
void registerDictionaryCache(DictionaryFactory & factory)
{
    /// Builds a CacheDictionary from the `<layout><cache>...</cache></layout>` section, validating it first.
    auto create_cache_layout = [](const std::string & name,
                                  const DictionaryStructure & dict_struct,
                                  const Poco::Util::AbstractConfiguration & config,
                                  const std::string & config_prefix,
                                  DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
            throw Exception{"'key' is not supported for dictionary of layout 'cache'",
                            ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{name
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};

        /// All cache-specific settings live under <layout><cache>.
        const std::string cache_prefix = config_prefix + ".layout.cache";

        const size_t size_in_cells = config.getUInt64(cache_prefix + ".size_in_cells");
        if (size_in_cells == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have 0 cells",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        if (config.getBool(config_prefix + ".require_nonempty", false))
            throw Exception{name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
                            ErrorCodes::BAD_ARGUMENTS};

        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};

        const size_t max_update_queue_size = config.getUInt64(cache_prefix + ".max_update_queue_size", 100000);
        if (max_update_queue_size == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have empty update queue of size 0",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool allow_read_expired_keys = config.getBool(cache_prefix + ".allow_read_expired_keys", false);

        const size_t update_queue_push_timeout_milliseconds
            = config.getUInt64(cache_prefix + ".update_queue_push_timeout_milliseconds", 10);
        if (update_queue_push_timeout_milliseconds < 10)
            throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
                            ErrorCodes::BAD_ARGUMENTS};

        return std::make_unique<CacheDictionary>(
            name, dict_struct, std::move(source_ptr), dict_lifetime, size_in_cells,
            allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds);
    };

    factory.registerLayout("cache", create_cache_layout, false);
}

N
mvc  
Nikita Mikhaylov 已提交
708 709
void CacheDictionary::updateThreadFunction()
{
710
    setThreadName("AsyncUpdater");
N
Nikita Mikhaylov 已提交
711
    while (!finished)
N
mvc  
Nikita Mikhaylov 已提交
712
    {
N
Nikita Mikhaylov 已提交
713 714 715
        UpdateUnitPtr unit_ptr;
        update_queue.pop(unit_ptr);
        try
N
mvc  
Nikita Mikhaylov 已提交
716
        {
N
better  
Nikita Mikhaylov 已提交
717
            update(unit_ptr->requested_ids, unit_ptr->on_cell_updated, unit_ptr->on_id_not_found);
718
            std::unique_lock<std::mutex> lock(update_mutex);
N
better  
Nikita Mikhaylov 已提交
719
            unit_ptr->is_done = true;
720
            is_update_finished.notify_all();
N
mvc  
Nikita Mikhaylov 已提交
721
        }
N
Nikita Mikhaylov 已提交
722 723
        catch (...)
        {
724
            std::unique_lock<std::mutex> lock(update_mutex);
N
Nikita Mikhaylov 已提交
725
            unit_ptr->current_exception = std::current_exception();
726
            is_update_finished.notify_all();
N
Nikita Mikhaylov 已提交
727
        }
N
mvc  
Nikita Mikhaylov 已提交
728 729 730
    }
}

731 732 733 734 735 736 737 738 739 740 741
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
{
    /// Blocks until the update thread marks this unit done or stores an exception in it;
    /// a stored exception is rethrown in the caller's thread.
    std::unique_lock<std::mutex> lock(update_mutex);

    const auto unit_finished = [&update_unit_ptr]
    {
        return update_unit_ptr->is_done || update_unit_ptr->current_exception;
    };
    is_update_finished.wait(lock, unit_finished);

    if (auto failure = update_unit_ptr->current_exception)
        std::rethrow_exception(failure);
}

void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const
{
    /// Waits at most update_queue_push_timeout_milliseconds for room in the queue.
    const bool pushed = update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds);
    if (pushed)
        return;

    throw DB::Exception("Cannot push to internal update queue. Current queue size is " +
        std::to_string(update_queue.size()), ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
}

748
}