提交 56eb0f2e 编写于 作者: N Nikita Mikhaylov

cleanup

上级 b9a8f1fc
......@@ -249,8 +249,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \
M(SettingUInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.", 0) \
M(SettingUInt64, query_profiler_real_time_period_ns, 0, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(SettingUInt64, query_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(SettingUInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(SettingUInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
\
\
/** Limits during query execution are part of the settings. \
......@@ -364,7 +364,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.", 0) \
M(SettingBool, allow_simdjson, true, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.", 0) \
M(SettingBool, allow_introspection_functions, true, "Allow functions for introspection of ELF and DWARF for query profiling. These functions are slow and may impose security considerations.", 0) \
M(SettingBool, allow_introspection_functions, false, "Allow functions for introspection of ELF and DWARF for query profiling. These functions are slow and may impose security considerations.", 0) \
\
M(SettingUInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.", 0) \
M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value", 0) \
......
......@@ -62,13 +62,13 @@ CacheDictionary::CacheDictionary(
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
const size_t size_,
const bool allow_read_expired_keys_,
const size_t max_update_queue_size_,
const size_t update_queue_push_timeout_milliseconds_,
const size_t each_update_finish_timeout_seconds_,
const size_t max_threads_for_updates_)
DictionaryLifetime dict_lifetime_,
size_t size_,
bool allow_read_expired_keys_,
size_t max_update_queue_size_,
size_t update_queue_push_timeout_milliseconds_,
size_t each_update_finish_timeout_seconds_,
size_t max_threads_for_updates_)
: database(database_)
, name(name_)
, full_name{database_.empty() ? name_ : (database_ + "." + name_)}
......@@ -93,9 +93,7 @@ CacheDictionary::CacheDictionary(
createAttributes();
for (size_t i = 0; i < max_threads_for_updates; ++i)
{
update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
}
}
CacheDictionary::~CacheDictionary()
......@@ -750,22 +748,18 @@ void CacheDictionary::updateThreadFunction()
/// when this thread pops from the queue and other threads push to the queue.
const size_t current_queue_size = update_queue.size();
/// Word "bunch" must present in this log message, because it is being checked in tests.
if (current_queue_size > 0)
LOG_TRACE(log, "Performing bunch of keys update in cache dictionary with "
<< current_queue_size + 1 << " keys" );
<< current_queue_size+1 << " keys" );
std::vector<UpdateUnitPtr> update_request;
update_request.reserve(current_queue_size + 1);
update_request.emplace_back(first_popped);
UpdateUnitPtr current_unit_ptr;
while (update_queue.tryPop(current_unit_ptr))
{
update_request.emplace_back(std::move(current_unit_ptr));
}
/// Here we prepare total count of all requested ids
/// not to do useless allocations later.
......@@ -777,7 +771,7 @@ void CacheDictionary::updateThreadFunction()
concatenated_requested_ids.reserve(total_requested_keys_count);
for (auto & unit_ptr: update_request)
std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids),
[&] (const Key & key) {concatenated_requested_ids.push_back(key);});
[&] (const Key & key) {concatenated_requested_ids.push_back(key);});
try
{
......@@ -816,61 +810,6 @@ void CacheDictionary::updateThreadFunction()
}
}
void CacheDictionary::updateMultiThreadFunction()
{
setThreadName("AsyncUpdater");
const size_t thread_number = global_update_thread_number.fetch_add(1);
while (!finished)
{
UpdateUnitPtr first_popped;
update_queue.pop(first_popped);
if (finished)
break;
LOG_TRACE(log, "update with thread number " << thread_number);
auto start = std::chrono::system_clock::now();
try
{
auto found_ids_mask_ptr = std::make_shared<std::unordered_map<Key, UInt8>>(first_popped->requested_ids.size());
/// Copy shared_ptr to let this map be alive until other thread finish his stuff.
/// It is thread safe because writing to the map happens before reading from multiple threads.
first_popped->found_ids_mask_ptr = found_ids_mask_ptr;
for (const auto id : first_popped->requested_ids)
found_ids_mask_ptr->insert({id, 0});
/// Update a bunch of ids.
update(first_popped->requested_ids, *found_ids_mask_ptr);
/// Notify all threads about finished updating the bunch of ids
/// where their own ids were included.
std::unique_lock<std::mutex> lock(update_mutex);
first_popped->is_done = true;
is_update_finished.notify_all();
}
catch (...)
{
std::unique_lock<std::mutex> lock(update_mutex);
first_popped->current_exception = std::current_exception();
is_update_finished.notify_all();
}
auto end = std::chrono::system_clock::now();
auto duration = end - start;
LOG_FATAL(log, "full update " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() << " ms");
}
}
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
{
std::unique_lock<std::mutex> lock(update_mutex);
......
......@@ -49,13 +49,13 @@ public:
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
const size_t size_,
const bool allow_read_expired_keys_,
const size_t max_update_queue_size_,
const size_t update_queue_push_timeout_milliseconds_,
const size_t each_update_finish_timeout_seconds_,
const size_t max_threads_for_updates);
DictionaryLifetime dict_lifetime_,
size_t size_,
bool allow_read_expired_keys_,
size_t max_update_queue_size_,
size_t update_queue_push_timeout_milliseconds_,
size_t each_update_finish_timeout_seconds_,
size_t max_threads_for_updates);
~CacheDictionary() override;
......@@ -309,7 +309,7 @@ private:
const size_t size_overlap_mask;
/// Max tries to find cell, overlaped with mask: if size = 16 and start_cell=10: will try cells: 10,11,12,13,14,15,0,1,2,3
static constexpr size_t max_collision_length = 10;
static constexpr size_t max_collision_length = 1;
const size_t zero_cell_idx{getCellIdx(0)};
std::map<std::string, size_t> attribute_index_by_name;
......@@ -331,13 +331,13 @@ private:
/// Field and methods correlated with update expired and not found keys
using FoundIdsMaskPtr = std::shared_ptr<std::unordered_map<Key, UInt8>>;
struct UpdateUnit
{
UpdateUnit(
std::vector<Key> requested_ids_):
requested_ids(std::move(requested_ids_)) {}
UpdateUnit(std::vector<Key> requested_ids_) : requested_ids(std::move(requested_ids_)) {}
std::shared_ptr<std::unordered_map<Key, UInt8>> found_ids_mask_ptr{nullptr};
FoundIdsMaskPtr found_ids_mask_ptr{nullptr};
std::atomic<bool> is_done{false};
std::exception_ptr current_exception{nullptr};
std::vector<Key> requested_ids;
......@@ -348,8 +348,6 @@ private:
mutable UpdateQueue update_queue;
ThreadFromGlobalPool update_thread;
std::atomic<size_t> global_update_thread_number{0};
ThreadPool update_pool;
......@@ -364,8 +362,6 @@ private:
template <typename PresentIdHandler, typename AbsentIdHandler>
void prepareAnswer(UpdateUnitPtr, PresentIdHandler &&, AbsentIdHandler &&) const;
void updateMultiThreadFunction();
};
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册