提交 da8f6712 编写于 作者: V Vitaly Baranov

Improve error handling in cache dictionaries:

allow using expired values while the source of a cache dictionary doesn't respond;
clone the source after an error to reset connections;
don't ask the source for a little time after error;
show the last exception in system.dictionaries for a cache dictionary too.
上级 0bca68e5
......@@ -70,6 +70,7 @@ CacheDictionary::CacheDictionary(
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_)
, log(&Logger::get("ExternalDictionaries"))
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
, size_overlap_mask{this->size - 1}
, cells{this->size}
......@@ -575,6 +576,12 @@ BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_na
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, getCachedIds(), column_names);
}
std::exception_ptr CacheDictionary::getLastException() const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
return last_exception;
}
void registerDictionaryCache(DictionaryFactory & factory)
{
auto create_layout = [=](const std::string & name,
......
......@@ -7,6 +7,7 @@
#include <shared_mutex>
#include <variant>
#include <vector>
#include <common/logger_useful.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <pcg_random.hpp>
......@@ -74,6 +75,8 @@ public:
void isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const override;
void isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
std::exception_ptr getLastException() const override;
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
......@@ -253,8 +256,9 @@ private:
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
Logger * const log;
mutable std::shared_mutex rw_lock;
......@@ -274,6 +278,10 @@ private:
Attribute * hierarchical_attribute = nullptr;
std::unique_ptr<ArenaWithFreeLists> string_arena;
mutable std::exception_ptr last_exception;
mutable size_t error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
mutable pcg64 rnd_engine;
mutable size_t bytes_allocated = 0;
......
......@@ -3,6 +3,7 @@
#include <Columns/ColumnsNumber.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/typeid_cast.h>
#include <common/DateLUT.h>
#include <DataStreams/IBlockInputStream.h>
#include <ext/map.h>
#include <ext/range.h>
......@@ -243,77 +244,102 @@ template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheDictionary::update(
const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
for (const auto id : requested_ids)
remaining_ids.insert({id, 0});
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
const auto now = std::chrono::system_clock::now();
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if (now > backoff_end_time)
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
const auto now = std::chrono::system_clock::now();
while (const auto block = stream->read())
try
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
if (error_count)
{
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
source_ptr = source_ptr->clone();
}
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
for (const auto i : ext::range(0, ids.size()))
while (const auto block = stream->read())
{
const auto id = ids[i];
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
const auto & ids = id_column->getData();
auto & cell = cells[cell_idx];
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
for (const auto attribute_idx : ext::range(0, attributes.size()))
for (const auto i : ext::range(0, ids.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
const auto id = ids[i];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
stream->readSuffix();
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
error_count = 0;
last_exception = std::exception_ptr{};
backoff_end_time = std::chrono::system_clock::time_point{};
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
}
catch (...)
{
++error_count;
last_exception = std::current_exception();
backoff_end_time = now + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count));
stream->readSuffix();
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() +
"', next update is scheduled at " + DateLUT::instance().timeToString(std::chrono::system_clock::to_time_t(backoff_end_time)));
}
}
size_t not_found_num = 0, found_num = 0;
const auto now = std::chrono::system_clock::now();
/// Check which ids have not been found and require setting null_value
for (const auto & id_found_pair : remaining_ids)
{
......@@ -328,24 +354,45 @@ void CacheDictionary::update(
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
/// Set null_value for each attribute
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
if (error_count)
{
if (find_result.outdated)
{
/// We have expired data for that `id` so we can continue using it.
bool was_default = cell.isDefault();
cell.setExpiresAt(backoff_end_time);
if (was_default)
cell.setDefault();
if (was_default)
on_id_not_found(id, cell_idx);
else
on_cell_updated(id, cell_idx);
continue;
}
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
std::rethrow_exception(last_exception);
}
/// Check if cell had not been occupied before and increment element counter if it hadn't
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// Set null_value for each attribute
cell.setDefault();
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
/// inform caller that the cell has not been found
on_id_not_found(id, cell_idx);
......
......@@ -56,6 +56,8 @@ struct IDictionaryBase : public IExternalLoadable
return source && source->isModified();
}
virtual std::exception_ptr getLastException() const { return {}; }
std::shared_ptr<IDictionaryBase> shared_from_this()
{
return std::static_pointer_cast<IDictionaryBase>(IExternalLoadable::shared_from_this());
......
#include "ExternalLoader.h"
#include <cmath>
#include <mutex>
#include <pcg_random.hpp>
#include <common/DateLUT.h>
......@@ -933,6 +932,8 @@ private:
class ExternalLoader::PeriodicUpdater : private boost::noncopyable
{
public:
static constexpr UInt64 check_period_sec = 5;
PeriodicUpdater(ConfigFilesReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_)
: config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_)
{
......@@ -940,11 +941,10 @@ public:
~PeriodicUpdater() { enable(false); }
void enable(bool enable_, const ExternalLoaderUpdateSettings & settings_ = {})
void enable(bool enable_)
{
std::unique_lock lock{mutex};
enabled = enable_;
settings = settings_;
if (enable_)
{
......@@ -985,9 +985,7 @@ public:
return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
}
std::uniform_int_distribution<UInt64> distribution(0, static_cast<UInt64>(std::exp2(error_count - 1)));
std::chrono::seconds delay(std::min<UInt64>(settings.backoff_max_sec, settings.backoff_initial_sec + distribution(rnd_engine)));
return std::chrono::system_clock::now() + delay;
return std::chrono::system_clock::now() + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count));
}
private:
......@@ -996,9 +994,8 @@ private:
setThreadName("ExterLdrReload");
std::unique_lock lock{mutex};
auto timeout = [this] { return std::chrono::seconds(settings.check_period_sec); };
auto pred = [this] { return !enabled; };
while (!event.wait_for(lock, timeout(), pred))
while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred))
{
lock.unlock();
loading_dispatcher.setConfiguration(config_files_reader.read());
......@@ -1012,7 +1009,6 @@ private:
mutable std::mutex mutex;
bool enabled = false;
ExternalLoaderUpdateSettings settings;
ThreadFromGlobalPool thread;
std::condition_variable event;
mutable pcg64 rnd_engine{randomSeed()};
......@@ -1051,9 +1047,9 @@ void ExternalLoader::enableAsyncLoading(bool enable)
loading_dispatcher->enableAsyncLoading(enable);
}
void ExternalLoader::enablePeriodicUpdates(bool enable_, const ExternalLoaderUpdateSettings & settings_)
void ExternalLoader::enablePeriodicUpdates(bool enable_)
{
periodic_updater->enable(enable_, settings_);
periodic_updater->enable(enable_);
}
bool ExternalLoader::hasCurrentlyLoadedObjects() const
......
......@@ -11,19 +11,6 @@
namespace DB
{
struct ExternalLoaderUpdateSettings
{
UInt64 check_period_sec = 5;
UInt64 backoff_initial_sec = 5;
/// 10 minutes
UInt64 backoff_max_sec = 10 * 60;
ExternalLoaderUpdateSettings() = default;
ExternalLoaderUpdateSettings(UInt64 check_period_sec_, UInt64 backoff_initial_sec_, UInt64 backoff_max_sec_)
: check_period_sec(check_period_sec_), backoff_initial_sec(backoff_initial_sec_), backoff_max_sec(backoff_max_sec_) {}
};
/* External configuration structure.
*
* <external_group>
......@@ -105,7 +92,7 @@ public:
void enableAsyncLoading(bool enable);
/// Sets settings for periodic updates.
void enablePeriodicUpdates(bool enable, const ExternalLoaderUpdateSettings & settings = {});
void enablePeriodicUpdates(bool enable);
/// Returns the status of the object.
/// If the object has not been loaded yet then the function returns Status::NOT_LOADED.
......
#include <Interpreters/IExternalLoadable.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <cmath>
namespace DB
{
......@@ -16,4 +16,13 @@ ExternalLoadableLifetime::ExternalLoadableLifetime(const Poco::Util::AbstractCon
max_sec = has_min ? config.getUInt64(config_prefix + ".max") : min_sec;
}
UInt64 ExternalLoadableBackoff::calculateDuration(pcg64 & rnd_engine, size_t error_count) const
{
if (error_count < 1)
error_count = 1;
std::uniform_int_distribution<UInt64> distribution(0, static_cast<UInt64>(std::exp2(error_count - 1)));
return std::min<UInt64>(backoff_max_sec, backoff_initial_sec + distribution(rnd_engine));
}
}
......@@ -3,6 +3,7 @@
#include <string>
#include <memory>
#include <boost/noncopyable.hpp>
#include <pcg_random.hpp>
#include <Core/Types.h>
......@@ -25,6 +26,17 @@ struct ExternalLoadableLifetime
};
/// Delay before trying to load again after error.
struct ExternalLoadableBackoff
{
UInt64 backoff_initial_sec = 5;
UInt64 backoff_max_sec = 10 * 60; /// 10 minutes
/// Calculates time to try loading again after error.
UInt64 calculateDuration(pcg64 & rnd_engine, size_t error_count = 1) const;
};
/// Basic interface for external loadable objects. Is used in ExternalLoader.
class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>, private boost::noncopyable
{
......
......@@ -50,10 +50,11 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(static_cast<Int8>(load_result.status));
res_columns[i++]->insert(load_result.origin);
if (load_result.object)
{
const auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(load_result.object);
std::exception_ptr last_exception = load_result.exception;
const auto dict_ptr = std::dynamic_pointer_cast<const IDictionaryBase>(load_result.object);
if (dict_ptr)
{
res_columns[i++]->insert(dict_ptr->getTypeName());
const auto & dict_struct = dict_ptr->getStructure();
......@@ -66,6 +67,9 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(dict_ptr->getElementCount());
res_columns[i++]->insert(dict_ptr->getLoadFactor());
res_columns[i++]->insert(dict_ptr->getSource()->toString());
if (!last_exception)
last_exception = dict_ptr->getLastException();
}
else
{
......@@ -76,8 +80,8 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(static_cast<UInt64>(std::chrono::system_clock::to_time_t(load_result.loading_start_time)));
res_columns[i++]->insert(std::chrono::duration_cast<std::chrono::duration<float>>(load_result.loading_duration).count());
if (load_result.exception)
res_columns[i++]->insert(getExceptionMessage(load_result.exception, false));
if (last_exception)
res_columns[i++]->insert(getExceptionMessage(last_exception, false));
else
res_columns[i++]->insertDefault();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册