CacheDictionary.cpp 22.1 KB
Newer Older
1 2
#include "CacheDictionary.h"

3
#include <functional>
4 5
#include <sstream>
#include <memory>
6
#include <Columns/ColumnsNumber.h>
7
#include <Columns/ColumnString.h>
8 9 10 11 12
#include <Common/BitHelpers.h>
#include <Common/randomSeed.h>
#include <Common/HashTable/Hash.h>
#include <Common/Stopwatch.h>
#include <Common/ProfilingScopedRWLock.h>
13 14
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
15
#include <Common/typeid_cast.h>
16
#include "DictionaryBlockInputStream.h"
17 18 19
#include <ext/size.h>
#include <ext/range.h>
#include <ext/map.h>
20 21
#include "DictionaryFactory.h"
#include "CacheDictionary.inc.h"
22

23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
/// Profile event counters for the cache dictionary. They are only declared here; the definitions live elsewhere.
namespace ProfileEvents
{
    /// Incremented by CacheDictionary::has() in this file.
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;

    /// Passed to ProfilingScopedReadRWLock in this file.
    extern const Event DictCacheLockReadNs;

    /// Not referenced in the visible part of this file (presumably used by the included CacheDictionary.inc.h).
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;
    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheRequests;
    extern const Event DictCacheLockWriteNs;
}

/// Metric declared here and defined elsewhere. Nothing in the visible part of this file references it
/// directly (presumably it is used by the included CacheDictionary.inc.h).
namespace CurrentMetrics
{
    extern const Metric DictCacheRequests;
}


43 44 45 46 47
namespace DB
{

/// Error codes referenced by this translation unit; the values are defined elsewhere.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int TOO_SMALL_BUFFER_SIZE;
    extern const int TYPE_MISMATCH;
    extern const int UNSUPPORTED_METHOD;
}


P
proller 已提交
56
inline size_t CacheDictionary::getCellIdx(const Key id) const
57
{
58 59 60
    const auto hash = intHash64(id);
    const auto idx = hash & size_overlap_mask;
    return idx;
61 62 63 64
}


/** Creates an empty cache.
  * The requested `size` is raised to at least max_collision_length and then passed through
  * roundUpToPowerOfTwoOrZero, so that `size - 1` can be used as the index mask (size_overlap_mask).
  * Throws UNSUPPORTED_METHOD if the source does not support selective load.
  */
CacheDictionary::CacheDictionary(const std::string & name, const DictionaryStructure & dict_struct,
    DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime,
    const size_t size)
    : name{name}
    , dict_struct(dict_struct)
    , source_ptr{std::move(source_ptr)}
    , dict_lifetime(dict_lifetime)
    , size{roundUpToPowerOfTwoOrZero(std::max(size, size_t(max_collision_length)))}
    , size_overlap_mask{this->size - 1}
    , cells{this->size}
    , rnd_engine(randomSeed())
{
    /// The parameter `source_ptr` has been moved from, so the member must be used here.
    const bool selective_load_supported = this->source_ptr->supportsSelectiveLoad();
    if (!selective_load_supported)
        throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();
}

CacheDictionary::CacheDictionary(const CacheDictionary & other)
81
    : CacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
82 83 84
{}


A
Alexey Milovidov 已提交
85
void CacheDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
86
{
87
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
88

89
    getItemsNumber<UInt64>(*hierarchical_attribute, ids, out, [&] (const size_t) { return null_value; });
90 91 92
}


93 94
/// Lets isInImpl treat a single constant key the same way as an array of keys:
/// the array overload returns the element at `idx`, the scalar overload ignores the index.
static inline CacheDictionary::Key getAt(const PaddedPODArray<CacheDictionary::Key> & arr, const size_t idx)
{
    return arr[idx];
}

static inline CacheDictionary::Key getAt(const CacheDictionary::Key & value, const size_t)
{
    return value;
}
96 97 98 99


template <typename AncestorType>
void CacheDictionary::isInImpl(
100 101 102
    const PaddedPODArray<Key> & child_ids,
    const AncestorType & ancestor_ids,
    PaddedPODArray<UInt8> & out) const
103
{
104 105
    /// Transform all children to parents until ancestor id or null_value will be reached.

106 107
    size_t out_size = out.size();
    memset(out.data(), 0xFF, out_size);        /// 0xFF means "not calculated"
108 109 110

    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);

111
    PaddedPODArray<Key> children(out_size);
112 113 114 115 116 117 118 119
    PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());

    while (true)
    {
        size_t out_idx = 0;
        size_t parents_idx = 0;
        size_t new_children_idx = 0;

120
        while (out_idx < out_size)
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
        {
            /// Already calculated
            if (out[out_idx] != 0xFF)
            {
                ++out_idx;
                continue;
            }

            /// No parent
            if (parents[parents_idx] == null_value)
            {
                out[out_idx] = 0;
            }
            /// Found ancestor
            else if (parents[parents_idx] == getAt(ancestor_ids, parents_idx))
            {
                out[out_idx] = 1;
            }
A
alexey-milovidov 已提交
139
            /// Loop detected
140 141
            else if (children[new_children_idx] == parents[parents_idx])
            {
P
proller 已提交
142
                out[out_idx] = 1;
143
            }
A
alexey-milovidov 已提交
144
            /// Found intermediate parent, add this value to search at next loop iteration
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
            else
            {
                children[new_children_idx] = parents[parents_idx];
                ++new_children_idx;
            }

            ++out_idx;
            ++parents_idx;
        }

        if (new_children_idx == 0)
            break;

        /// Transform all children to its parents.
        children.resize(new_children_idx);
        parents.resize(new_children_idx);

        toParent(children, parents);
    }
164 165 166
}

void CacheDictionary::isInVectorVector(
167 168 169
    const PaddedPODArray<Key> & child_ids,
    const PaddedPODArray<Key> & ancestor_ids,
    PaddedPODArray<UInt8> & out) const
170
{
171
    isInImpl(child_ids, ancestor_ids, out);
172
}
173

174
void CacheDictionary::isInVectorConstant(
175 176 177
    const PaddedPODArray<Key> & child_ids,
    const Key ancestor_id,
    PaddedPODArray<UInt8> & out) const
178
{
179
    isInImpl(child_ids, ancestor_id, out);
180
}
181

182
void CacheDictionary::isInConstantVector(
183 184 185
    const Key child_id,
    const PaddedPODArray<Key> & ancestor_ids,
    PaddedPODArray<UInt8> & out) const
186
{
187
    /// Special case with single child value.
188

189
    const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
190

191 192 193
    PaddedPODArray<Key> child(1, child_id);
    PaddedPODArray<Key> parent(1);
    std::vector<Key> ancestors(1, child_id);
194

195 196 197 198
    /// Iteratively find all ancestors for child.
    while (true)
    {
        toParent(child, parent);
199

200 201
        if (parent[0] == null_value)
            break;
202

203 204 205
        child[0] = parent[0];
        ancestors.push_back(parent[0]);
    }
206

207
    /// Assuming short hierarchy, so linear search is Ok.
208
    for (size_t i = 0, out_size = out.size(); i < out_size; ++i)
209
        out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
210
}
211

A
Alexey Milovidov 已提交
212
void CacheDictionary::getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const
213
{
214 215
    auto & attribute = getAttribute(attribute_name);
    if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::String))
216
        throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};
217

218
    const auto null_value = StringRef{std::get<String>(attribute.null_values)};
219

220
    getItemsString(attribute, ids, out, [&] (const size_t) { return null_value; });
221 222 223
}

void CacheDictionary::getString(
224 225
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def,
    ColumnString * const out) const
226
{
227 228
    auto & attribute = getAttribute(attribute_name);
    if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::String))
229
        throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};
230

231
    getItemsString(attribute, ids, out, [&] (const size_t row) { return def->getDataAt(row); });
232 233 234
}

void CacheDictionary::getString(
235 236
    const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def,
    ColumnString * const out) const
237
{
238 239
    auto & attribute = getAttribute(attribute_name);
    if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::String))
240
        throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};
241

242
    getItemsString(attribute, ids, out, [&] (const size_t) { return StringRef{def}; });
243 244 245
}


246
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag
247 248 249 250 251 252 253 254
/// true  false   found and valid
/// false true    not found (something outdated, maybe our cell)
/// false false   not found (other id stored with valid data)
/// true  true    impossible
///
/// todo: split this func to two: find_for_get and find_for_set
CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const CellMetadata::time_point_t now) const
{
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
    auto pos = getCellIdx(id);
    auto oldest_id = pos;
    auto oldest_time = CellMetadata::time_point_t::max();
    const auto stop = pos + max_collision_length;
    for (; pos < stop; ++pos)
    {
        const auto cell_idx = pos & size_overlap_mask;
        const auto & cell = cells[cell_idx];

        if (cell.id != id)
        {
            /// maybe we already found nearest expired cell (try minimize collision_length on insert)
            if (oldest_time > now && oldest_time > cell.expiresAt())
            {
                oldest_time = cell.expiresAt();
                oldest_id = cell_idx;
            }
            continue;
        }

        if (cell.expiresAt() < now)
        {
            return {cell_idx, false, true};
        }

        return {cell_idx, true, false};
    }

    return {oldest_id, false, false};
284 285
}

A
Alexey Milovidov 已提交
286
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
287
{
288
    /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
289
    std::unordered_map<Key, std::vector<size_t>> outdated_ids;
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335

    size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;

    const auto rows = ext::size(ids);
    {
        const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

        const auto now = std::chrono::system_clock::now();
        /// fetch up-to-date values, decide which ones require update
        for (const auto row : ext::range(0, rows))
        {
            const auto id = ids[row];
            const auto find_result = findCellIdx(id, now);
            const auto & cell_idx = find_result.cell_idx;
            if (!find_result.valid)
            {
                outdated_ids[id].push_back(row);
                if (find_result.outdated)
                    ++cache_expired;
                else
                    ++cache_not_found;
            }
            else
            {
                ++cache_hit;
                const auto & cell = cells[cell_idx];
                out[row] = !cell.isDefault();
            }
        }
    }

    ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
    ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);

    query_count.fetch_add(rows, std::memory_order_relaxed);
    hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);

    if (outdated_ids.empty())
        return;

    std::vector<Key> required_ids(outdated_ids.size());
    std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
        [] (auto & pair) { return pair.first; });

    /// request new values
A
Alexey Milovidov 已提交
336 337 338
    update(required_ids,
    [&] (const auto id, const auto)
    {
339 340
        for (const auto row : outdated_ids[id])
            out[row] = true;
A
Alexey Milovidov 已提交
341 342 343
    },
    [&] (const auto id, const auto)
    {
344 345 346
        for (const auto row : outdated_ids[id])
            out[row] = false;
    });
347 348 349 350 351
}


void CacheDictionary::createAttributes()
{
352 353
    const auto attributes_size = dict_struct.attributes.size();
    attributes.reserve(attributes_size);
354 355

    bytes_allocated += size * sizeof(CellMetadata);
356
    bytes_allocated += attributes_size * sizeof(attributes.front());
357 358 359 360 361 362 363 364

    for (const auto & attribute : dict_struct.attributes)
    {
        attribute_index_by_name.emplace(attribute.name, attributes.size());
        attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));

        if (attribute.hierarchical)
        {
365
            hierarchical_attribute = & attributes.back();
366 367

            if (hierarchical_attribute->type != AttributeUnderlyingType::UInt64)
368
                throw Exception{name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
369 370
        }
    }
371 372
}

A
Alexey Milovidov 已提交
373
/** Creates the storage for one attribute: stores its null value converted to the underlying type,
  * allocates a container with `size` elements and adds that container's size to bytes_allocated.
  * For String attributes the shared string arena is created on first use.
  */
CacheDictionary::Attribute CacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
    Attribute attr{type, {}, {}};

    switch (type)
    {
/// All fixed-size types are handled identically, differing only in the element type.
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::TYPE: \
        { \
            attr.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
            attr.arrays = std::make_unique<ContainerType<TYPE>>(size); \
            bytes_allocated += size * sizeof(TYPE); \
            break; \
        }

        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Float32)
        DISPATCH(Float64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
#undef DISPATCH

        case AttributeUnderlyingType::String:
        {
            attr.null_values = null_value.get<String>();
            attr.arrays = std::make_unique<ContainerType<StringRef>>(size);
            bytes_allocated += size * sizeof(StringRef);

            if (!string_arena)
                string_arena = std::make_unique<ArenaWithFreeLists>();

            break;
        }
    }

    return attr;
}

A
Alexey Milovidov 已提交
412
/// Resets element `idx` of the attribute's container to the attribute's null value.
/// For strings, memory previously taken from string_arena for this element is returned to the arena.
void CacheDictionary::setDefaultAttributeValue(Attribute & attribute, const Key idx) const
{
    switch (attribute.type)
    {
/// Fixed-size types: plain assignment of the stored null value.
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = std::get<TYPE>(attribute.null_values); \
            break;

        DISPATCH(UInt8)
        DISPATCH(UInt16)
        DISPATCH(UInt32)
        DISPATCH(UInt64)
        DISPATCH(UInt128)
        DISPATCH(Int8)
        DISPATCH(Int16)
        DISPATCH(Int32)
        DISPATCH(Int64)
        DISPATCH(Float32)
        DISPATCH(Float64)
        DISPATCH(Decimal32)
        DISPATCH(Decimal64)
        DISPATCH(Decimal128)
#undef DISPATCH

        case AttributeUnderlyingType::String:
        {
            const auto & null_value_ref = std::get<String>(attribute.null_values);
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];

            /// Already pointing at the null value: nothing to free, nothing to change.
            if (string_ref.data == null_value_ref.data())
                break;

            /// A non-null pointer that is not the null value was allocated from the arena.
            if (string_ref.data)
                string_arena->free(const_cast<char *>(string_ref.data), string_ref.size);

            string_ref = StringRef{null_value_ref};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
456
/// Stores `value` into element `idx` of the attribute's container.
/// Numeric values are read from the Field using the Field type listed per case below and then
/// converted by assignment; non-empty strings are copied into string_arena.
void CacheDictionary::setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const
{
    switch (attribute.type)
    {
/// TYPE is the container element type, FIELD_TYPE is the type requested from the Field.
#define DISPATCH(TYPE, FIELD_TYPE) \
        case AttributeUnderlyingType::TYPE: \
            std::get<ContainerPtrType<TYPE>>(attribute.arrays)[idx] = value.get<FIELD_TYPE>(); \
            break;

        DISPATCH(UInt8, UInt64)
        DISPATCH(UInt16, UInt64)
        DISPATCH(UInt32, UInt64)
        DISPATCH(UInt64, UInt64)
        DISPATCH(UInt128, UInt128)
        DISPATCH(Int8, Int64)
        DISPATCH(Int16, Int64)
        DISPATCH(Int32, Int64)
        DISPATCH(Int64, Int64)
        DISPATCH(Float32, Float64)
        DISPATCH(Float64, Float64)
        DISPATCH(Decimal32, Decimal32)
        DISPATCH(Decimal64, Decimal64)
        DISPATCH(Decimal128, Decimal128)
#undef DISPATCH

        case AttributeUnderlyingType::String:
        {
            const auto & new_value = value.get<String>();
            auto & string_ref = std::get<ContainerPtrType<StringRef>>(attribute.arrays)[idx];
            const auto & null_value_ref = std::get<String>(attribute.null_values);

            /// Free the previous value unless the element is empty or points at the null value.
            /// NOTE(review): the block was allocated with size + 1 but is freed with size - confirm
            /// that ArenaWithFreeLists tolerates the mismatch.
            const bool owns_arena_memory = string_ref.data && string_ref.data != null_value_ref.data();
            if (owns_arena_memory)
                string_arena->free(const_cast<char *>(string_ref.data), string_ref.size);

            const auto str_size = new_value.size();
            if (str_size == 0)
            {
                string_ref = {};
                break;
            }

            /// One extra byte so that the terminating zero is copied as well.
            auto string_ptr = string_arena->alloc(str_size + 1);
            std::copy(new_value.data(), new_value.data() + str_size + 1, string_ptr);
            string_ref = StringRef{string_ptr, str_size};
            break;
        }
    }
}

A
Alexey Milovidov 已提交
501
/// Returns the attribute registered under `attribute_name`; throws BAD_ARGUMENTS if there is none.
CacheDictionary::Attribute & CacheDictionary::getAttribute(const std::string & attribute_name) const
{
    const auto found = attribute_index_by_name.find(attribute_name);

    if (found != std::end(attribute_index_by_name))
        return attributes[found->second];

    throw Exception{name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
}

510 511
bool CacheDictionary::isEmptyCell(const UInt64 idx) const
{
512
    return (idx != zero_cell_idx && cells[idx].id == 0) || (cells[idx].data
513 514 515 516 517
        == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()));
}

/// Collects, under the read lock, the ids of all cells that are neither empty nor marked as default.
PaddedPODArray<CacheDictionary::Key> CacheDictionary::getCachedIds() const
{
    const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};

    PaddedPODArray<Key> cached_ids;

    const size_t cell_count = cells.size();
    for (size_t idx = 0; idx != cell_count; ++idx)
    {
        if (isEmptyCell(idx))
            continue;

        const auto & cell = cells[idx];
        if (cell.isDefault())
            continue;

        cached_ids.push_back(cell.id);
    }

    return cached_ids;
}

532
/// Creates a DictionaryBlockInputStream over the ids returned by getCachedIds() at the time of the call.
BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
    return std::make_shared<DictionaryBlockInputStream<CacheDictionary, Key>>(
        shared_from_this(), max_block_size, getCachedIds(), column_names);
}

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573
/** Registers the 'cache' layout in the dictionary factory.
  *
  * The registered creator validates the configuration and builds a CacheDictionary:
  *  - a composite 'key' is rejected (UNSUPPORTED_METHOD);
  *  - range_min / range_max are rejected (BAD_ARGUMENTS);
  *  - <layout>.cache.size_in_cells must be a positive integer
  *    (0 -> TOO_SMALL_BUFFER_SIZE, negative -> BAD_ARGUMENTS);
  *  - require_nonempty must not be set (BAD_ARGUMENTS).
  */
void registerDictionaryCache(DictionaryFactory & factory)
{
    auto create_layout = [=](
                                 const std::string & name,
                                 const DictionaryStructure & dict_struct,
                                 const Poco::Util::AbstractConfiguration & config,
                                 const std::string & config_prefix,
                                 DictionarySourcePtr source_ptr
                                 ) -> DictionaryPtr {

        if (dict_struct.key)
            throw Exception {"'key' is not supported for dictionary of layout 'cache'", ErrorCodes::UNSUPPORTED_METHOD};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception {name
                                 + ": elements .structure.range_min and .structure.range_max should be defined only "
                                   "for a dictionary of layout 'range_hashed'",
                             ErrorCodes::BAD_ARGUMENTS};

        const auto layout_prefix = config_prefix + ".layout";

        /// getInt returns a signed value, so it has to be range-checked before being used as a size_t:
        /// a negative number would otherwise be converted to an enormous cell count.
        const auto size = config.getInt(layout_prefix + ".cache.size_in_cells");
        if (size == 0)
            throw Exception {name + ": dictionary of layout 'cache' cannot have 0 cells", ErrorCodes::TOO_SMALL_BUFFER_SIZE};
        if (size < 0)
            throw Exception {name + ": dictionary of layout 'cache' cannot have a negative number of cells",
                             ErrorCodes::BAD_ARGUMENTS};

        const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
        if (require_nonempty)
            throw Exception {name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
                             ErrorCodes::BAD_ARGUMENTS};

        const DictionaryLifetime dict_lifetime {config, config_prefix + ".lifetime"};
        return std::make_unique<CacheDictionary>(
            name, dict_struct, std::move(source_ptr), dict_lifetime, static_cast<size_t>(size));
    };

    factory.registerLayout("cache", create_layout);
}

574

575
}