diff --git a/dbms/src/Dictionaries/CacheDictionary.cpp b/dbms/src/Dictionaries/CacheDictionary.cpp
index 1a6865e6aebcee22d13d1522b0963f3ee0de587d..f56141f94d9db884b9d27385f0a54a72ac13d463 100644
--- a/dbms/src/Dictionaries/CacheDictionary.cpp
+++ b/dbms/src/Dictionaries/CacheDictionary.cpp
@@ -351,9 +351,8 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
         std::vector<Key> required_expired_ids(cache_expired_ids.size());
         std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; });
         /// Callbacks are empty because we don't want to receive them after an unknown period of time.
-        UpdateUnit update_unit{std::move(required_expired_ids), [&](const Key, const size_t){}, [&](const Key, const size_t){}};
-        UInt64 timeout{10}; /// TODO: make setting or a field called update_queue_push_timeout;
-        if (!update_queue.tryPush(update_unit, timeout))
+        auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids, [&](const auto, const auto){}, [&](const auto, const auto){});
+        if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
             throw std::runtime_error("Can't schedule an update job.");
         return;
     }
@@ -361,28 +360,35 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
 
     /// At this point we have two situations. There may be both types of keys: expired and not found.
     /// We will update them all synchronously.
-    std::vector<Key> required_ids(cache_not_found_ids.size());
-    std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
-    std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
+    std::vector<Key> required_ids;
+    required_ids.reserve(outdated_ids_count);
+    std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
+    std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
 
-    UpdateUnit update_unit{
+    auto update_unit_ptr = std::make_shared<UpdateUnit>(
         std::move(required_ids),
         [&](const Key id, const size_t)
         {
             for (const auto row : cache_not_found_ids[id])
                 out[row] = true;
+            for (const auto row : cache_expired_ids[id])
+                out[row] = true;
         },
         [&](const Key id, const size_t)
         {
             for (const auto row : cache_not_found_ids[id])
                 out[row] = false;
+            for (const auto row : cache_expired_ids[id])
+                out[row] = true;
         }
-    };
-
-    const bool res = update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds);
+    );
 
-    if (!res)
+    if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
         throw std::runtime_error("Too many updates");
 
-    waitForCurrentUpdateFinish();
+// waitForCurrentUpdateFinish();
+    while (!update_unit_ptr->is_done) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        std::this_thread::yield();
+    }
 }
@@ -668,10 +674,11 @@ void CacheDictionary::updateThreadFunction()
 {
     while (!finished)
     {
-        UpdateUnit unit;
-        update_queue.pop(unit);
+        UpdateUnitPtr unit_ptr;
+        update_queue.pop(unit_ptr);
 
-        update(unit.requested_ids, unit.on_cell_updated, unit.on_id_not_found);
+        update(unit_ptr->requested_ids, unit_ptr->on_cell_updated, unit_ptr->on_id_not_found);
+        unit_ptr->is_done = true;
         last_update.fetch_add(1);
     }
 }
@@ -688,5 +695,4 @@ void CacheDictionary::waitForCurrentUpdateFinish() const
         std::this_thread::yield();
     }
-}
diff --git a/dbms/src/Dictionaries/CacheDictionary.h b/dbms/src/Dictionaries/CacheDictionary.h
index 11c7a5f08c6dbee191179f88ba787fcc7e8b1ac2..0b9189a906a639fc3c3c14ab499a04ee47f415da 100644
--- a/dbms/src/Dictionaries/CacheDictionary.h
+++ b/dbms/src/Dictionaries/CacheDictionary.h
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -300,27 +301,37 @@ private:
 
     struct UpdateUnit
     {
+        UpdateUnit(
+            std::vector<Key> requested_ids_,
+            std::function<void(const Key, const size_t)> on_cell_updated_,
+            std::function<void(const Key, const size_t)> on_id_not_found_):
+            requested_ids(std::move(requested_ids_)),
+            on_cell_updated(std::move(on_cell_updated_)),
+            on_id_not_found(std::move(on_id_not_found_)) {}
+
+        std::atomic<bool> is_done{false};
         std::vector<Key> requested_ids;
         std::function<void(const Key, const size_t)> on_cell_updated;
         std::function<void(const Key, const size_t)> on_id_not_found;
     };
 
-    using UpdateQueue = ConcurrentBoundedQueue<UpdateUnit>;
+    using UpdateUnitPtr = std::shared_ptr<UpdateUnit>;
+    using UpdateQueue = ConcurrentBoundedQueue<UpdateUnitPtr>;
 
     // TODO: make setting called max_updates_number
-    mutable UpdateQueue update_queue{10};
+    mutable UpdateQueue update_queue{100};
 
     ThreadFromGlobalPool update_thread;
     void updateThreadFunction();
     std::atomic<bool> finished{false};
 
-    bool getAllowReadExpiredKeysSetting() const
+    static bool getAllowReadExpiredKeysSetting()
     {
         Context * context = current_thread->getThreadGroup()->global_context;
         return context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary;
     }
 
-    const size_t update_queue_push_timeout_milliseconds = 10;
+    const size_t update_queue_push_timeout_milliseconds = 100;
 
     void waitForCurrentUpdateFinish() const;
     mutable std::mutex update_mutex;
diff --git a/dbms/src/Dictionaries/CacheDictionary.inc.h b/dbms/src/Dictionaries/CacheDictionary.inc.h
index 543074829ba9dd00c331127edef5375fa24a57b5..397ed5f0256817a84e8327083c9b11d689dca58d 100644
--- a/dbms/src/Dictionaries/CacheDictionary.inc.h
+++ b/dbms/src/Dictionaries/CacheDictionary.inc.h
@@ -90,11 +90,12 @@ void CacheDictionary::getItemsNumberImpl(
     if (outdated_ids.empty())
         return;
 
-    std::vector<Key> required_ids(outdated_ids.size());
-    std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
+    std::vector<Key> required_ids;
+    required_ids.reserve(outdated_ids.size());
+    std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
 
     /// request new values
-    update(
+    auto update_unit_ptr = std::make_shared<UpdateUnit>(
         required_ids,
         [&](const auto id, const auto cell_idx)
         {
@@ -108,6 +109,15 @@ void CacheDictionary::getItemsNumberImpl(
             for (const size_t row : outdated_ids[id])
                 out[row] = get_default(row);
         });
+
+    if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
+        throw std::runtime_error("Too many updates");
+
+// waitForCurrentUpdateFinish();
+    while (!update_unit_ptr->is_done) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        std::this_thread::yield();
+    }
 }
 
 template <typename DefaultGetter>
@@ -230,46 +240,52 @@ void CacheDictionary::getItemsString(
     if (!cache_expired_ids.empty())
     {
-        std::vector<Key> required_expired_ids(cache_not_found_ids.size());
-        std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; });
+        std::vector<Key> required_expired_ids;
+        required_expired_ids.reserve(cache_not_found_ids.size());
+        std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });
 
         if (allow_read_expired_keys_from_cache_dictionary)
         {
-            UpdateUnit update_unit{required_expired_ids, [&](const auto, const auto) {}, [&](const auto, const auto) {}};
-            if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
+            auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids, [&](const auto, const auto){}, [&](const auto, const auto) {});
+            if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
                 throw std::runtime_error("Too many updates");
         }
         else
         {
-            UpdateUnit update_unit{
+            auto update_unit_ptr = std::make_shared<UpdateUnit>(
                 required_expired_ids,
                 [&](const auto id, const auto cell_idx)
                 {
                     const auto attribute_value = attribute_array[cell_idx];
                     map[id] = String{attribute_value};
-                    total_length += (attribute_value.size + 1) * cache_not_found_ids[id].size();
+                    total_length += (attribute_value.size + 1) * cache_expired_ids[id].size();
                 },
                 [&](const auto id, const auto)
                 {
-                    for (const auto row : cache_not_found_ids[id])
+                    for (const auto row : cache_expired_ids[id])
                         total_length += get_default(row).size + 1;
-                }};
+                });
 
-            if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
+            if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
                 throw std::runtime_error("Too many updates");
 
-            waitForCurrentUpdateFinish();
+// waitForCurrentUpdateFinish();
+            while (!update_unit_ptr->is_done) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+                std::this_thread::yield();
+            }
         }
     }
 
     /// request new values
     if (!cache_not_found_ids.empty())
     {
-        std::vector<Key> required_ids(cache_not_found_ids.size());
-        std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
+        std::vector<Key> required_ids;
+        required_ids.reserve(cache_not_found_ids.size());
+        std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
 
-        UpdateUnit update_unit{
+        auto update_unit_ptr = std::make_shared<UpdateUnit>(
             required_ids,
             [&](const auto id, const auto cell_idx)
             {
@@ -282,10 +298,16 @@ void CacheDictionary::getItemsString(
             {
                 for (const auto row : cache_not_found_ids[id])
                     total_length += get_default(row).size + 1;
-            }};
+            });
 
-        if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
+        if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
             throw std::runtime_error("Too many updates");
+
+// waitForCurrentUpdateFinish();
+        while (!update_unit_ptr->is_done) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            std::this_thread::yield();
+        }
     }
 
     out->getChars().reserve(total_length);
diff --git a/dbms/tests/integration/test_dictionaries_all_layouts_and_sources/test.py b/dbms/tests/integration/test_dictionaries_all_layouts_and_sources/test.py
index 01f9b15b51f3b7fc9d14fb2f52f478c68873a3fd..accc85d4f003ded4973f11917ccc30319bed6113 100644
--- a/dbms/tests/integration/test_dictionaries_all_layouts_and_sources/test.py
+++ b/dbms/tests/integration/test_dictionaries_all_layouts_and_sources/test.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
 import pytest
 import os
 
@@ -162,13 +163,13 @@ def setup_module(module):
             if source.compatible_with_layout(layout):
                 DICTIONARIES.append(get_dict(source, layout, FIELDS[layout.layout_type]))
             else:
-                print "Source", source.name, "incompatible with layout", layout.name
+                print("Source", source.name, "incompatible with layout", layout.name)
 
     for layout in LAYOUTS:
         field_keys = list(filter(lambda x: x.is_key, FIELDS[layout.layout_type]))
         for source in SOURCES_KV:
             if not source.compatible_with_layout(layout):
-                print "Source", source.name, "incompatible with layout", layout.name
+                print("Source", source.name, "incompatible with layout", layout.name)
                 continue
 
             for field in FIELDS[layout.layout_type]:
@@ -188,9 +189,9 @@ def started_cluster():
     try:
         cluster.start()
         for dictionary in DICTIONARIES + DICTIONARIES_KV:
-            print "Preparing", dictionary.name
+            # print "Preparing", dictionary.name
             dictionary.prepare_source(cluster)
-            print "Prepared"
+            # print "Prepared"
 
         yield cluster
 
@@ -235,10 +236,9 @@ def test_simple_dictionaries(started_cluster):
             queries_with_answers.append((query, 1))
 
     for query, answer in queries_with_answers:
-        print query
         if isinstance(answer, list):
             answer = str(answer).replace(' ', '')
-        assert node.query(query) == str(answer) + '\n'
+        assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
 
 
 def test_complex_dictionaries(started_cluster):
@@ -267,8 +267,7 @@ def test_complex_dictionaries(started_cluster):
             queries_with_answers.append((query, field.default_value_for_get))
 
     for query, answer in queries_with_answers:
-        print query
-        assert node.query(query) == str(answer) + '\n'
+        assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
 
 
 def test_ranged_dictionaries(started_cluster):
@@ -291,8 +290,7 @@ def test_ranged_dictionaries(started_cluster):
             queries_with_answers.append((query, row.get_value_by_name(field.name)))
 
     for query, answer in queries_with_answers:
-        print query
-        assert node.query(query) == str(answer) + '\n'
+        assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
 
 
 def test_key_value_simple_dictionaries(started_cluster):
@@ -314,12 +312,12 @@ def test_key_value_simple_dictionaries(started_cluster):
 
         node.query("system reload dictionary {}".format(dct.name))
 
-        print 'name: ', dct.name
+        # print 'name: ', dct.name
 
         for row in local_data:
-            print dct.get_fields()
+            # print dct.get_fields()
             for field in dct.get_fields():
-                print field.name, field.is_key
+                # print field.name, field.is_key
                 if not field.is_key:
                     for query in dct.get_select_get_queries(field, row):
                         queries_with_answers.append((query, row.get_value_by_name(field.name)))
@@ -344,10 +342,9 @@ def test_key_value_simple_dictionaries(started_cluster):
             queries_with_answers.append((query, 1))
 
     for query, answer in queries_with_answers:
-        print query
         if isinstance(answer, list):
             answer = str(answer).replace(' ', '')
-        assert node.query(query) == str(answer) + '\n'
+        assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
 
 
 def test_key_value_complex_dictionaries(started_cluster):
@@ -386,5 +383,4 @@ def test_key_value_complex_dictionaries(started_cluster):
             queries_with_answers.append((query, field.default_value_for_get))
 
     for query, answer in queries_with_answers:
-        print query
-        assert node.query(query) == str(answer) + '\n'
+        assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)