From b79270fd03b24eb47e9bc8f39756a89f624971d4 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Thu, 26 Dec 2019 21:56:34 +0300 Subject: [PATCH] remove callbacks in update --- dbms/src/Dictionaries/CacheDictionary.cpp | 163 ++++++++++++-- dbms/src/Dictionaries/CacheDictionary.h | 17 +- dbms/src/Dictionaries/CacheDictionary.inc.h | 233 ++++++-------------- 3 files changed, 220 insertions(+), 193 deletions(-) diff --git a/dbms/src/Dictionaries/CacheDictionary.cpp b/dbms/src/Dictionaries/CacheDictionary.cpp index 1e4a109098..3d015a011c 100644 --- a/dbms/src/Dictionaries/CacheDictionary.cpp +++ b/dbms/src/Dictionaries/CacheDictionary.cpp @@ -361,10 +361,7 @@ void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; }); /// Callbacks are empty because we don't want to receive them after an unknown period of time. - auto update_unit_ptr = std::make_shared( - required_expired_ids, - [&](const auto, const auto) {}, - [&](const auto, const auto) {} ); + auto update_unit_ptr = std::make_shared(required_expired_ids); tryPushToUpdateQueueOrThrow(update_unit_ptr); /// Update is async - no need to wait. @@ -385,26 +382,27 @@ void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; }); - auto update_unit_ptr = std::make_shared( - std::move(required_ids), - [&](const Key id, const size_t) - { - for (const auto row : cache_not_found_ids[id]) - out[row] = true; - for (const auto row : cache_expired_ids[id]) - out[row] = true; - }, - [&](const Key id, const size_t) - { - for (const auto row : cache_not_found_ids[id]) - out[row] = false; - for (const auto row : cache_expired_ids[id]) - out[row] = true; - } - ); + auto on_cell_updated = [&] (const Key id, const size_t) + { + for (const auto row : cache_not_found_ids[id]) + out[row] = true; + for (const auto row : cache_expired_ids[id]) + out[row] = true; + }; + + auto on_id_not_found = [&] (const Key id, const size_t) + { + for (const auto row : cache_not_found_ids[id]) + out[row] = false; + for (const auto row : cache_expired_ids[id]) + out[row] = true; + }; + + auto update_unit_ptr = std::make_shared(required_ids); tryPushToUpdateQueueOrThrow(update_unit_ptr); waitForCurrentUpdateFinish(update_unit_ptr); + prepareAnswer(update_unit_ptr, on_cell_updated, on_id_not_found); } @@ -714,7 +712,15 @@ void CacheDictionary::updateThreadFunction() update_queue.pop(unit_ptr); try { - update(unit_ptr->requested_ids, unit_ptr->on_cell_updated, unit_ptr->on_id_not_found); + auto found_ids_mask_ptr = std::make_shared>(unit_ptr->requested_ids.size()); + + /// Copy shared_ptr to let this map be alive until other thread finish his stuff + unit_ptr->found_ids_mask_ptr = found_ids_mask_ptr; + + for (const auto id : unit_ptr->requested_ids) + found_ids_mask_ptr->insert({id, 0}); + + update(unit_ptr->requested_ids, *found_ids_mask_ptr); std::unique_lock lock(update_mutex); unit_ptr->is_done = true; is_update_finished.notify_all(); @@ -745,4 +751,117 @@ void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) std::to_string(update_queue.size()), ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL); } + +void CacheDictionary::update(const std::vector & requested_ids, std::unordered_map & remaining_ids) const +{ + CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests}; + ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size()); + + const auto now = std::chrono::system_clock::now(); + + if (now > backoff_end_time) + { + try + { + if (error_count) + { + /// Recover after error: we have to clone the source here because + /// it could keep connections which should be reset after error. + source_ptr = source_ptr->clone(); + } + + Stopwatch watch; + /// Go to external storage. Might be very slow and blocking. + auto stream = source_ptr->loadIds(requested_ids); + + const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; + + stream->readPrefix(); + + while (const auto block = stream->read()) + { + const auto id_column = typeid_cast(block.safeGetByPosition(0).column.get()); + if (!id_column) + throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH}; + + const auto & ids = id_column->getData(); + + /// cache column pointers + const auto column_ptrs = ext::map( + ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); }); + + for (const auto i : ext::range(0, ids.size())) + { + const auto id = ids[i]; + + const auto find_result = findCellIdx(id, now); + const auto & cell_idx = find_result.cell_idx; + + auto & cell = cells[cell_idx]; + + for (const auto attribute_idx : ext::range(0, attributes.size())) + { + const auto & attribute_column = *column_ptrs[attribute_idx]; + auto & attribute = attributes[attribute_idx]; + + setAttributeValue(attribute, cell_idx, attribute_column[i]); + } + + /// if cell id is zero and zero does not map to this cell, then the cell is unused + if (cell.id == 0 && cell_idx != zero_cell_idx) + element_count.fetch_add(1, std::memory_order_relaxed); + + cell.id = id; + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) + { + std::uniform_int_distribution distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; + cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)}); + } + else + cell.setExpiresAt(std::chrono::time_point::max()); + + /// mark corresponding id as found + remaining_ids[id] = 1; + } + } + + stream->readSuffix(); + + error_count = 0; + last_exception = std::exception_ptr{}; + backoff_end_time = std::chrono::system_clock::time_point{}; + + ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed()); + } + catch (...) + { + ++error_count; + last_exception = std::current_exception(); + backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count)); + + tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() + + "', next update is scheduled at " + ext::to_string(backoff_end_time)); + } + } + + size_t not_found_num = 0, found_num = 0; + + + /// TODO: Replace! without checking the whole map with O(n) complexity + /// Check which ids have not been found and require setting null_value + for (const auto & id_found_pair : remaining_ids) + { + if (id_found_pair.second) + { + ++found_num; + continue; + } + ++not_found_num; + } + + ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num); + ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num); + ProfileEvents::increment(ProfileEvents::DictCacheRequests); +} + } diff --git a/dbms/src/Dictionaries/CacheDictionary.h b/dbms/src/Dictionaries/CacheDictionary.h index f4614ce07d..b28f877d68 100644 --- a/dbms/src/Dictionaries/CacheDictionary.h +++ b/dbms/src/Dictionaries/CacheDictionary.h @@ -254,8 +254,7 @@ private: template void getItemsString(Attribute & attribute, const PaddedPODArray & ids, ColumnString * out, DefaultGetter && get_default) const; - template - void update(const std::vector & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const; + void update(const std::vector & requested_ids, std::unordered_map & found_ids_mask_ptr) const; PaddedPODArray getCachedIds() const; @@ -325,18 +324,13 @@ private: struct UpdateUnit { UpdateUnit( - std::vector requested_ids_, - std::function on_cell_updated_, - std::function on_id_not_found_): - requested_ids(std::move(requested_ids_)), - on_cell_updated(std::move(on_cell_updated_)), - on_id_not_found(std::move(on_id_not_found_)) {} + std::vector requested_ids_): + requested_ids(std::move(requested_ids_)) {} + std::shared_ptr> found_ids_mask_ptr{nullptr}; std::atomic is_done{false}; std::exception_ptr current_exception{nullptr}; std::vector requested_ids; - std::function on_cell_updated; - std::function on_id_not_found; }; using UpdateUnitPtr = std::shared_ptr; @@ -353,6 +347,9 @@ private: mutable std::condition_variable is_update_finished; std::atomic finished{false}; + + template + void prepareAnswer(UpdateUnitPtr, PresentIdHandler &&, AbsentIdHandler &&) const; }; } diff --git a/dbms/src/Dictionaries/CacheDictionary.inc.h b/dbms/src/Dictionaries/CacheDictionary.inc.h index 549a137788..84d9930400 100644 --- a/dbms/src/Dictionaries/CacheDictionary.inc.h +++ b/dbms/src/Dictionaries/CacheDictionary.inc.h @@ -107,7 +107,7 @@ void CacheDictionary::getItemsNumberImpl( if (cache_expired_ids.empty()) return; - /// Update async only if allow_read_expired_keys_is_enabled + /// Update async only if allow_read_expired_keys_is_enabledadd condvar usage and better code if (allow_read_expired_keys) { std::vector required_expired_ids; @@ -116,10 +116,7 @@ void CacheDictionary::getItemsNumberImpl( [](auto & pair) { return pair.first; }); /// request new values - auto update_unit_ptr = std::make_shared( - required_expired_ids, - [&](const auto, const auto) {}, - [&](const auto, const auto) {}); + auto update_unit_ptr = std::make_shared(required_expired_ids); tryPushToUpdateQueueOrThrow(update_unit_ptr); @@ -142,34 +139,37 @@ void CacheDictionary::getItemsNumberImpl( std::back_inserter(required_ids), [](auto & pair) { return pair.first; }); /// Request new values - auto update_unit_ptr = std::make_shared( - required_ids, - [&](const auto id, const auto cell_idx) - { - const auto attribute_value = attribute_array[cell_idx]; + auto update_unit_ptr = std::make_shared(required_ids); - for (const size_t row : cache_not_found_ids[id]) - out[row] = static_cast(attribute_value); + auto on_cell_updated = [&] (const auto id, const auto cell_idx) + { + const auto attribute_value = attribute_array[cell_idx]; - for (const size_t row : cache_expired_ids[id]) - out[row] = static_cast(attribute_value); - }, - [&](const auto id, const auto) - { - for (const size_t row : cache_not_found_ids[id]) - out[row] = get_default(row); + for (const size_t row : cache_not_found_ids[id]) + out[row] = static_cast(attribute_value); - for (const size_t row : cache_expired_ids[id]) - out[row] = get_default(row); - }); + for (const size_t row : cache_expired_ids[id]) + out[row] = static_cast(attribute_value); + }; + + auto on_id_not_found = [&] (const auto id, const auto) + { + for (const size_t row : cache_not_found_ids[id]) + out[row] = get_default(row); + + for (const size_t row : cache_expired_ids[id]) + out[row] = get_default(row); + }; tryPushToUpdateQueueOrThrow(update_unit_ptr); waitForCurrentUpdateFinish(update_unit_ptr); + prepareAnswer(update_unit_ptr, on_cell_updated, on_id_not_found); } template void CacheDictionary::getItemsString( - Attribute & attribute, const PaddedPODArray & ids, ColumnString * out, DefaultGetter && get_default) const { + Attribute & attribute, const PaddedPODArray & ids, ColumnString * out, DefaultGetter && get_default) const +{ const auto rows = ext::size(ids); /// save on some allocations @@ -186,14 +186,18 @@ void CacheDictionary::getItemsString( const auto now = std::chrono::system_clock::now(); /// fetch up-to-date values, discard on fail - for (const auto row : ext::range(0, rows)) { + for (const auto row : ext::range(0, rows)) + { const auto id = ids[row]; const auto find_result = findCellIdx(id, now); - if (!find_result.valid) { + if (!find_result.valid) + { found_outdated_values = true; break; - } else { + } + else + { const auto &cell_idx = find_result.cell_idx; const auto &cell = cells[cell_idx]; const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; @@ -203,7 +207,8 @@ void CacheDictionary::getItemsString( } /// optimistic code completed successfully - if (!found_outdated_values) { + if (!found_outdated_values) + { query_count.fetch_add(rows, std::memory_order_relaxed); hit_count.fetch_add(rows, std::memory_order_release); return; @@ -225,13 +230,15 @@ void CacheDictionary::getItemsString( const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; const auto now = std::chrono::system_clock::now(); - for (const auto row : ext::range(0, ids.size())) { + for (const auto row : ext::range(0, ids.size())) + { const auto id = ids[row]; const auto find_result = findCellIdx(id, now); - auto insert_value_routine = [&]() { + auto insert_value_routine = [&]() + { const auto &cell_idx = find_result.cell_idx; const auto &cell = cells[cell_idx]; const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; @@ -242,16 +249,18 @@ void CacheDictionary::getItemsString( total_length += string_ref.size + 1; }; - if (!find_result.valid) { - if (find_result.outdated) { + if (!find_result.valid) + { + if (find_result.outdated) + { cache_expired_ids[id].push_back(row); if (allow_read_expired_keys) insert_value_routine(); - } else { + } else cache_not_found_ids[id].push_back(row); - } - } else { + } else + { ++cache_hit; insert_value_routine(); } @@ -275,8 +284,7 @@ void CacheDictionary::getItemsString( std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; }); - auto update_unit_ptr = std::make_shared( - required_expired_ids, [&](const auto, const auto) {}, [&](const auto, const auto) {}); + auto update_unit_ptr = std::make_shared(required_expired_ids); tryPushToUpdateQueueOrThrow(update_unit_ptr); @@ -297,23 +305,25 @@ void CacheDictionary::getItemsString( std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; }); - auto update_unit_ptr = std::make_shared( - required_ids, - [&](const auto id, const auto cell_idx) - { - const auto attribute_value = attribute_array[cell_idx]; + auto on_cell_updated = [&] (const auto id, const auto cell_idx) + { + const auto attribute_value = attribute_array[cell_idx]; - map[id] = String{attribute_value}; - total_length += (attribute_value.size + 1) * cache_not_found_ids[id].size(); - }, - [&](const auto id, const auto) - { - for (const auto row : cache_not_found_ids[id]) - total_length += get_default(row).size + 1; - }); + map[id] = String{attribute_value}; + total_length += (attribute_value.size + 1) * cache_not_found_ids[id].size(); + }; + + auto on_id_not_found = [&] (const auto id, const auto) + { + for (const auto row : cache_not_found_ids[id]) + total_length += get_default(row).size + 1; + }; + + auto update_unit_ptr = std::make_shared(required_ids); tryPushToUpdateQueueOrThrow(update_unit_ptr); waitForCurrentUpdateFinish(update_unit_ptr); + prepareAnswer(update_unit_ptr, on_cell_updated, on_id_not_found); } out->getChars().reserve(total_length); @@ -329,122 +339,27 @@ void CacheDictionary::getItemsString( } template -void CacheDictionary::update( - const std::vector & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const +void CacheDictionary::prepareAnswer( + UpdateUnitPtr update_unit_ptr, + PresentIdHandler && on_cell_updated, + AbsentIdHandler && on_id_not_found) const { - CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests}; - ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size()); - - std::unordered_map remaining_ids{requested_ids.size()}; - for (const auto id : requested_ids) - remaining_ids.insert({id, 0}); - + /// Prepare answer + const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; const auto now = std::chrono::system_clock::now(); - if (now > backoff_end_time) + for (const auto & id : update_unit_ptr->requested_ids) { - try - { - if (error_count) - { - /// Recover after error: we have to clone the source here because - /// it could keep connections which should be reset after error. - source_ptr = source_ptr->clone(); - } - - Stopwatch watch; - /// Go to external storage. Might be very slow and blocking. - auto stream = source_ptr->loadIds(requested_ids); - - const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; - - stream->readPrefix(); - - while (const auto block = stream->read()) - { - const auto id_column = typeid_cast(block.safeGetByPosition(0).column.get()); - if (!id_column) - throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH}; - - const auto & ids = id_column->getData(); - - /// cache column pointers - const auto column_ptrs = ext::map( - ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); }); - - for (const auto i : ext::range(0, ids.size())) - { - const auto id = ids[i]; - - const auto find_result = findCellIdx(id, now); - const auto & cell_idx = find_result.cell_idx; - - auto & cell = cells[cell_idx]; - - for (const auto attribute_idx : ext::range(0, attributes.size())) - { - const auto & attribute_column = *column_ptrs[attribute_idx]; - auto & attribute = attributes[attribute_idx]; - - setAttributeValue(attribute, cell_idx, attribute_column[i]); - } - - /// if cell id is zero and zero does not map to this cell, then the cell is unused - if (cell.id == 0 && cell_idx != zero_cell_idx) - element_count.fetch_add(1, std::memory_order_relaxed); - - cell.id = id; - if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) - { - std::uniform_int_distribution distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; - cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)}); - } - else - cell.setExpiresAt(std::chrono::time_point::max()); - - /// inform caller - on_cell_updated(id, cell_idx); - /// mark corresponding id as found - remaining_ids[id] = 1; - } - } - - stream->readSuffix(); - - error_count = 0; - last_exception = std::exception_ptr{}; - backoff_end_time = std::chrono::system_clock::time_point{}; - - ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed()); - } - catch (...) - { - ++error_count; - last_exception = std::current_exception(); - backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count)); - - tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() + - "', next update is scheduled at " + ext::to_string(backoff_end_time)); - } - } - - size_t not_found_num = 0, found_num = 0; + const auto find_result = findCellIdx(id, now); + const auto & cell_idx = find_result.cell_idx; + auto & cell = cells[cell_idx]; + const auto was_id_updated = update_unit_ptr->found_ids_mask_ptr->at(id); - /// Check which ids have not been found and require setting null_value - for (const auto & id_found_pair : remaining_ids) - { - if (id_found_pair.second) + if (was_id_updated) { - ++found_num; + on_cell_updated(id, find_result.cell_idx); continue; } - ++not_found_num; - - const auto id = id_found_pair.first; - - const auto find_result = findCellIdx(id, now); - const auto & cell_idx = find_result.cell_idx; - auto & cell = cells[cell_idx]; if (error_count) { @@ -487,10 +402,6 @@ void CacheDictionary::update( /// inform caller that the cell has not been found on_id_not_found(id, cell_idx); } - - ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num); - ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num); - ProfileEvents::increment(ProfileEvents::DictCacheRequests); } } -- GitLab