CacheDictionary.cpp 25.9 KB
Newer Older
1 2
#include "CacheDictionary.h"

3
#include <functional>
4 5
#include <memory>
#include <Columns/ColumnString.h>
6
#include <Common/BitHelpers.h>
P
proller 已提交
7
#include <Common/CurrentMetrics.h>
8
#include <Common/HashTable/Hash.h>
9
#include <Common/ProfileEvents.h>
P
proller 已提交
10 11
#include <Common/ProfilingScopedRWLock.h>
#include <Common/randomSeed.h>
12
#include <Common/typeid_cast.h>
P
proller 已提交
13 14
#include <ext/range.h>
#include <ext/size.h>
15
#include "CacheDictionary.inc.h"
P
proller 已提交
16 17
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
18

19 20
namespace ProfileEvents
{
P
proller 已提交
21 22 23 24 25 26 27 28 29 30
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
31 32 33 34
}

namespace CurrentMetrics
{
P
proller 已提交
35
extern const Metric DictCacheRequests;
36 37 38
}


39 40 41 42
namespace DB
{
namespace ErrorCodes
{
43 44 45
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
A
Alexey Milovidov 已提交
46
    extern const int LOGICAL_ERROR;
47
    extern const int TOO_SMALL_BUFFER_SIZE;
48 49 50
}


P
proller 已提交
51
inline size_t CacheDictionary::getCellIdx(const Key id) const
52
{
53 54 55
    const auto hash = intHash64(id);
    const auto idx = hash & size_overlap_mask;
    return idx;
56 57 58
}


P
proller 已提交
59
/// Builds a cache dictionary on top of `source_ptr_`.
///
/// The requested number of cells is raised to at least `max_collision_length` and then rounded up
/// to a power of two, so that `size - 1` can be used as an index mask.
/// Throws UNSUPPORTED_METHOD if the source does not support selective load.
/// The background update thread is started last, once the attributes have been created.
CacheDictionary::CacheDictionary(
    const std::string & name_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    const DictionaryLifetime dict_lifetime_,
    const size_t size_)
    : name{name_}
    , dict_struct(dict_struct_)
    , source_ptr{std::move(source_ptr_)}
    , dict_lifetime(dict_lifetime_)
    , log(&Logger::get("ExternalDictionaries"))
    , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
    , size_overlap_mask{this->size - 1}
    , cells{this->size}
    , rnd_engine(randomSeed())
{
    const bool selective_load_supported = this->source_ptr->supportsSelectiveLoad();
    if (!selective_load_supported)
        throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();

    /// The worker captures `this`; the destructor joins it.
    update_thread = ThreadFromGlobalPool([this] { updateThreadFunction(); });
}

/// Shuts the background updater down: raises `finished`, discards queued update units and waits
/// for the update thread to exit.
/// NOTE(review): updateThreadFunction() waits in update_queue.pop(); confirm that something wakes
/// it after clear(), otherwise join() may never return.
CacheDictionary::~CacheDictionary()
{
    finished = true;

    update_queue.clear();

    update_thread.join();
}


A
Alexey Milovidov 已提交
90
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
91
{
92
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
93

94
    getItemsNumberImpl<UInt64, UInt64>(*hierarchical_attribute, ids, out, [&](const size_t) { return null_value; });
95 96 97
}


98
/// Allows a single value to be used in the same way as an array.
P
proller 已提交
99 100 101 102 103 104 105 106
/// Array flavour: the element at position `pos`.
static inline CacheDictionary::Key getAt(const PaddedPODArray<CacheDictionary::Key> & values, const size_t pos)
{
    return values[pos];
}

/// Scalar flavour: the position is ignored, the same key is returned for every row.
static inline CacheDictionary::Key getAt(const CacheDictionary::Key & single_value, const size_t /*pos*/)
{
    return single_value;
}
107 108 109


template <typename AncestorType>
P
proller 已提交
110
void CacheDictionary::isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
111
{
112 113
    /// Transform all children to parents until ancestor id or null_value will be reached.

114
    size_t out_size = out.size();
P
proller 已提交
115
    memset(out.data(), 0xFF, out_size); /// 0xFF means "not calculated"
116 117 118

    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);

119
    PaddedPODArray<Key> children(out_size, 0);
120 121 122 123 124 125 126 127
    PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());

    while (true)
    {
        size_t out_idx = 0;
        size_t parents_idx = 0;
        size_t new_children_idx = 0;

128
        while (out_idx < out_size)
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
        {
            /// Already calculated
            if (out[out_idx] != 0xFF)
            {
                ++out_idx;
                continue;
            }

            /// No parent
            if (parents[parents_idx] == null_value)
            {
                out[out_idx] = 0;
            }
            /// Found ancestor
            else if (parents[parents_idx] == getAt(ancestor_ids, parents_idx))
            {
                out[out_idx] = 1;
            }
A
alexey-milovidov 已提交
147
            /// Loop detected
148 149
            else if (children[new_children_idx] == parents[parents_idx])
            {
P
proller 已提交
150
                out[out_idx] = 1;
151
            }
A
alexey-milovidov 已提交
152
            /// Found intermediate parent, add this value to search at next loop iteration
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
            else
            {
                children[new_children_idx] = parents[parents_idx];
                ++new_children_idx;
            }

            ++out_idx;
            ++parents_idx;
        }

        if (new_children_idx == 0)
            break;

        /// Transform all children to its parents.
        children.resize(new_children_idx);
        parents.resize(new_children_idx);

        toParent(children, parents);
    }
172 173 174
}

void CacheDictionary::isInVectorVector(
P
proller 已提交
175
    const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
176
{
177
    isInImpl(child_ids, ancestor_ids, out);
178
}
179

P
proller 已提交
180
void CacheDictionary::isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const
181
{
182
    isInImpl(child_ids, ancestor_id, out);
183
}
184

P
proller 已提交
185
void CacheDictionary::isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
186
{
187
    /// Special case with single child value.
188

189
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
190

191 192 193
    PaddedPODArray<Key> child(1, child_id);
    PaddedPODArray<Key> parent(1);
    std::vector<Key> ancestors(1, child_id);
194

195 196 197 198
    /// Iteratively find all ancestors for child.
    while (true)
    {
        toParent(child, parent);
199

200 201
        if (parent[0] == null_value)
            break;
202

203 204 205
        child[0] = parent[0];
        ancestors.push_back(parent[0]);
    }
206

207
    /// Assuming short hierarchy, so linear search is Ok.
208
    for (size_t i = 0, out_size = out.size(); i < out_size; ++i)
209
        out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
210
}
211

A
Alexey Milovidov 已提交
212
void CacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
213
{
214
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
215
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
216

217
    const auto null_value = StringRef{std::get<String>(attribute.null_values)};
218

P
proller 已提交
219
    getItemsString(attribute, ids, out, [&](const size_t) { return null_value; });
220 221 222
}

void CacheDictionary::getString(
P
proller 已提交
223
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out) const
224
{
225
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
226
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
227

P
proller 已提交
228
    getItemsString(attribute, ids, out, [&](const size_t row) { return def->getDataAt(row); });
229 230 231
}

void CacheDictionary::getString(
P
proller 已提交
232
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const
233
{
234
    auto & attribute = getAttribute(attribute_name);
K
kreuzerkrieg 已提交
235
    checkAttributeType(name, attribute_name, attribute.type, AttributeUnderlyingType::utString);
236

P
proller 已提交
237
    getItemsString(attribute, ids, out, [&](const size_t) { return StringRef{def}; });
238 239 240
}


241
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag
242 243 244 245 246 247 248 249
/// true  false   found and valid
/// false true    not found (something outdated, maybe our cell)
/// false false   not found (other id stored with valid data)
/// true  true    impossible
///
/// todo: split this func to two: find_for_get and find_for_set
CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const CellMetadata::time_point_t now) const
{
    /// Probe at most max_collision_length consecutive slots, starting at the key's home slot.
    const auto first_pos = getCellIdx(id);
    const auto end_pos = first_pos + max_collision_length;

    /// Replacement candidate among the slots occupied by other keys.
    auto candidate_idx = first_pos;
    auto candidate_expires_at = CellMetadata::time_point_t::max();

    for (auto probe = first_pos; probe < end_pos; ++probe)
    {
        const auto cell_idx = probe & size_overlap_mask;
        const auto & cell = cells[cell_idx];

        if (cell.id == id)
        {
            /// The key is cached: report whether its cell is still fresh or already expired.
            const bool expired = cell.expiresAt() < now;
            return {cell_idx, !expired, expired};
        }

        /// maybe we already found nearest expired cell (try minimize collision_length on insert)
        if (candidate_expires_at > now && candidate_expires_at > cell.expiresAt())
        {
            candidate_expires_at = cell.expiresAt();
            candidate_idx = cell_idx;
        }
    }

    /// The key is not cached; hand back the slot chosen for replacement.
    return {candidate_idx, false, false};
}

A
Alexey Milovidov 已提交
281
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
282
{
N
mvc  
Nikita Mikhaylov 已提交
283 284 285 286 287
    /// There are three types of ids.
    /// - Valid ids. These ids are presented in local cache and their lifetime is not expired.
    /// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired).
    /// - CacheNotFound ids. We have to go to external storage to know its value.

288
    /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
N
mvc  
Nikita Mikhaylov 已提交
289 290
    std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
    std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
291 292 293

    size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;

N
more  
Nikita Mikhaylov 已提交
294
    const bool allow_read_expired_keys_from_cache_dictionary = getAllowReadExpiredKeysSetting();
N
mvc  
Nikita Mikhaylov 已提交
295

296 297 298 299 300 301 302 303 304 305 306 307 308 309
    const auto rows = ext::size(ids);
    {
        const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

        const auto now = std::chrono::system_clock::now();
        /// fetch up-to-date values, decide which ones require update
        for (const auto row : ext::range(0, rows))
        {
            const auto id = ids[row];
            const auto find_result = findCellIdx(id, now);
            const auto & cell_idx = find_result.cell_idx;
            if (!find_result.valid)
            {
                if (find_result.outdated)
N
mvc  
Nikita Mikhaylov 已提交
310 311
                {
                    cache_expired_ids[id].push_back(row);
312
                    ++cache_expired;
N
mvc  
Nikita Mikhaylov 已提交
313 314 315 316 317 318 319

                    if (allow_read_expired_keys_from_cache_dictionary)
                    {
                        const auto & cell = cells[cell_idx];
                        out[row] = !cell.isDefault();
                    }
                }
320
                else
N
mvc  
Nikita Mikhaylov 已提交
321 322
                {
                    cache_not_found_ids[id].push_back(row);
323
                    ++cache_not_found;
N
mvc  
Nikita Mikhaylov 已提交
324
                }
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
            }
            else
            {
                ++cache_hit;
                const auto & cell = cells[cell_idx];
                out[row] = !cell.isDefault();
            }
        }
    }

    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);

    query_count.fetch_add(rows, std::memory_order_relaxed);
N
mvc  
Nikita Mikhaylov 已提交
340 341
    const size_t outdated_ids_count = cache_expired + cache_not_found;
    hit_count.fetch_add(rows - outdated_ids_count, std::memory_order_release);
342

N
mvc  
Nikita Mikhaylov 已提交
343 344
    /// We have no keys to update.
    if (outdated_ids_count == 0)
345 346
        return;

N
mvc  
Nikita Mikhaylov 已提交
347 348 349 350 351 352 353
    /// Schedule an update job for expired keys. (At this point we have only expired keys.)
    /// No need to wait for expired keys being updated.
    if (allow_read_expired_keys_from_cache_dictionary && cache_not_found == 0)
    {
        std::vector<Key> required_expired_ids(cache_expired_ids.size());
        std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; });
        /// Callbacks are empty because we don't want to receive them after an unknown period of time.
N
better  
Nikita Mikhaylov 已提交
354 355
        auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids, [&](const auto, const auto){}, [&](const auto, const auto){} );
        if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
N
mvc  
Nikita Mikhaylov 已提交
356 357 358
            throw std::runtime_error("Can't schedule an update job.");
        return;
    }
359

N
mvc  
Nikita Mikhaylov 已提交
360 361 362
    /// At this point we have two situations. There may be both types of keys: expired and not found.
    /// We will update them all synchronously.

N
better  
Nikita Mikhaylov 已提交
363 364 365 366
    std::vector<Key> required_ids;
    required_ids.reserve(outdated_ids_count);
    std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
    std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
N
mvc  
Nikita Mikhaylov 已提交
367

N
better  
Nikita Mikhaylov 已提交
368
    auto update_unit_ptr = std::make_shared<UpdateUnit>(
N
mvc  
Nikita Mikhaylov 已提交
369 370 371 372
            std::move(required_ids),
            [&](const Key id, const size_t) {
                for (const auto row : cache_not_found_ids[id])
                    out[row] = true;
N
better  
Nikita Mikhaylov 已提交
373 374
                for (const auto row : cache_expired_ids[id])
                    out[row] = true;
N
mvc  
Nikita Mikhaylov 已提交
375 376 377 378
            },
            [&](const Key id, const size_t) {
                for (const auto row : cache_not_found_ids[id])
                    out[row] = false;
N
better  
Nikita Mikhaylov 已提交
379 380
                for (const auto row : cache_expired_ids[id])
                    out[row] = true;
N
mvc  
Nikita Mikhaylov 已提交
381
            }
N
better  
Nikita Mikhaylov 已提交
382
    );
N
mvc  
Nikita Mikhaylov 已提交
383

N
better  
Nikita Mikhaylov 已提交
384
    if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
N
mvc  
Nikita Mikhaylov 已提交
385 386
        throw std::runtime_error("Too many updates");

N
better  
Nikita Mikhaylov 已提交
387
//    waitForCurrentUpdateFinish();
N
Nikita Mikhaylov 已提交
388
    while (!update_unit_ptr->is_done && !update_unit_ptr->current_exception) {
N
better  
Nikita Mikhaylov 已提交
389 390 391
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::this_thread::yield();
    }
N
Nikita Mikhaylov 已提交
392 393 394

    if (update_unit_ptr->current_exception)
        std::rethrow_exception(update_unit_ptr->current_exception);
395 396 397 398 399
}


void CacheDictionary::createAttributes()
{
400 401
    const auto attributes_size = dict_struct.attributes.size();
    attributes.reserve(attributes_size);
402 403

    bytes_allocated += size * sizeof(CellMetadata);
404
    bytes_allocated += attributes_size * sizeof(attributes.front());
405 406 407 408 409 410 411 412

    for (const auto & attribute : dict_struct.attributes)
    {
        attribute_index_by_name.emplace(attribute.name, attributes.size());
        attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));

        if (attribute.hierarchical)
        {
P
proller 已提交
413
            hierarchical_attribute = &attributes.back();
414

K
kreuzerkrieg 已提交
415
            if (hierarchical_attribute->type != AttributeUnderlyingType::utUInt64)
416
                throw Exception{name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
417 418
        }
    }
419 420
}

A
Alexey Milovidov 已提交
421
/// Builds the storage for one attribute: remembers its null value, allocates a value array with
/// `size` elements and adds the array to bytes_allocated.
/// For String attributes the shared string arena is created on first use.
CacheDictionary::Attribute CacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
    Attribute result{type, {}, {}};

    switch (type)
    {
/// All non-string types are handled the same way, only the element type differs.
#define MAKE_FIXED_SIZE_ATTRIBUTE(TYPE) \
    case AttributeUnderlyingType::ut##TYPE: \
        result.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
        result.arrays = std::make_unique<ContainerType<TYPE>>(size); \
        bytes_allocated += size * sizeof(TYPE); \
        break;
        MAKE_FIXED_SIZE_ATTRIBUTE(UInt8)
        MAKE_FIXED_SIZE_ATTRIBUTE(UInt16)
        MAKE_FIXED_SIZE_ATTRIBUTE(UInt32)
        MAKE_FIXED_SIZE_ATTRIBUTE(UInt64)
        MAKE_FIXED_SIZE_ATTRIBUTE(UInt128)
        MAKE_FIXED_SIZE_ATTRIBUTE(Int8)
        MAKE_FIXED_SIZE_ATTRIBUTE(Int16)
        MAKE_FIXED_SIZE_ATTRIBUTE(Int32)
        MAKE_FIXED_SIZE_ATTRIBUTE(Int64)
        MAKE_FIXED_SIZE_ATTRIBUTE(Decimal32)
        MAKE_FIXED_SIZE_ATTRIBUTE(Decimal64)
        MAKE_FIXED_SIZE_ATTRIBUTE(Decimal128)
        MAKE_FIXED_SIZE_ATTRIBUTE(Float32)
        MAKE_FIXED_SIZE_ATTRIBUTE(Float64)
#undef MAKE_FIXED_SIZE_ATTRIBUTE
        case AttributeUnderlyingType::utString:
            result.null_values = null_value.get<String>();
            result.arrays = std::make_unique<ContainerType<StringRef>>(size);
            bytes_allocated += size * sizeof(StringRef);
            if (!string_arena)
                string_arena = std::make_unique<ArenaWithFreeLists>();
            break;
    }

    return result;
}

A
Alexey Milovidov 已提交
460
/// Resets the value stored for cell `idx` of `attribute` to the attribute's null value.
/// For String attributes the previously stored string is returned to the arena first, unless the
/// cell already points at the null value.
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
{
    switch (attribute.type)
    {
/// Every fixed-size type just copies the null value into the cell.
#define RESET_TO_NULL_VALUE(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = std::get<TYPE>(attribute.null_values); \
            break;
        RESET_TO_NULL_VALUE(UInt8)
        RESET_TO_NULL_VALUE(UInt16)
        RESET_TO_NULL_VALUE(UInt32)
        RESET_TO_NULL_VALUE(UInt64)
        RESET_TO_NULL_VALUE(UInt128)
        RESET_TO_NULL_VALUE(Int8)
        RESET_TO_NULL_VALUE(Int16)
        RESET_TO_NULL_VALUE(Int32)
        RESET_TO_NULL_VALUE(Int64)
        RESET_TO_NULL_VALUE(Float32)
        RESET_TO_NULL_VALUE(Float64)
        RESET_TO_NULL_VALUE(Decimal32)
        RESET_TO_NULL_VALUE(Decimal64)
        RESET_TO_NULL_VALUE(Decimal128)
#undef RESET_TO_NULL_VALUE

        case AttributeUnderlyingType::utString:
        {
            const auto & null_string = std::get<String>(attribute.null_values);
            auto & stored = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];

            /// Already the null value: nothing to release, nothing to change.
            if (stored.data == null_string.data())
                break;

            if (stored.data)
                string_arena->free(const_cast<char *>(stored.data), stored.size);

            stored = StringRef{null_string};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
526
/// Stores `value` into cell `idx` of `attribute`.
/// Numeric values are read from the Field using the listed Field type and assigned to the
/// attribute's element type. Strings are copied into the string arena together with the
/// terminating zero byte; an empty string is stored as an empty StringRef.
void CacheDictionary::setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const
{
    switch (attribute.type)
    {
/// STORED is the element type of the attribute array, IN_FIELD is the type read from the Field.
#define ASSIGN_FROM_FIELD(STORED, IN_FIELD) \
        case AttributeUnderlyingType::ut##STORED: \
            std::get<ContainerPtrType<STORED>>(attribute.arrays)[idx] = value.get<IN_FIELD>(); \
            break;
        ASSIGN_FROM_FIELD(UInt8, UInt64)
        ASSIGN_FROM_FIELD(UInt16, UInt64)
        ASSIGN_FROM_FIELD(UInt32, UInt64)
        ASSIGN_FROM_FIELD(UInt64, UInt64)
        ASSIGN_FROM_FIELD(UInt128, UInt128)
        ASSIGN_FROM_FIELD(Int8, Int64)
        ASSIGN_FROM_FIELD(Int16, Int64)
        ASSIGN_FROM_FIELD(Int32, Int64)
        ASSIGN_FROM_FIELD(Int64, Int64)
        ASSIGN_FROM_FIELD(Float32, Float64)
        ASSIGN_FROM_FIELD(Float64, Float64)
        ASSIGN_FROM_FIELD(Decimal32, Decimal32)
        ASSIGN_FROM_FIELD(Decimal64, Decimal64)
        ASSIGN_FROM_FIELD(Decimal128, Decimal128)
#undef ASSIGN_FROM_FIELD

        case AttributeUnderlyingType::utString:
        {
            const auto & new_string = value.get<String>();
            auto & stored = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];
            const auto & null_string = std::get<String>(attribute.null_values);

            /// free memory unless it points to a null_value
            /// NOTE(review): strings are allocated with size + 1 below but released with `size` here - confirm the arena tolerates this.
            const bool owns_arena_memory = stored.data && stored.data != null_string.data();
            if (owns_arena_memory)
                string_arena->free(const_cast<char *>(stored.data), stored.size);

            const auto new_size = new_string.size();
            if (new_size == 0)
            {
                stored = {};
                break;
            }

            /// One extra byte for the terminating zero, which is copied along with the characters.
            auto arena_copy = string_arena->alloc(new_size + 1);
            std::copy(new_string.data(), new_string.data() + new_size + 1, arena_copy);
            stored = StringRef{arena_copy, new_size};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
599
/// Looks an attribute up by name. Throws BAD_ARGUMENTS if the dictionary has no such attribute.
CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & attribute_name) const
{
    const auto found = attribute_index_by_name.find(attribute_name);

    if (found != std::end(attribute_index_by_name))
        return attributes[found->second];

    throw Exception{name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
}

608 609
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
P
proller 已提交
610 611
    return (idx != zero_cell_idx && cells[idx].id == 0)
        || (cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
612 613 614 615
}

/// Collects, under the read lock, the ids of all cells that are neither empty nor default.
PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    PaddedPODArray<Key> cached_ids;
    const size_t cells_count = cells.size();

    for (size_t cell_idx = 0; cell_idx < cells_count; ++cell_idx)
    {
        if (isEmptyCell(cell_idx))
            continue;

        auto & cell = cells[cell_idx];
        if (cell.isDefault())
            continue;

        cached_ids.push_back(cell.id);
    }

    return cached_ids;
}

630
/// Creates a DictionaryBlockInputStream over the ids currently held in the cache (see getCachedIds).
BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    return std::make_shared<DictionaryBlockInputStream<CacheDictionary, Key>>(
        shared_from_this(), max_block_size, getCachedIds(), column_names);
}

636 637 638 639 640 641
/// Returns a copy of `last_exception`, read under the read lock.
std::exception_ptr CacheDictionary::getLastException() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    return last_exception;
}

642 643
/// Registers the factory for dictionaries of layout 'cache'.
///
/// The factory rejects configurations that use a composite 'key', define range_min/range_max,
/// set 'require_nonempty', or request a non-positive number of cells.
void registerDictionaryCache(DictionaryFactory & factory)
{
    auto create_layout = [](const std::string & name,
                            const DictionaryStructure & dict_struct,
                            const Poco::Util::AbstractConfiguration & config,
                            const std::string & config_prefix,
                            DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (dict_struct.key)
            throw Exception{"'key' is not supported for dictionary of layout 'cache'", ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{name
                                + ": elements .structure.range_min and .structure.range_max should be defined only "
                                  "for a dictionary of layout 'range_hashed'",
                            ErrorCodes::BAD_ARGUMENTS};
        const auto layout_prefix = config_prefix + ".layout";
        const auto size = config.getInt(layout_prefix + ".cache.size_in_cells");
        if (size == 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have 0 cells", ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        /// getInt returns a signed value. Without this check a negative size would be converted
        /// to a huge unsigned number when passed to the constructor.
        if (size < 0)
            throw Exception{name + ": dictionary of layout 'cache' cannot have a negative number of cells",
                            ErrorCodes::TOO_SMALL_BUFFER_SIZE};

        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
            throw Exception{name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
                            ErrorCodes::BAD_ARGUMENTS};

        const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
        return std::make_unique<CacheDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, static_cast<size_t>(size));
    };
    factory.registerLayout("cache", create_layout, false);
}

N
mvc  
Nikita Mikhaylov 已提交
674 675
void CacheDictionary::updateThreadFunction()
{
N
Nikita Mikhaylov 已提交
676
    while (!finished)
N
mvc  
Nikita Mikhaylov 已提交
677
    {
N
Nikita Mikhaylov 已提交
678 679 680
        UpdateUnitPtr unit_ptr;
        update_queue.pop(unit_ptr);
        try
N
mvc  
Nikita Mikhaylov 已提交
681
        {
N
better  
Nikita Mikhaylov 已提交
682 683
            update(unit_ptr->requested_ids, unit_ptr->on_cell_updated, unit_ptr->on_id_not_found);
            unit_ptr->is_done = true;
N
mvc  
Nikita Mikhaylov 已提交
684 685
            last_update.fetch_add(1);
        }
N
Nikita Mikhaylov 已提交
686 687 688 689
        catch (...)
        {
            unit_ptr->current_exception = std::current_exception();
        }
N
mvc  
Nikita Mikhaylov 已提交
690 691 692 693 694 695 696 697 698 699
    }
}

/// Takes the next value of update_number as a ticket and spins, yielding the CPU, until
/// last_update is equal to that ticket.
void CacheDictionary::waitForCurrentUpdateFinish() const
{
    const size_t ticket = update_number.fetch_add(1);

    while (last_update.load() != ticket)
        std::this_thread::yield();
}

700
}