提交 2545f66f 编写于 作者: N Nikita Mikhaylov

better

上级 534d47e0
......@@ -351,9 +351,8 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
std::vector<Key> required_expired_ids(cache_expired_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; });
/// Callbacks are empty because we don't want to receive them after an unknown period of time.
UpdateUnit update_unit{std::move(required_expired_ids), [&](const Key, const size_t){}, [&](const Key, const size_t){}};
UInt64 timeout{10}; /// TODO: make setting or a field called update_queue_push_timeout;
if (!update_queue.tryPush(update_unit, timeout))
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids, [&](const auto, const auto){}, [&](const auto, const auto){} );
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Can't schedule an update job.");
return;
}
......@@ -361,28 +360,35 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
/// At this point we have two situations. There may be both types of keys: expired and not found.
/// We will update them all synchronously.
std::vector<Key> required_ids(cache_not_found_ids.size());
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
std::vector<Key> required_ids;
required_ids.reserve(outdated_ids_count);
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
UpdateUnit update_unit{
auto update_unit_ptr = std::make_shared<UpdateUnit>(
std::move(required_ids),
[&](const Key id, const size_t) {
for (const auto row : cache_not_found_ids[id])
out[row] = true;
for (const auto row : cache_expired_ids[id])
out[row] = true;
},
[&](const Key id, const size_t) {
for (const auto row : cache_not_found_ids[id])
out[row] = false;
for (const auto row : cache_expired_ids[id])
out[row] = true;
}
};
const bool res = update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds);
);
if (!res)
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
waitForCurrentUpdateFinish();
// waitForCurrentUpdateFinish();
while (!update_unit_ptr->is_done) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::yield();
}
}
......@@ -668,10 +674,11 @@ void CacheDictionary::updateThreadFunction()
{
while (!finished)
{
UpdateUnit unit;
update_queue.pop(unit);
UpdateUnitPtr unit_ptr;
update_queue.pop(unit_ptr);
update(unit.requested_ids, unit.on_cell_updated, unit.on_id_not_found);
update(unit_ptr->requested_ids, unit_ptr->on_cell_updated, unit_ptr->on_id_not_found);
unit_ptr->is_done = true;
last_update.fetch_add(1);
}
}
......@@ -688,5 +695,4 @@ void CacheDictionary::waitForCurrentUpdateFinish() const
std::this_thread::yield();
}
}
......@@ -6,6 +6,7 @@
#include <map>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <variant>
#include <vector>
#include <common/logger_useful.h>
......@@ -300,27 +301,37 @@ private:
/// A single update request handed over to the update thread through update_queue.
///
/// Units are created with std::make_shared and pushed to the queue as UpdateUnitPtr.
/// updateThreadFunction() pops a unit, calls
/// update(requested_ids, on_cell_updated, on_id_not_found) and then sets is_done to true.
/// Callers that need the results poll is_done after a successful push.
///
/// NOTE: the callers visible here build the callbacks with by-reference captures ([&]),
/// so the captured objects have to stay alive until is_done becomes true. The only
/// callers that do not wait pass empty callbacks.
struct UpdateUnit
{
/// All three arguments are taken by value and moved into the corresponding members.
UpdateUnit(
std::vector<Key> requested_ids_,
std::function<void(const Key, const size_t)> on_cell_updated_,
std::function<void(const Key, const size_t)> on_id_not_found_):
requested_ids(std::move(requested_ids_)),
on_cell_updated(std::move(on_cell_updated_)),
on_id_not_found(std::move(on_id_not_found_)) {}
/// Completion flag: starts as false, set to true by the update thread once update() has returned.
/// Atomic because it is written by the update thread and read by the waiting caller.
std::atomic<bool> is_done{false};
/// Keys that this unit asks update() to process.
std::vector<Key> requested_ids;
/// Callback forwarded to update(); callers name its arguments (id, cell_idx).
/// Presumably invoked once per key that was fetched successfully - confirm against update().
std::function<void(const Key, const size_t)> on_cell_updated;
/// Callback forwarded to update(); callers use only its first argument (the id).
/// Presumably invoked once per requested key that the source did not return - confirm against update().
std::function<void(const Key, const size_t)> on_id_not_found;
};
using UpdateQueue = ConcurrentBoundedQueue<UpdateUnit>;
using UpdateUnitPtr = std::shared_ptr<UpdateUnit>;
using UpdateQueue = ConcurrentBoundedQueue<UpdateUnitPtr>;
// TODO: make setting called max_updates_number
mutable UpdateQueue update_queue{10};
mutable UpdateQueue update_queue{100};
ThreadFromGlobalPool update_thread;
void updateThreadFunction();
std::atomic<bool> finished{false};
bool getAllowReadExpiredKeysSetting() const
static bool getAllowReadExpiredKeysSetting()
{
Context * context = current_thread->getThreadGroup()->global_context;
return context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary;
}
const size_t update_queue_push_timeout_milliseconds = 10;
const size_t update_queue_push_timeout_milliseconds = 100;
void waitForCurrentUpdateFinish() const;
mutable std::mutex update_mutex;
......
......@@ -90,11 +90,12 @@ void CacheDictionary::getItemsNumberImpl(
if (outdated_ids.empty())
return;
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
std::vector<Key> required_ids;
required_ids.reserve(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
/// request new values
update(
auto update_unit_ptr = std::make_shared<UpdateUnit>(
required_ids,
[&](const auto id, const auto cell_idx)
{
......@@ -108,6 +109,15 @@ void CacheDictionary::getItemsNumberImpl(
for (const size_t row : outdated_ids[id])
out[row] = get_default(row);
});
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
// waitForCurrentUpdateFinish();
while (!update_unit_ptr->is_done) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::yield();
}
}
template <typename DefaultGetter>
......@@ -230,46 +240,52 @@ void CacheDictionary::getItemsString(
if (!cache_expired_ids.empty())
{
std::vector<Key> required_expired_ids(cache_not_found_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; });
std::vector<Key> required_expired_ids;
required_expired_ids.reserve(cache_not_found_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });
if (allow_read_expired_keys_from_cache_dictionary)
{
UpdateUnit update_unit{required_expired_ids, [&](const auto, const auto) {}, [&](const auto, const auto) {}};
if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids, [&](const auto, const auto){}, [&](const auto, const auto) {});
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
}
else
{
UpdateUnit update_unit{
auto update_unit_ptr = std::make_shared<UpdateUnit>(
required_expired_ids,
[&](const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
map[id] = String{attribute_value};
total_length += (attribute_value.size + 1) * cache_not_found_ids[id].size();
total_length += (attribute_value.size + 1) * cache_expired_ids[id].size();
},
[&](const auto id, const auto)
{
for (const auto row : cache_not_found_ids[id])
for (const auto row : cache_expired_ids[id])
total_length += get_default(row).size + 1;
}};
});
if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
waitForCurrentUpdateFinish();
// waitForCurrentUpdateFinish();
while (!update_unit_ptr->is_done) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::yield();
}
}
}
/// request new values
if (!cache_not_found_ids.empty())
{
std::vector<Key> required_ids(cache_not_found_ids.size());
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
std::vector<Key> required_ids;
required_ids.reserve(cache_not_found_ids.size());
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
UpdateUnit update_unit{
auto update_unit_ptr = std::make_shared<UpdateUnit>(
required_ids,
[&](const auto id, const auto cell_idx)
{
......@@ -282,10 +298,16 @@ void CacheDictionary::getItemsString(
{
for (const auto row : cache_not_found_ids[id])
total_length += get_default(row).size + 1;
}};
});
if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
// waitForCurrentUpdateFinish();begin
while (!update_unit_ptr->is_done) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::yield();
}
}
out->getChars().reserve(total_length);
......
from __future__ import print_function
import pytest
import os
......@@ -162,13 +163,13 @@ def setup_module(module):
if source.compatible_with_layout(layout):
DICTIONARIES.append(get_dict(source, layout, FIELDS[layout.layout_type]))
else:
print "Source", source.name, "incompatible with layout", layout.name
print("Source", source.name, "incompatible with layout", layout.name)
for layout in LAYOUTS:
field_keys = list(filter(lambda x: x.is_key, FIELDS[layout.layout_type]))
for source in SOURCES_KV:
if not source.compatible_with_layout(layout):
print "Source", source.name, "incompatible with layout", layout.name
print("Source", source.name, "incompatible with layout", layout.name)
continue
for field in FIELDS[layout.layout_type]:
......@@ -188,9 +189,9 @@ def started_cluster():
try:
cluster.start()
for dictionary in DICTIONARIES + DICTIONARIES_KV:
print "Preparing", dictionary.name
# print "Preparing", dictionary.name
dictionary.prepare_source(cluster)
print "Prepared"
# print "Prepared"
yield cluster
......@@ -235,10 +236,9 @@ def test_simple_dictionaries(started_cluster):
queries_with_answers.append((query, 1))
for query, answer in queries_with_answers:
print query
if isinstance(answer, list):
answer = str(answer).replace(' ', '')
assert node.query(query) == str(answer) + '\n'
assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
def test_complex_dictionaries(started_cluster):
......@@ -267,8 +267,7 @@ def test_complex_dictionaries(started_cluster):
queries_with_answers.append((query, field.default_value_for_get))
for query, answer in queries_with_answers:
print query
assert node.query(query) == str(answer) + '\n'
assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
def test_ranged_dictionaries(started_cluster):
......@@ -291,8 +290,7 @@ def test_ranged_dictionaries(started_cluster):
queries_with_answers.append((query, row.get_value_by_name(field.name)))
for query, answer in queries_with_answers:
print query
assert node.query(query) == str(answer) + '\n'
assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
def test_key_value_simple_dictionaries(started_cluster):
......@@ -314,12 +312,12 @@ def test_key_value_simple_dictionaries(started_cluster):
node.query("system reload dictionary {}".format(dct.name))
print 'name: ', dct.name
# print 'name: ', dct.name
for row in local_data:
print dct.get_fields()
# print dct.get_fields()
for field in dct.get_fields():
print field.name, field.is_key
# print field.name, field.is_key
if not field.is_key:
for query in dct.get_select_get_queries(field, row):
queries_with_answers.append((query, row.get_value_by_name(field.name)))
......@@ -344,10 +342,9 @@ def test_key_value_simple_dictionaries(started_cluster):
queries_with_answers.append((query, 1))
for query, answer in queries_with_answers:
print query
if isinstance(answer, list):
answer = str(answer).replace(' ', '')
assert node.query(query) == str(answer) + '\n'
assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
def test_key_value_complex_dictionaries(started_cluster):
......@@ -386,5 +383,4 @@ def test_key_value_complex_dictionaries(started_cluster):
queries_with_answers.append((query, field.default_value_for_get))
for query, answer in queries_with_answers:
print query
assert node.query(query) == str(answer) + '\n'
assert node.query(query) == str(answer) + '\n', "Result and answer mismatched for query {}".format(query)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册