提交 7ce2107e 编写于 作者: N Nikita Mikhaylov

better

上级 c8e42e97
......@@ -84,15 +84,15 @@ CacheDictionary::CacheDictionary(
, cells{this->size}
, rnd_engine(randomSeed())
, update_queue(max_update_queue_size_)
, update_pool(5)
, update_pool(4)
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
createAttributes();
for (int i = 0; i < 5; ++i)
for (int i = 0; i < 4; ++i)
{
update_pool.scheduleOrThrowOnError([this] { updateMultiThreadFunction(); });
update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
}
}
......@@ -100,11 +100,11 @@ CacheDictionary::~CacheDictionary()
{
finished = true;
update_queue.clear();
for (int i = 0; i < 5; ++i) {
for (int i = 0; i < 4; ++i) {
auto empty_finishing_ptr = std::make_shared<UpdateUnit>(std::vector<Key>());
update_queue.push(empty_finishing_ptr);
}
update_thread.join();
update_pool.wait();
}
......@@ -748,20 +748,15 @@ void CacheDictionary::updateThreadFunction()
if (current_queue_size > 0)
LOG_DEBUG(log, "Performing bunch of keys update in cache dictionary with " << current_queue_size + 1 << " keys"; );
std::vector<UpdateUnitPtr> update_request(current_queue_size + 1);
std::vector<UpdateUnitPtr> update_request;
bool first_position = true;
update_request.push_back(first_popped);
update_request[0] = first_popped;
auto current_unit_ptr = UpdateUnitPtr();
for (auto & unit_ptr: update_request)
while (update_queue.tryPop(current_unit_ptr))
{
if unlikely(first_position)
{
first_position = false;
continue;
}
update_queue.pop(unit_ptr);
update_request.push_back(current_unit_ptr);
}
/// Here we prepare total count of all requested ids
......@@ -829,6 +824,8 @@ void CacheDictionary::updateMultiThreadFunction()
LOG_TRACE(log, "update with thread number " << thread_number);
auto start = std::chrono::system_clock::now();
try
{
auto found_ids_mask_ptr = std::make_shared<std::unordered_map<Key, UInt8>>(first_popped->requested_ids.size());
......@@ -857,6 +854,12 @@ void CacheDictionary::updateMultiThreadFunction()
first_popped->current_exception = std::current_exception();
is_update_finished.notify_all();
}
auto end = std::chrono::system_clock::now();
auto duration = end - start;
LOG_FATAL(log, "full update " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << " ms");
}
}
......
......@@ -39,6 +39,7 @@ template <typename AttributeType, typename OutputType, typename DefaultGetter>
void CacheDictionary::getItemsNumberImpl(
Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
{
std::cout << StackTrace().toString() << std::endl;
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册