Commit 28d0f332 authored by Nikita Mikhaylov

bunch-updates && timeouts for waiting

Parent f0e4b0c0
@@ -65,7 +65,8 @@ CacheDictionary::CacheDictionary(
     const size_t size_,
     const bool allow_read_expired_keys_,
     const size_t max_update_queue_size_,
-    const size_t update_queue_push_timeout_milliseconds_)
+    const size_t update_queue_push_timeout_milliseconds_,
+    const size_t each_update_finish_timeout_seconds_)
     : name{name_}
     , dict_struct(dict_struct_)
     , source_ptr{std::move(source_ptr_)}
@@ -73,6 +74,7 @@ CacheDictionary::CacheDictionary(
     , allow_read_expired_keys(allow_read_expired_keys_)
     , max_update_queue_size(max_update_queue_size_)
     , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
+    , each_update_finish_timeout_seconds(each_update_finish_timeout_seconds_)
     , log(&Logger::get("ExternalDictionaries"))
     , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
     , size_overlap_mask{this->size - 1}
@@ -696,9 +698,15 @@ void registerDictionaryCache(DictionaryFactory & factory)
             throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
                 ErrorCodes::BAD_ARGUMENTS};
+        const size_t each_update_finish_timeout_seconds =
+            config.getUInt64(layout_prefix + ".each_update_finish_timeout_seconds", 600);
+        if (each_update_finish_timeout_seconds == 0)
+            throw Exception{name + ": dictionary of layout 'cache' cannot have timeout equals to zero.",
+                ErrorCodes::BAD_ARGUMENTS};
         return std::make_unique<CacheDictionary>(
             name, dict_struct, std::move(source_ptr), dict_lifetime, size,
-            allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds);
+            allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds, each_update_finish_timeout_seconds);
     };
     factory.registerLayout("cache", create_layout, false);
 }
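For reference, here is a minimal sketch of reading the new key from an XML dictionary config with Poco, the configuration library ClickHouse builds on. Only the each_update_finish_timeout_seconds element name and its default of 600 come from the hunk above; the surrounding <dictionary>/<layout>/<cache> wrapper, the size_in_cells value, and the exact layout_prefix string are illustrative assumptions:

#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>
#include <iostream>
#include <sstream>
#include <string>

int main()
{
    /// Illustrative layout fragment; element names other than
    /// each_update_finish_timeout_seconds are assumptions.
    std::istringstream xml(
        "<dictionary>"
        "  <layout><cache>"
        "    <size_in_cells>1000000</size_in_cells>"
        "    <each_update_finish_timeout_seconds>120</each_update_finish_timeout_seconds>"
        "  </cache></layout>"
        "</dictionary>");
    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(xml));

    /// Same lookup pattern as the factory above: a missing key falls back to
    /// 600 seconds, and a value of 0 is rejected with BAD_ARGUMENTS.
    const std::string layout_prefix = "layout.cache";  /// assumed prefix
    const auto timeout = config->getUInt64(layout_prefix + ".each_update_finish_timeout_seconds", 600);
    std::cout << timeout << '\n';  /// prints 120
}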
@@ -708,28 +716,56 @@ void CacheDictionary::updateThreadFunction()
     setThreadName("AsyncUpdater");
     while (!finished)
     {
-        UpdateUnitPtr unit_ptr;
-        /// TODO: pop as many as we can + concatenate requested_ids
-        update_queue.pop(unit_ptr);
+        /// Here we pop as many unit pointers from the update queue as we can.
+        /// We fix the current size to avoid a livelock (or an overly long wait)
+        /// while this thread pops from the queue and other threads push to it.
+        const size_t current_queue_size = update_queue.size();
+        std::vector<UpdateUnitPtr> update_request(current_queue_size);
+        for (auto & unit_ptr : update_request)
+            update_queue.pop(unit_ptr);
+        /// Here we compute the total count of all requested ids
+        /// so that we avoid useless allocations later.
+        size_t requested_keys_count = 0;
+        for (auto & unit_ptr : update_request)
+            requested_keys_count += unit_ptr->requested_ids.size();
+        std::vector<Key> concatenated_requested_ids;
+        concatenated_requested_ids.reserve(requested_keys_count);
+        for (auto & unit_ptr : update_request)
+            std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids),
+                [&] (const Key & key) { concatenated_requested_ids.push_back(key); });
         try
         {
-            auto found_ids_mask_ptr = std::make_shared<std::unordered_map<Key, UInt8>>(unit_ptr->requested_ids.size());
+            auto found_ids_mask_ptr = std::make_shared<std::unordered_map<Key, UInt8>>(concatenated_requested_ids.size());
-            /// Copy shared_ptr to let this map be alive until other thread finish his stuff
-            unit_ptr->found_ids_mask_ptr = found_ids_mask_ptr;
+            /// Copy the shared_ptr to keep this map alive until the other threads finish their work.
+            /// It is thread safe because writing to the map happens before reading from multiple threads.
+            for (auto & unit_ptr : update_request)
+                unit_ptr->found_ids_mask_ptr = found_ids_mask_ptr;
-            for (const auto id : unit_ptr->requested_ids)
+            for (const auto id : concatenated_requested_ids)
                 found_ids_mask_ptr->insert({id, 0});
-            update(unit_ptr->requested_ids, *found_ids_mask_ptr);
+            /// Update the whole bunch of ids at once.
+            update(concatenated_requested_ids, *found_ids_mask_ptr);
+            /// Notify all threads that the bunch of ids containing
+            /// their own ids has finished updating.
             std::unique_lock<std::mutex> lock(update_mutex);
-            unit_ptr->is_done = true;
+            for (auto & unit_ptr : update_request)
+                unit_ptr->is_done = true;
             is_update_finished.notify_all();
         }
         catch (...)
         {
             std::unique_lock<std::mutex> lock(update_mutex);
-            unit_ptr->current_exception = std::current_exception();
+            /// This is a real problem: one bad query can make other threads fail
+            /// with an unrelated exception. At this point all waiting threads
+            /// (and their queries) receive the same exception.
+            for (auto & unit_ptr : update_request)
+                unit_ptr->current_exception = std::current_exception();
             is_update_finished.notify_all();
         }
     }
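The added comments above describe the batching strategy: take a snapshot of the queue size, pop exactly that many units, concatenate their ids and update them in one pass, then mark every unit done under a single lock and wake all waiters. Below is a self-contained, simplified sketch of that pattern; the UpdateUnit struct, the plain std::queue, and int keys are stand-ins for the dictionary's real types, not the actual implementation:

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

/// Simplified stand-in for one queued update request.
struct UpdateUnit
{
    std::vector<int> requested_ids;
    bool is_done = false;
};
using UpdateUnitPtr = std::shared_ptr<UpdateUnit>;

std::mutex update_mutex;
std::condition_variable is_update_finished;
std::queue<UpdateUnitPtr> update_queue;  /// guarded by update_mutex in this toy example

int main()
{
    /// Pretend three callers pushed their requests.
    for (int i = 0; i < 3; ++i)
    {
        auto unit = std::make_shared<UpdateUnit>();
        unit->requested_ids = {i * 10, i * 10 + 1};
        std::lock_guard<std::mutex> lock(update_mutex);
        update_queue.push(unit);
    }

    /// Fix the batch size first, so units pushed while we drain are left
    /// for the next iteration (no livelock, no unbounded batch).
    std::vector<UpdateUnitPtr> update_request;
    {
        std::lock_guard<std::mutex> lock(update_mutex);
        const size_t current_queue_size = update_queue.size();
        for (size_t i = 0; i < current_queue_size; ++i)
        {
            update_request.push_back(update_queue.front());
            update_queue.pop();
        }
    }

    /// Concatenate all requested ids and "update" them in one pass.
    std::vector<int> concatenated_requested_ids;
    for (const auto & unit : update_request)
        concatenated_requested_ids.insert(concatenated_requested_ids.end(),
            unit->requested_ids.begin(), unit->requested_ids.end());
    std::cout << "updating " << concatenated_requested_ids.size() << " ids in one batch\n";

    /// Mark every unit of the batch done under the mutex, then wake all waiters at once.
    {
        std::lock_guard<std::mutex> lock(update_mutex);
        for (auto & unit : update_request)
            unit->is_done = true;
    }
    is_update_finished.notify_all();
}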
@@ -738,8 +774,13 @@ void CacheDictionary::updateThreadFunction()
 void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
 {
     std::unique_lock<std::mutex> lock(update_mutex);
-    is_update_finished.wait(lock,
-        [&] () {return update_unit_ptr->is_done || update_unit_ptr->current_exception; });
+    const auto sleeping_result = is_update_finished.wait_for(
+        lock,
+        std::chrono::seconds(each_update_finish_timeout_seconds),
+        [&] { return update_unit_ptr->is_done || update_unit_ptr->current_exception; });
+    if (!sleeping_result)
+        throw DB::Exception("Keys updating timed out", ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
     if (update_unit_ptr->current_exception)
         std::rethrow_exception(update_unit_ptr->current_exception);
......
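The waitForCurrentUpdateFinish change above relies on std::condition_variable::wait_for with a predicate: it returns false only if the timeout expires while the predicate is still false, and that false result is what gets turned into the CACHE_DICTIONARY_UPDATE_FAIL exception. Here is a self-contained sketch of the same pattern, with std::runtime_error standing in for DB::Exception:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

std::mutex update_mutex;
std::condition_variable is_update_finished;
bool is_done = false;

/// Minimal analogue of waitForCurrentUpdateFinish: wait_for returns false
/// if the predicate is still false when the timeout expires, and the caller
/// converts that into an error instead of blocking forever.
void waitForCurrentUpdateFinish(std::chrono::seconds timeout)
{
    std::unique_lock<std::mutex> lock(update_mutex);
    const bool sleeping_result = is_update_finished.wait_for(lock, timeout, [] { return is_done; });
    if (!sleeping_result)
        throw std::runtime_error("Keys updating timed out");
}

int main()
{
    std::thread updater([]
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard<std::mutex> lock(update_mutex);
            is_done = true;
        }
        is_update_finished.notify_all();
    });

    waitForCurrentUpdateFinish(std::chrono::seconds(5));  /// the update finishes well before the timeout
    std::cout << "update finished before the timeout\n";
    updater.join();
}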
@@ -51,7 +51,8 @@ public:
         const size_t size_,
         const bool allow_read_expired_keys_,
         const size_t max_update_queue_size_,
-        const size_t update_queue_push_timeout_milliseconds_);
+        const size_t update_queue_push_timeout_milliseconds_,
+        const size_t timeout_for_each_update_finish_);
     ~CacheDictionary() override;
@@ -78,7 +79,8 @@ public:
     {
         return std::make_shared<CacheDictionary>(
             name, dict_struct, source_ptr->clone(), dict_lifetime, size,
-            allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds);
+            allow_read_expired_keys, max_update_queue_size,
+            update_queue_push_timeout_milliseconds, each_update_finish_timeout_seconds);
     }
     const IDictionarySource * getSource() const override { return source_ptr.get(); }
@@ -286,6 +288,7 @@ private:
     const bool allow_read_expired_keys;
     const size_t max_update_queue_size;
     const size_t update_queue_push_timeout_milliseconds;
+    const size_t each_update_finish_timeout_seconds;
     Logger * const log;
......