diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 4e9e4040b78622e29866d9fd3fce6a004205fdca..9b541b87262bd77023a4a841533cc8533f34384f 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -25,6 +25,10 @@ namespace engine { namespace { +static constexpr uint64_t METRIC_ACTION_INTERVAL = 1; +static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; +static constexpr uint64_t INDEX_ACTION_INTERVAL = 1; + void CollectInsertMetrics(double total_time, size_t n, bool succeed) { double avg_time = total_time / n; for (int i = 0; i < n; ++i) { @@ -130,7 +134,7 @@ DBImpl::DBImpl(const Options& options) pMemMgr_(new MemManager(pMeta_, options_)), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { - StartTimerTasks(options_.memory_sync_interval); + StartTimerTasks(); } Status DBImpl::CreateTable(meta::TableSchema& table_schema) { @@ -399,12 +403,11 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch return Status::OK(); } -void DBImpl::StartTimerTasks(int interval) { - bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval); +void DBImpl::StartTimerTasks() { + bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); } - -void DBImpl::BackgroundTimerTask(int interval) { +void DBImpl::BackgroundTimerTask() { Status status; server::SystemInfo::GetInstance().Init(); while (true) { @@ -419,27 +422,42 @@ void DBImpl::BackgroundTimerTask(int interval) { break; } - std::this_thread::sleep_for(std::chrono::seconds(interval)); - - server::Metrics::GetInstance().KeepingAliveCounterIncrement(interval); - int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); - int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity(); - server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total); - uint64_t size; - Size(size); - server::Metrics::GetInstance().DataFileSizeGaugeSet(size); - server::Metrics::GetInstance().CPUUsagePercentSet(); - 
server::Metrics::GetInstance().RAMUsagePercentSet(); - server::Metrics::GetInstance().GPUPercentGaugeSet(); - server::Metrics::GetInstance().GPUMemoryUsageGaugeSet(); - server::Metrics::GetInstance().OctetsSet(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + StartMetricTask(); StartCompactionTask(); StartBuildIndexTask(); } } +void DBImpl::StartMetricTask() { + static uint64_t metric_clock_tick = 0; + metric_clock_tick++; + if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) { + return; + } + + server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL); + int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); + int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity(); + server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total); + uint64_t size; + Size(size); + server::Metrics::GetInstance().DataFileSizeGaugeSet(size); + server::Metrics::GetInstance().CPUUsagePercentSet(); + server::Metrics::GetInstance().RAMUsagePercentSet(); + server::Metrics::GetInstance().GPUPercentGaugeSet(); + server::Metrics::GetInstance().GPUMemoryUsageGaugeSet(); + server::Metrics::GetInstance().OctetsSet(); +} + void DBImpl::StartCompactionTask() { + static uint64_t compact_clock_tick = 0; + compact_clock_tick++; + if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) { + return; + } + //serialize memory data std::vector temp_table_ids; pMemMgr_->Serialize(temp_table_ids); @@ -556,6 +574,12 @@ void DBImpl::BackgroundCompaction(std::set table_ids) { } void DBImpl::StartBuildIndexTask() { + static uint64_t index_clock_tick = 0; + index_clock_tick++; + if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) { + return; + } + //build index has been finished? 
if(!index_thread_results_.empty()) { std::chrono::milliseconds span(10); @@ -581,29 +605,36 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { } ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_); + if(to_index == nullptr) { + return Status::Error("Invalid engine type"); + } - to_index->Load(); - auto start_time = METRICS_NOW_TIME; - auto index = to_index->BuildIndex(table_file.location_); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time); + try { + to_index->Load(); + auto start_time = METRICS_NOW_TIME; + auto index = to_index->BuildIndex(table_file.location_); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time); - table_file.file_type_ = meta::TableFileSchema::INDEX; - table_file.size_ = index->Size(); + table_file.file_type_ = meta::TableFileSchema::INDEX; + table_file.size_ = index->Size(); - auto to_remove = file; - to_remove.file_type_ = meta::TableFileSchema::TO_DELETE; + auto to_remove = file; + to_remove.file_type_ = meta::TableFileSchema::TO_DELETE; - meta::TableFilesSchema update_files = {to_remove, table_file}; - pMeta_->UpdateTableFiles(update_files); + meta::TableFilesSchema update_files = {to_remove, table_file}; + pMeta_->UpdateTableFiles(update_files); - LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size " - << index->PhysicalSize()/(1024*1024) << " M" - << " from file " << to_remove.file_id_; + LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size " + << index->PhysicalSize()/(1024*1024) << " M" + << " from file " << to_remove.file_id_; - index->Cache(); - pMeta_->Archive(); + index->Cache(); + + } catch (std::exception& ex) { + return Status::Error("Build index encounter 
exception", ex.what()); + } return Status::OK(); } diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 64879a9c9d16dfff3e866f0494017f4ab86756b4..177faeb9c111e826c993c3490d3192ce70c82df0 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -70,8 +70,10 @@ private: const meta::DatesT& dates, QueryResults& results); - void StartTimerTasks(int interval); - void BackgroundTimerTask(int interval); + void StartTimerTasks(); + void BackgroundTimerTask(); + + void StartMetricTask(); void StartCompactionTask(); Status MergeFiles(const std::string& table_id, diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index ecad07835ebc42045d98a4f7972db9a9318c2608..3ae34c1c2e505a752b5349ae1c25ec38eed9c42c 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -186,10 +186,18 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { try { MetricCollector metric; - server::Metrics::GetInstance().MetaAccessTotalIncrement(); if (table_schema.table_id_ == "") { NextTableId(table_schema.table_id_); + } else { + auto table = ConnectorPtr->select(columns(&TableSchema::state_), + where(c(&TableSchema::table_id_) == table_schema.table_id_)); + if (table.size() == 1) { + std::string msg = (TableSchema::TO_DELETE == std::get<0>(table[0])) ? 
+ "Table already exists and it is in delete state, please wait a second" : "Table already exists"; + return Status::Error(msg); + } + } + table_schema.files_cnt_ = 0; table_schema.id_ = -1; table_schema.created_on_ = utils::GetMicroSecTimeStamp(); @@ -207,8 +215,8 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { auto ret = boost::filesystem::create_directories(table_path); if (!ret) { ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + return Status::Error("Failed to create table path"); } - assert(ret); } } catch (std::exception &e) { @@ -733,10 +741,12 @@ Status DBMetaImpl::Size(uint64_t &result) { } Status DBMetaImpl::DiscardFiles(long to_discard_size) { - LOG(DEBUG) << "About to discard size=" << to_discard_size; if (to_discard_size <= 0) { return Status::OK(); } + + LOG(DEBUG) << "About to discard size=" << to_discard_size; + try { auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::size_), diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp index 756abea4cdf9ef4e09956e301920c1839dd56f97..fa0dee88fb45ac7110f6ed0563f7215195027904 100644 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -6,7 +6,7 @@ #include "SearchScheduler.h" #include "IndexLoaderQueue.h" -#include "SearchTaskQueue.h" +#include "SearchTask.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" #include "metrics/Metrics.h" diff --git a/cpp/src/db/scheduler/SearchScheduler.h b/cpp/src/db/scheduler/SearchScheduler.h index 6e84c43684c6c83913425b72ba726c1f1d74cbf0..673d759056d64e867b175b5fd544939b8a74f170 100644 --- a/cpp/src/db/scheduler/SearchScheduler.h +++ b/cpp/src/db/scheduler/SearchScheduler.h @@ -7,7 +7,7 @@ #include "SearchContext.h" #include "IndexLoaderQueue.h" -#include "SearchTaskQueue.h" +#include "SearchTask.h" namespace zilliz { namespace milvus { @@ -35,6 +35,8 @@ private: std::shared_ptr search_thread_; IndexLoaderQueue
index_load_queue_; + + using SearchTaskQueue = server::BlockingQueue; SearchTaskQueue search_queue_; bool stopped_ = true; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.cpp b/cpp/src/db/scheduler/SearchTask.cpp similarity index 99% rename from cpp/src/db/scheduler/SearchTaskQueue.cpp rename to cpp/src/db/scheduler/SearchTask.cpp index 819a881f396f15d588384c0f423b49cd5e4b1cb2..45503490d137ff5367987066c9ad81eb25fbc37b 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.cpp +++ b/cpp/src/db/scheduler/SearchTask.cpp @@ -3,7 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#include "SearchTaskQueue.h" +#include "SearchTask.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTask.h similarity index 92% rename from cpp/src/db/scheduler/SearchTaskQueue.h rename to cpp/src/db/scheduler/SearchTask.h index e5841cd1d5a1a2f55872a10c2ee46c3758548805..3513c15a3e9a8eebfcc8979302d35ff0fad623e5 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.h +++ b/cpp/src/db/scheduler/SearchTask.h @@ -27,7 +27,6 @@ public: }; using SearchTaskPtr = std::shared_ptr; -using SearchTaskQueue = server::BlockingQueue; } diff --git a/cpp/src/server/RequestTask.cpp b/cpp/src/server/RequestTask.cpp index 47e79c0b854c51ecc26ad503a085a22acaedbea7..3133e1cd80c4157a32a822a2ff1219243dfeee67 100644 --- a/cpp/src/server/RequestTask.cpp +++ b/cpp/src/server/RequestTask.cpp @@ -221,7 +221,7 @@ ServerError CreateTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DescribeTableTask::DescribeTableTask(const std::string &table_name, thrift::TableSchema &schema) - : BaseTask(PING_TASK_GROUP), + : BaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), schema_(schema) { schema_.table_name = 
table_name_; @@ -329,7 +329,7 @@ ServerError DeleteTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ShowTablesTask::ShowTablesTask(std::vector& tables) - : BaseTask(DQL_TASK_GROUP), + : BaseTask(DDL_DML_TASK_GROUP), tables_(tables) { } @@ -451,13 +451,13 @@ SearchVectorTask::SearchVectorTask(const std::string &table_name, const std::vector &query_range_array, const int64_t top_k, std::vector &result_array) - : BaseTask(DQL_TASK_GROUP), - table_name_(table_name), - file_id_array_(file_id_array), - record_array_(query_record_array), - range_array_(query_range_array), - top_k_(top_k), - result_array_(result_array) { + : BaseTask(DQL_TASK_GROUP), + table_name_(table_name), + file_id_array_(file_id_array), + record_array_(query_record_array), + range_array_(query_range_array), + top_k_(top_k), + result_array_(result_array) { } @@ -575,7 +575,7 @@ ServerError SearchVectorTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count) -: BaseTask(DQL_TASK_GROUP), +: BaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), row_count_(row_count) {