提交 c8e42e97 编写于 作者: N Nikita Mikhaylov

multithreaded updates

上级 e5320032
......@@ -249,8 +249,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \
M(SettingUInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.", 0) \
M(SettingUInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(SettingUInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(SettingUInt64, query_profiler_real_time_period_ns, 0, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(SettingUInt64, query_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
\
\
/** Limits during query execution are part of the settings. \
......@@ -364,7 +364,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.", 0) \
M(SettingBool, allow_simdjson, true, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.", 0) \
M(SettingBool, allow_introspection_functions, false, "Allow functions for introspection of ELF and DWARF for query profiling. These functions are slow and may impose security considerations.", 0) \
M(SettingBool, allow_introspection_functions, true, "Allow functions for introspection of ELF and DWARF for query profiling. These functions are slow and may impose security considerations.", 0) \
\
M(SettingUInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.", 0) \
M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value", 0) \
......
......@@ -84,20 +84,26 @@ CacheDictionary::CacheDictionary(
, cells{this->size}
, rnd_engine(randomSeed())
, update_queue(max_update_queue_size_)
, update_pool(5)
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
createAttributes();
update_thread = ThreadFromGlobalPool([this] { updateThreadFunction(); });
for (int i = 0; i < 5; ++i)
{
update_pool.scheduleOrThrowOnError([this] { updateMultiThreadFunction(); });
}
}
CacheDictionary::~CacheDictionary()
{
finished = true;
update_queue.clear();
auto empty_finishing_ptr = std::make_shared<UpdateUnit>(std::vector<Key>());
update_queue.push(empty_finishing_ptr);
for (int i = 0; i < 5; ++i) {
auto empty_finishing_ptr = std::make_shared<UpdateUnit>(std::vector<Key>());
update_queue.push(empty_finishing_ptr);
}
update_thread.join();
}
......@@ -723,7 +729,7 @@ void CacheDictionary::updateThreadFunction()
setThreadName("AsyncUpdater");
while (!finished)
{
///std::this_thread::sleep_for(std::chrono::milliseconds(1000));
UpdateUnitPtr first_popped;
update_queue.pop(first_popped);
......@@ -731,8 +737,7 @@ void CacheDictionary::updateThreadFunction()
if (finished)
break;
/// Wait other pointers to be pushed.
/// std::this_thread::sleep_for(std::chrono::milliseconds(1000));
///std::this_thread::sleep_for(std::chrono::milliseconds(10));
/// Here we pop as many unit pointers from update queue as we can.
/// We fix current size to avoid livelock (or too long waiting),
......@@ -741,16 +746,23 @@ void CacheDictionary::updateThreadFunction()
/// Word "bunch" must present in this log message, because it is being checked in tests.
if (current_queue_size > 0)
LOG_DEBUG(log, "Performing bunch of keys update in cache dictionary.");
LOG_DEBUG(log, "Performing bunch of keys update in cache dictionary with " << current_queue_size + 1 << " keys"; );
/// We use deque since there is first_popped pointer.
/// And we have to add to the update_request without breaking order.
std::deque<UpdateUnitPtr> update_request(current_queue_size);
std::vector<UpdateUnitPtr> update_request(current_queue_size + 1);
bool first_position = true;
update_request[0] = first_popped;
for (auto & unit_ptr: update_request)
{
if unlikely(first_position)
{
first_position = false;
continue;
}
update_queue.pop(unit_ptr);
update_request.push_front(first_popped);
}
/// Here we prepare total count of all requested ids
/// not to do useless allocations later.
......@@ -801,6 +813,53 @@ void CacheDictionary::updateThreadFunction()
}
}
void CacheDictionary::updateMultiThreadFunction()
{
setThreadName("AsyncUpdater");
const size_t thread_number = global_update_thread_number.fetch_add(1);
while (!finished)
{
UpdateUnitPtr first_popped;
update_queue.pop(first_popped);
if (finished)
break;
LOG_TRACE(log, "update with thread number " << thread_number);
try
{
auto found_ids_mask_ptr = std::make_shared<std::unordered_map<Key, UInt8>>(first_popped->requested_ids.size());
/// Copy shared_ptr to let this map be alive until other thread finish his stuff.
/// It is thread safe because writing to the map happens before reading from multiple threads.
first_popped->found_ids_mask_ptr = found_ids_mask_ptr;
for (const auto id : first_popped->requested_ids)
found_ids_mask_ptr->insert({id, 0});
/// Update a bunch of ids.
update(first_popped->requested_ids, *found_ids_mask_ptr);
/// Notify all threads about finished updating the bunch of ids
/// where their own ids were included.
std::unique_lock<std::mutex> lock(update_mutex);
first_popped->is_done = true;
is_update_finished.notify_all();
}
catch (...)
{
std::unique_lock<std::mutex> lock(update_mutex);
first_popped->current_exception = std::current_exception();
is_update_finished.notify_all();
}
}
}
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
{
std::unique_lock<std::mutex> lock(update_mutex);
......@@ -846,57 +905,73 @@ void CacheDictionary::update(const std::vector<Key> & requested_ids, std::unorde
Stopwatch watch;
/// Go to external storage. Might be very slow and blocking.
auto start = std::chrono::system_clock::now();
auto stream = source_ptr->loadIds(requested_ids);
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
auto end = std::chrono::system_clock::now();
std::chrono::duration<double> diff = end-start;
LOG_FATAL(log, "load ids " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms");
stream->readPrefix();
while (const auto block = stream->read())
while (true)
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
start = std::chrono::system_clock::now();
if (const auto block = stream->read()) {
end = std::chrono::system_clock::now();
diff = end - start;
LOG_FATAL(log, "read " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms");
const auto & ids = id_column->getData();
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.",
ErrorCodes::TYPE_MISMATCH};
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
const auto &ids = id_column->getData();
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()),
[&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
auto & cell = cells[cell_idx];
for (const auto i : ext::range(0, ids.size())) {
const auto id = ids[i];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
const auto find_result = findCellIdx(id, now);
const auto &cell_idx = find_result.cell_idx;
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
auto &cell = cells[cell_idx];
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
for (const auto attribute_idx : ext::range(0, attributes.size())) {
const auto &attribute_column = *column_ptrs[attribute_idx];
auto &attribute = attributes[attribute_idx];
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// mark corresponding id as found
remaining_ids[id] = 1;
++found_num;
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) {
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec,
dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
} else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// mark corresponding id as found
remaining_ids[id] = 1;
++found_num;
}
} else {
break;
}
}
......
......@@ -347,6 +347,10 @@ private:
mutable UpdateQueue update_queue;
ThreadFromGlobalPool update_thread;
std::atomic<size_t> global_update_thread_number{0};
ThreadPool update_pool;
void updateThreadFunction();
void tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const;
void waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const;
......@@ -358,6 +362,8 @@ private:
template <typename PresentIdHandler, typename AbsentIdHandler>
void prepareAnswer(UpdateUnitPtr, PresentIdHandler &&, AbsentIdHandler &&) const;
};
void updateMultiThreadFunction();
};
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册