diff --git a/dbms/src/Dictionaries/CacheDictionary.cpp b/dbms/src/Dictionaries/CacheDictionary.cpp index 53fc746e565aee8a71a470929efe37fd37da3828..c3a78150f05d452f8f5bf0ade848993518cef14c 100644 --- a/dbms/src/Dictionaries/CacheDictionary.cpp +++ b/dbms/src/Dictionaries/CacheDictionary.cpp @@ -70,6 +70,7 @@ CacheDictionary::CacheDictionary( , dict_struct(dict_struct_) , source_ptr{std::move(source_ptr_)} , dict_lifetime(dict_lifetime_) + , log(&Logger::get("ExternalDictionaries")) , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))} , size_overlap_mask{this->size - 1} , cells{this->size} @@ -575,6 +576,12 @@ BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_na return std::make_shared(shared_from_this(), max_block_size, getCachedIds(), column_names); } +std::exception_ptr CacheDictionary::getLastException() const +{ + const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; + return last_exception; +} + void registerDictionaryCache(DictionaryFactory & factory) { auto create_layout = [=](const std::string & name, diff --git a/dbms/src/Dictionaries/CacheDictionary.h b/dbms/src/Dictionaries/CacheDictionary.h index 7e1cec6ffe9040667d1f6d48bd8a94b4d8ea0300..750c51a7cf3b3bc5b6ae9724404f5df371e83df4 100644 --- a/dbms/src/Dictionaries/CacheDictionary.h +++ b/dbms/src/Dictionaries/CacheDictionary.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -74,6 +75,8 @@ public: void isInVectorConstant(const PaddedPODArray & child_ids, const Key ancestor_id, PaddedPODArray & out) const override; void isInConstantVector(const Key child_id, const PaddedPODArray & ancestor_ids, PaddedPODArray & out) const override; + std::exception_ptr getLastException() const override; + template using ResultArrayType = std::conditional_t, DecimalPaddedPODArray, PaddedPODArray>; @@ -253,8 +256,9 @@ private: const std::string name; const DictionaryStructure dict_struct; - const DictionarySourcePtr source_ptr; + mutable DictionarySourcePtr source_ptr; const DictionaryLifetime dict_lifetime; + Logger * const log; mutable std::shared_mutex rw_lock; @@ -274,6 +278,10 @@ private: Attribute * hierarchical_attribute = nullptr; std::unique_ptr string_arena; + mutable std::exception_ptr last_exception; + mutable size_t error_count = 0; + mutable std::chrono::system_clock::time_point backoff_end_time; + mutable pcg64 rnd_engine; mutable size_t bytes_allocated = 0; diff --git a/dbms/src/Dictionaries/CacheDictionary.inc.h b/dbms/src/Dictionaries/CacheDictionary.inc.h index 39e1a4b00df6ba2dcba83f11567e35298fe9ef41..51d515a63ddb9232ba41bd254eed0dc4d4a20611 100644 --- a/dbms/src/Dictionaries/CacheDictionary.inc.h +++ b/dbms/src/Dictionaries/CacheDictionary.inc.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -243,77 +244,102 @@ template void CacheDictionary::update( const std::vector & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const { + CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests}; + ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size()); + std::unordered_map remaining_ids{requested_ids.size()}; for (const auto id : requested_ids) remaining_ids.insert({id, 0}); - std::uniform_int_distribution distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; + const auto now = std::chrono::system_clock::now(); const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; + if (now > backoff_end_time) { - CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests}; - Stopwatch watch; - auto stream = source_ptr->loadIds(requested_ids); - stream->readPrefix(); - - const auto now = std::chrono::system_clock::now(); - - while (const auto block = stream->read()) + try { - const auto id_column = typeid_cast(block.safeGetByPosition(0).column.get()); - if (!id_column) - throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH}; - - const auto & ids = id_column->getData(); + if (error_count) + { + /// Recover after error: we have to clone the source here because + /// it could keep connections which should be reset after error. + source_ptr = source_ptr->clone(); + } - /// cache column pointers - const auto column_ptrs = ext::map( - ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); }); + Stopwatch watch; + auto stream = source_ptr->loadIds(requested_ids); + stream->readPrefix(); - for (const auto i : ext::range(0, ids.size())) + while (const auto block = stream->read()) { - const auto id = ids[i]; + const auto id_column = typeid_cast(block.safeGetByPosition(0).column.get()); + if (!id_column) + throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH}; - const auto find_result = findCellIdx(id, now); - const auto & cell_idx = find_result.cell_idx; + const auto & ids = id_column->getData(); - auto & cell = cells[cell_idx]; + /// cache column pointers + const auto column_ptrs = ext::map( + ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); }); - for (const auto attribute_idx : ext::range(0, attributes.size())) + for (const auto i : ext::range(0, ids.size())) { - const auto & attribute_column = *column_ptrs[attribute_idx]; - auto & attribute = attributes[attribute_idx]; - - setAttributeValue(attribute, cell_idx, attribute_column[i]); + const auto id = ids[i]; + + const auto find_result = findCellIdx(id, now); + const auto & cell_idx = find_result.cell_idx; + + auto & cell = cells[cell_idx]; + + for (const auto attribute_idx : ext::range(0, attributes.size())) + { + const auto & attribute_column = *column_ptrs[attribute_idx]; + auto & attribute = attributes[attribute_idx]; + + setAttributeValue(attribute, cell_idx, attribute_column[i]); + } + + /// if cell id is zero and zero does not map to this cell, then the cell is unused + if (cell.id == 0 && cell_idx != zero_cell_idx) + element_count.fetch_add(1, std::memory_order_relaxed); + + cell.id = id; + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) + { + std::uniform_int_distribution distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; + cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)}); + } + else + cell.setExpiresAt(std::chrono::time_point::max()); + + /// inform caller + on_cell_updated(id, cell_idx); + /// mark corresponding id as found + remaining_ids[id] = 1; } + } - /// if cell id is zero and zero does not map to this cell, then the cell is unused - if (cell.id == 0 && cell_idx != zero_cell_idx) - element_count.fetch_add(1, std::memory_order_relaxed); + stream->readSuffix(); - cell.id = id; - if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) - cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); - else - cell.setExpiresAt(std::chrono::time_point::max()); + error_count = 0; + last_exception = std::exception_ptr{}; + backoff_end_time = std::chrono::system_clock::time_point{}; - /// inform caller - on_cell_updated(id, cell_idx); - /// mark corresponding id as found - remaining_ids[id] = 1; - } + ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed()); } + catch (...) + { + ++error_count; + last_exception = std::current_exception(); + backoff_end_time = now + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count)); - stream->readSuffix(); - - ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size()); - ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed()); + tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() + + "', next update is scheduled at " + DateLUT::instance().timeToString(std::chrono::system_clock::to_time_t(backoff_end_time))); + } } size_t not_found_num = 0, found_num = 0; - const auto now = std::chrono::system_clock::now(); /// Check which ids have not been found and require setting null_value for (const auto & id_found_pair : remaining_ids) { @@ -328,24 +354,45 @@ void CacheDictionary::update( const auto find_result = findCellIdx(id, now); const auto & cell_idx = find_result.cell_idx; - auto & cell = cells[cell_idx]; - /// Set null_value for each attribute - for (auto & attribute : attributes) - setDefaultAttributeValue(attribute, cell_idx); + if (error_count) + { + if (find_result.outdated) + { + /// We have expired data for that `id` so we can continue using it. + bool was_default = cell.isDefault(); + cell.setExpiresAt(backoff_end_time); + if (was_default) + cell.setDefault(); + if (was_default) + on_id_not_found(id, cell_idx); + else + on_cell_updated(id, cell_idx); + continue; + } + /// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`. + std::rethrow_exception(last_exception); + } /// Check if cell had not been occupied before and increment element counter if it hadn't if (cell.id == 0 && cell_idx != zero_cell_idx) element_count.fetch_add(1, std::memory_order_relaxed); cell.id = id; + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) - cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); + { + std::uniform_int_distribution distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; + cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)}); + } else cell.setExpiresAt(std::chrono::time_point::max()); + /// Set null_value for each attribute cell.setDefault(); + for (auto & attribute : attributes) + setDefaultAttributeValue(attribute, cell_idx); /// inform caller that the cell has not been found on_id_not_found(id, cell_idx); diff --git a/dbms/src/Dictionaries/IDictionary.h b/dbms/src/Dictionaries/IDictionary.h index 05f9806dc04dc09d07f0ef4e264539481de6be69..a1c080ca6ebd2cc5cf992a519b2517ef3fb21fc7 100644 --- a/dbms/src/Dictionaries/IDictionary.h +++ b/dbms/src/Dictionaries/IDictionary.h @@ -56,6 +56,8 @@ struct IDictionaryBase : public IExternalLoadable return source && source->isModified(); } + virtual std::exception_ptr getLastException() const { return {}; } + std::shared_ptr shared_from_this() { return std::static_pointer_cast(IExternalLoadable::shared_from_this()); diff --git a/dbms/src/Interpreters/ExternalLoader.cpp b/dbms/src/Interpreters/ExternalLoader.cpp index cc8466bebc16110d904d3ada8cb2169d94fb126b..6e16fd37cba875ad6099c9c9b3012334fcc49d47 100644 --- a/dbms/src/Interpreters/ExternalLoader.cpp +++ b/dbms/src/Interpreters/ExternalLoader.cpp @@ -1,6 +1,5 @@ #include "ExternalLoader.h" -#include #include #include #include @@ -933,6 +932,8 @@ private: class ExternalLoader::PeriodicUpdater : private boost::noncopyable { public: + static constexpr UInt64 check_period_sec = 5; + PeriodicUpdater(ConfigFilesReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_) : config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_) { @@ -940,11 +941,10 @@ public: ~PeriodicUpdater() { enable(false); } - void enable(bool enable_, const ExternalLoaderUpdateSettings & settings_ = {}) + void enable(bool enable_) { std::unique_lock lock{mutex}; enabled = enable_; - settings = settings_; if (enable_) { @@ -985,9 +985,7 @@ public: return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; } - std::uniform_int_distribution distribution(0, static_cast(std::exp2(error_count - 1))); - std::chrono::seconds delay(std::min(settings.backoff_max_sec, settings.backoff_initial_sec + distribution(rnd_engine))); - return std::chrono::system_clock::now() + delay; + return std::chrono::system_clock::now() + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count)); } private: @@ -996,9 +994,8 @@ private: setThreadName("ExterLdrReload"); std::unique_lock lock{mutex}; - auto timeout = [this] { return std::chrono::seconds(settings.check_period_sec); }; auto pred = [this] { return !enabled; }; - while (!event.wait_for(lock, timeout(), pred)) + while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred)) { lock.unlock(); loading_dispatcher.setConfiguration(config_files_reader.read()); @@ -1012,7 +1009,6 @@ private: mutable std::mutex mutex; bool enabled = false; - ExternalLoaderUpdateSettings settings; ThreadFromGlobalPool thread; std::condition_variable event; mutable pcg64 rnd_engine{randomSeed()}; @@ -1051,9 +1047,9 @@ void ExternalLoader::enableAsyncLoading(bool enable) loading_dispatcher->enableAsyncLoading(enable); } -void ExternalLoader::enablePeriodicUpdates(bool enable_, const ExternalLoaderUpdateSettings & settings_) +void ExternalLoader::enablePeriodicUpdates(bool enable_) { - periodic_updater->enable(enable_, settings_); + periodic_updater->enable(enable_); } bool ExternalLoader::hasCurrentlyLoadedObjects() const diff --git a/dbms/src/Interpreters/ExternalLoader.h b/dbms/src/Interpreters/ExternalLoader.h index 8a52d991759c7a699eb63c8f3574d072eca0133d..ecfc43c2dd941e1d143feb97b0235e37ece4975a 100644 --- a/dbms/src/Interpreters/ExternalLoader.h +++ b/dbms/src/Interpreters/ExternalLoader.h @@ -11,19 +11,6 @@ namespace DB { -struct ExternalLoaderUpdateSettings -{ - UInt64 check_period_sec = 5; - UInt64 backoff_initial_sec = 5; - /// 10 minutes - UInt64 backoff_max_sec = 10 * 60; - - ExternalLoaderUpdateSettings() = default; - ExternalLoaderUpdateSettings(UInt64 check_period_sec_, UInt64 backoff_initial_sec_, UInt64 backoff_max_sec_) - : check_period_sec(check_period_sec_), backoff_initial_sec(backoff_initial_sec_), backoff_max_sec(backoff_max_sec_) {} -}; - - /* External configuration structure. * * @@ -105,7 +92,7 @@ public: void enableAsyncLoading(bool enable); /// Sets settings for periodic updates. - void enablePeriodicUpdates(bool enable, const ExternalLoaderUpdateSettings & settings = {}); + void enablePeriodicUpdates(bool enable); /// Returns the status of the object. /// If the object has not been loaded yet then the function returns Status::NOT_LOADED. diff --git a/dbms/src/Interpreters/IExternalLoadable.cpp b/dbms/src/Interpreters/IExternalLoadable.cpp index 37855490a9f79a158c9ccd77b0a62afbb3685a41..e8bf8cbaf3c1ced3cd6ab0c9a4d8b9274f4d64b0 100644 --- a/dbms/src/Interpreters/IExternalLoadable.cpp +++ b/dbms/src/Interpreters/IExternalLoadable.cpp @@ -1,7 +1,7 @@ #include #include - +#include namespace DB { @@ -16,4 +16,13 @@ ExternalLoadableLifetime::ExternalLoadableLifetime(const Poco::Util::AbstractCon max_sec = has_min ? config.getUInt64(config_prefix + ".max") : min_sec; } + +UInt64 ExternalLoadableBackoff::calculateDuration(pcg64 & rnd_engine, size_t error_count) const +{ + if (error_count < 1) + error_count = 1; + std::uniform_int_distribution distribution(0, static_cast(std::exp2(error_count - 1))); + return std::min(backoff_max_sec, backoff_initial_sec + distribution(rnd_engine)); +} + } diff --git a/dbms/src/Interpreters/IExternalLoadable.h b/dbms/src/Interpreters/IExternalLoadable.h index f8725a679898d9b6bdfc51595e91f19a8fd33b93..e842fdb8573b17fb200c57661c42884898afb45b 100644 --- a/dbms/src/Interpreters/IExternalLoadable.h +++ b/dbms/src/Interpreters/IExternalLoadable.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -25,6 +26,17 @@ struct ExternalLoadableLifetime }; +/// Delay before trying to load again after error. +struct ExternalLoadableBackoff +{ + UInt64 backoff_initial_sec = 5; + UInt64 backoff_max_sec = 10 * 60; /// 10 minutes + + /// Calculates time to try loading again after error. + UInt64 calculateDuration(pcg64 & rnd_engine, size_t error_count = 1) const; +}; + + /// Basic interface for external loadable objects. Is used in ExternalLoader. class IExternalLoadable : public std::enable_shared_from_this, private boost::noncopyable { diff --git a/dbms/src/Storages/System/StorageSystemDictionaries.cpp b/dbms/src/Storages/System/StorageSystemDictionaries.cpp index 6f85e1ea84bb25f858a116b9c5d984f0e1edab9b..826bb601609e6f81db3f33f054010502bc9c1d06 100644 --- a/dbms/src/Storages/System/StorageSystemDictionaries.cpp +++ b/dbms/src/Storages/System/StorageSystemDictionaries.cpp @@ -50,10 +50,11 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con res_columns[i++]->insert(static_cast(load_result.status)); res_columns[i++]->insert(load_result.origin); - if (load_result.object) - { - const auto dict_ptr = std::static_pointer_cast(load_result.object); + std::exception_ptr last_exception = load_result.exception; + const auto dict_ptr = std::dynamic_pointer_cast(load_result.object); + if (dict_ptr) + { res_columns[i++]->insert(dict_ptr->getTypeName()); const auto & dict_struct = dict_ptr->getStructure(); @@ -66,6 +67,9 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con res_columns[i++]->insert(dict_ptr->getElementCount()); res_columns[i++]->insert(dict_ptr->getLoadFactor()); res_columns[i++]->insert(dict_ptr->getSource()->toString()); + + if (!last_exception) + last_exception = dict_ptr->getLastException(); } else { @@ -76,8 +80,8 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con res_columns[i++]->insert(static_cast(std::chrono::system_clock::to_time_t(load_result.loading_start_time))); res_columns[i++]->insert(std::chrono::duration_cast>(load_result.loading_duration).count()); - if (load_result.exception) - res_columns[i++]->insert(getExceptionMessage(load_result.exception, false)); + if (last_exception) + res_columns[i++]->insert(getExceptionMessage(last_exception, false)); else res_columns[i++]->insertDefault(); }