From 54361633baeb3832682f9fc035099a3e3efcf95d Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 19 Jun 2019 12:10:27 +0800 Subject: [PATCH] refine code Former-commit-id: eae3ec0b3f87dbecf1e44d079d4a4a46794f9b9f --- cpp/src/db/DBImpl.cpp | 103 ++++++++++++------ cpp/src/db/DBImpl.h | 6 +- cpp/src/db/DBMetaImpl.cpp | 16 ++- cpp/src/db/scheduler/SearchScheduler.cpp | 2 +- cpp/src/db/scheduler/SearchScheduler.h | 4 +- .../{SearchTaskQueue.cpp => SearchTask.cpp} | 2 +- .../{SearchTaskQueue.h => SearchTask.h} | 1 - cpp/src/server/RequestTask.cpp | 20 ++-- 8 files changed, 99 insertions(+), 55 deletions(-) rename cpp/src/db/scheduler/{SearchTaskQueue.cpp => SearchTask.cpp} (99%) rename cpp/src/db/scheduler/{SearchTaskQueue.h => SearchTask.h} (92%) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 4e9e4040..9b541b87 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -25,6 +25,10 @@ namespace engine { namespace { +static constexpr uint64_t METRIC_ACTION_INTERVAL = 1; +static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; +static constexpr uint64_t INDEX_ACTION_INTERVAL = 1; + void CollectInsertMetrics(double total_time, size_t n, bool succeed) { double avg_time = total_time / n; for (int i = 0; i < n; ++i) { @@ -130,7 +134,7 @@ DBImpl::DBImpl(const Options& options) pMemMgr_(new MemManager(pMeta_, options_)), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { - StartTimerTasks(options_.memory_sync_interval); + StartTimerTasks(); } Status DBImpl::CreateTable(meta::TableSchema& table_schema) { @@ -399,12 +403,11 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch return Status::OK(); } -void DBImpl::StartTimerTasks(int interval) { - bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval); +void DBImpl::StartTimerTasks() { + bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); } - -void DBImpl::BackgroundTimerTask(int interval) { +void DBImpl::BackgroundTimerTask() { Status status; server::SystemInfo::GetInstance().Init(); while (true) { @@ -419,27 +422,42 @@ void DBImpl::BackgroundTimerTask(int interval) { break; } - std::this_thread::sleep_for(std::chrono::seconds(interval)); - - server::Metrics::GetInstance().KeepingAliveCounterIncrement(interval); - int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); - int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity(); - server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total); - uint64_t size; - Size(size); - server::Metrics::GetInstance().DataFileSizeGaugeSet(size); - server::Metrics::GetInstance().CPUUsagePercentSet(); - server::Metrics::GetInstance().RAMUsagePercentSet(); - server::Metrics::GetInstance().GPUPercentGaugeSet(); - server::Metrics::GetInstance().GPUMemoryUsageGaugeSet(); - server::Metrics::GetInstance().OctetsSet(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + StartMetricTask(); StartCompactionTask(); StartBuildIndexTask(); } } +void DBImpl::StartMetricTask() { + static uint64_t metric_clock_tick = 0; + metric_clock_tick++; + if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) { + return; + } + + server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL); + int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); + int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity(); + server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total); + uint64_t size; + Size(size); + server::Metrics::GetInstance().DataFileSizeGaugeSet(size); + server::Metrics::GetInstance().CPUUsagePercentSet(); + server::Metrics::GetInstance().RAMUsagePercentSet(); + server::Metrics::GetInstance().GPUPercentGaugeSet(); + server::Metrics::GetInstance().GPUMemoryUsageGaugeSet(); + server::Metrics::GetInstance().OctetsSet(); +} + void DBImpl::StartCompactionTask() { + static uint64_t compact_clock_tick = 0; + compact_clock_tick++; + if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) { + return; + } + //serialize memory data std::vector temp_table_ids; pMemMgr_->Serialize(temp_table_ids); @@ -556,6 +574,12 @@ void DBImpl::BackgroundCompaction(std::set table_ids) { } void DBImpl::StartBuildIndexTask() { + static uint64_t index_clock_tick = 0; + index_clock_tick++; + if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) { + return; + } + //build index has been finished? if(!index_thread_results_.empty()) { std::chrono::milliseconds span(10); @@ -581,29 +605,36 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { } ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_); + if(to_index == nullptr) { + return Status::Error("Invalid engine type"); + } - to_index->Load(); - auto start_time = METRICS_NOW_TIME; - auto index = to_index->BuildIndex(table_file.location_); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time); + try { + to_index->Load(); + auto start_time = METRICS_NOW_TIME; + auto index = to_index->BuildIndex(table_file.location_); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time); - table_file.file_type_ = meta::TableFileSchema::INDEX; - table_file.size_ = index->Size(); + table_file.file_type_ = meta::TableFileSchema::INDEX; + table_file.size_ = index->Size(); - auto to_remove = file; - to_remove.file_type_ = meta::TableFileSchema::TO_DELETE; + auto to_remove = file; + to_remove.file_type_ = meta::TableFileSchema::TO_DELETE; - meta::TableFilesSchema update_files = {to_remove, table_file}; - pMeta_->UpdateTableFiles(update_files); + meta::TableFilesSchema update_files = {to_remove, table_file}; + pMeta_->UpdateTableFiles(update_files); - LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size " - << index->PhysicalSize()/(1024*1024) << " M" - << " from file " << to_remove.file_id_; + LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size " + << index->PhysicalSize()/(1024*1024) << " M" + << " from file " << to_remove.file_id_; - index->Cache(); - pMeta_->Archive(); + index->Cache(); + + } catch (std::exception& ex) { + return Status::Error("Build index encounter exception", ex.what()); + } return Status::OK(); } diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 64879a9c..177faeb9 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -70,8 +70,10 @@ private: const meta::DatesT& dates, QueryResults& results); - void StartTimerTasks(int interval); - void BackgroundTimerTask(int interval); + void StartTimerTasks(); + void BackgroundTimerTask(); + + void StartMetricTask(); void StartCompactionTask(); Status MergeFiles(const std::string& table_id, diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index ecad0783..3ae34c1c 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -186,10 +186,18 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { try { MetricCollector metric; - server::Metrics::GetInstance().MetaAccessTotalIncrement(); if (table_schema.table_id_ == "") { NextTableId(table_schema.table_id_); + } else { + auto table = ConnectorPtr->select(columns(&TableSchema::state_), + where(c(&TableSchema::table_id_) == table_schema.table_id_)); + if (table.size() == 1) { + std::string msg = (TableSchema::TO_DELETE == std::get<0>(table[0])) ? + "Table already exists" : "Table already exists and it is in delete state, please wait a second"; + return Status::Error(msg); + } } + table_schema.files_cnt_ = 0; table_schema.id_ = -1; table_schema.created_on_ = utils::GetMicroSecTimeStamp(); @@ -207,8 +215,8 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { auto ret = boost::filesystem::create_directories(table_path); if (!ret) { ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + return Status::Error("Failed to create table path"); } - assert(ret); } } catch (std::exception &e) { @@ -733,10 +741,12 @@ Status DBMetaImpl::Size(uint64_t &result) { } Status DBMetaImpl::DiscardFiles(long to_discard_size) { - LOG(DEBUG) << "About to discard size=" << to_discard_size; if (to_discard_size <= 0) { return Status::OK(); } + + LOG(DEBUG) << "About to discard size=" << to_discard_size; + try { auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::size_), diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp index 756abea4..fa0dee88 100644 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -6,7 +6,7 @@ #include "SearchScheduler.h" #include "IndexLoaderQueue.h" -#include "SearchTaskQueue.h" +#include "SearchTask.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" #include "metrics/Metrics.h" diff --git a/cpp/src/db/scheduler/SearchScheduler.h b/cpp/src/db/scheduler/SearchScheduler.h index 6e84c436..673d7590 100644 --- a/cpp/src/db/scheduler/SearchScheduler.h +++ b/cpp/src/db/scheduler/SearchScheduler.h @@ -7,7 +7,7 @@ #include "SearchContext.h" #include "IndexLoaderQueue.h" -#include "SearchTaskQueue.h" +#include "SearchTask.h" namespace zilliz { namespace milvus { @@ -35,6 +35,8 @@ private: std::shared_ptr search_thread_; IndexLoaderQueue index_load_queue_; + + using SearchTaskQueue = server::BlockingQueue; SearchTaskQueue search_queue_; bool stopped_ = true; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.cpp b/cpp/src/db/scheduler/SearchTask.cpp similarity index 99% rename from cpp/src/db/scheduler/SearchTaskQueue.cpp rename to cpp/src/db/scheduler/SearchTask.cpp index 819a881f..45503490 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.cpp +++ b/cpp/src/db/scheduler/SearchTask.cpp @@ -3,7 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#include "SearchTaskQueue.h" +#include "SearchTask.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTask.h similarity index 92% rename from cpp/src/db/scheduler/SearchTaskQueue.h rename to cpp/src/db/scheduler/SearchTask.h index e5841cd1..3513c15a 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.h +++ b/cpp/src/db/scheduler/SearchTask.h @@ -27,7 +27,6 @@ public: }; using SearchTaskPtr = std::shared_ptr; -using SearchTaskQueue = server::BlockingQueue; } diff --git a/cpp/src/server/RequestTask.cpp b/cpp/src/server/RequestTask.cpp index 47e79c0b..3133e1cd 100644 --- a/cpp/src/server/RequestTask.cpp +++ b/cpp/src/server/RequestTask.cpp @@ -221,7 +221,7 @@ ServerError CreateTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DescribeTableTask::DescribeTableTask(const std::string &table_name, thrift::TableSchema &schema) - : BaseTask(PING_TASK_GROUP), + : BaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), schema_(schema) { schema_.table_name = table_name_; @@ -329,7 +329,7 @@ ServerError DeleteTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ShowTablesTask::ShowTablesTask(std::vector& tables) - : BaseTask(DQL_TASK_GROUP), + : BaseTask(DDL_DML_TASK_GROUP), tables_(tables) { } @@ -451,13 +451,13 @@ SearchVectorTask::SearchVectorTask(const std::string &table_name, const std::vector &query_range_array, const int64_t top_k, std::vector &result_array) - : BaseTask(DQL_TASK_GROUP), - table_name_(table_name), - file_id_array_(file_id_array), - record_array_(query_record_array), - range_array_(query_range_array), - top_k_(top_k), - result_array_(result_array) { + : BaseTask(DQL_TASK_GROUP), + table_name_(table_name), + file_id_array_(file_id_array), + record_array_(query_record_array), + range_array_(query_range_array), + top_k_(top_k), + result_array_(result_array) { } @@ -575,7 +575,7 @@ ServerError SearchVectorTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count) -: BaseTask(DQL_TASK_GROUP), +: BaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), row_count_(row_count) { -- GitLab