提交 9170e5d4 编写于 作者: N Nikita Mikhaylov

style errors

上级 9b63f289
......@@ -67,7 +67,8 @@ CacheDictionary::CacheDictionary(
const bool allow_read_expired_keys_,
const size_t max_update_queue_size_,
const size_t update_queue_push_timeout_milliseconds_,
const size_t each_update_finish_timeout_seconds_)
const size_t each_update_finish_timeout_seconds_,
const size_t max_threads_for_updates_)
: database(database_)
, name(name_)
, full_name{database_.empty() ? name_ : (database_ + "." + name_)}
......@@ -78,19 +79,20 @@ CacheDictionary::CacheDictionary(
, max_update_queue_size(max_update_queue_size_)
, update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
, each_update_finish_timeout_seconds(each_update_finish_timeout_seconds_)
, max_threads_for_updates(max_threads_for_updates_)
, log(&Logger::get("ExternalDictionaries"))
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
, size_overlap_mask{this->size - 1}
, cells{this->size}
, rnd_engine(randomSeed())
, update_queue(max_update_queue_size_)
, update_pool(4)
, update_pool(max_threads_for_updates)
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
createAttributes();
for (int i = 0; i < 4; ++i)
for (size_t i = 0; i < max_threads_for_updates; ++i)
{
update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
}
......@@ -100,7 +102,8 @@ CacheDictionary::~CacheDictionary()
{
finished = true;
update_queue.clear();
for (int i = 0; i < 4; ++i) {
for (size_t i = 0; i < max_threads_for_updates; ++i)
{
auto empty_finishing_ptr = std::make_shared<UpdateUnit>(std::vector<Key>());
update_queue.push(empty_finishing_ptr);
}
......@@ -717,9 +720,16 @@ void registerDictionaryCache(DictionaryFactory & factory)
throw Exception{name + ": dictionary of layout 'cache' cannot have timeout equals to zero.",
ErrorCodes::BAD_ARGUMENTS};
const size_t max_threads_for_updates =
config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
if (max_threads_for_updates == 0)
throw Exception{name + ": dictionary of layout 'cache' cannot have zero threads for updates.",
ErrorCodes::BAD_ARGUMENTS};
return std::make_unique<CacheDictionary>(
database, name, dict_struct, std::move(source_ptr), dict_lifetime, size,
allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds, each_update_finish_timeout_seconds);
allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds,
each_update_finish_timeout_seconds, max_threads_for_updates);
};
factory.registerLayout("cache", create_layout, false);
}
......@@ -729,16 +739,12 @@ void CacheDictionary::updateThreadFunction()
setThreadName("AsyncUpdater");
while (!finished)
{
UpdateUnitPtr first_popped;
update_queue.pop(first_popped);
if (finished)
break;
///std::this_thread::sleep_for(std::chrono::milliseconds(10));
/// Here we pop as many unit pointers from the update queue as we can.
/// We fix the current queue size up front to avoid livelock (or waiting too long)
/// when this thread pops from the queue while other threads keep pushing to it.
......@@ -746,17 +752,18 @@ void CacheDictionary::updateThreadFunction()
/// The word "bunch" must be present in this log message, because tests check for it.
if (current_queue_size > 0)
LOG_DEBUG(log, "Performing bunch of keys update in cache dictionary with " << current_queue_size + 1 << " keys"; );
LOG_TRACE(log, "Performing bunch of keys update in cache dictionary with " << current_queue_size + 1 << " keys" );
std::vector<UpdateUnitPtr> update_request;
update_request.reserve(current_queue_size + 1);
update_request.push_back(first_popped);
update_request.emplace_back(first_popped);
auto current_unit_ptr = UpdateUnitPtr();
UpdateUnitPtr current_unit_ptr;
while (update_queue.tryPop(current_unit_ptr))
{
update_request.push_back(current_unit_ptr);
update_request.emplace_back(std::move(current_unit_ptr));
}
/// Here we prepare total count of all requested ids
......@@ -907,74 +914,68 @@ void CacheDictionary::update(const std::vector<Key> & requested_ids, std::unorde
}
Stopwatch watch;
/// Go to external storage. Might be very slow and blocking.
auto start = std::chrono::system_clock::now();
auto stream = source_ptr->loadIds(requested_ids);
auto load_ids_start = std::chrono::system_clock::now();
auto end = std::chrono::system_clock::now();
/// Trip to external storage. Might be very bad, slow and blocking.
auto stream = source_ptr->loadIds(requested_ids);
std::chrono::duration<double> diff = end-start;
auto load_ids_end = std::chrono::system_clock::now();
LOG_FATAL(log, "load ids " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms");
LOG_TRACE(log, "Loading " << requested_ids.size() << " number of ids from external storage took " <<
std::chrono::duration_cast<std::chrono::milliseconds>(load_ids_end - load_ids_start).count() << " ms");
stream->readPrefix();
while (true)
while (const auto block = stream->read())
{
start = std::chrono::system_clock::now();
if (const auto block = stream->read()) {
end = std::chrono::system_clock::now();
diff = end - start;
LOG_FATAL(log, "read " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms");
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.",
ErrorCodes::TYPE_MISMATCH};
const auto &ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()),
[&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.",
ErrorCodes::TYPE_MISMATCH};
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
const auto &ids = id_column->getData();
for (const auto i : ext::range(0, ids.size())) {
const auto id = ids[i];
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()),
[&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
const auto find_result = findCellIdx(id, now);
const auto &cell_idx = find_result.cell_idx;
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
auto &cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size())) {
const auto &attribute_column = *column_ptrs[attribute_idx];
auto &attribute = attributes[attribute_idx];
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
const auto find_result = findCellIdx(id, now);
const auto &cell_idx = find_result.cell_idx;
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
auto &cell = cells[cell_idx];
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) {
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec,
dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
} else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto &attribute_column = *column_ptrs[attribute_idx];
auto &attribute = attributes[attribute_idx];
/// mark corresponding id as found
remaining_ids[id] = 1;
++found_num;
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
} else {
break;
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec,
dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
} else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// mark corresponding id as found
remaining_ids[id] = 1;
++found_num;
}
}
......
......@@ -54,7 +54,8 @@ public:
const bool allow_read_expired_keys_,
const size_t max_update_queue_size_,
const size_t update_queue_push_timeout_milliseconds_,
const size_t each_update_finish_timeout_seconds_);
const size_t each_update_finish_timeout_seconds_,
const size_t max_threads_for_updates);
~CacheDictionary() override;
......@@ -84,7 +85,7 @@ public:
return std::make_shared<CacheDictionary>(
database, name, dict_struct, source_ptr->clone(), dict_lifetime, size,
allow_read_expired_keys, max_update_queue_size,
update_queue_push_timeout_milliseconds, each_update_finish_timeout_seconds);
update_queue_push_timeout_milliseconds, each_update_finish_timeout_seconds, max_threads_for_updates);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
......@@ -295,6 +296,7 @@ private:
const size_t max_update_queue_size;
const size_t update_queue_push_timeout_milliseconds;
const size_t each_update_finish_timeout_seconds;
const size_t max_threads_for_updates;
Logger * const log;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册