diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index c9ebc41d37f60c5dc3587acec07102c86faf0186..f5eb13378d7f2f1f8e4584cfc56c4d6ef2edbad9 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -44,6 +44,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-403 - Add GpuCacheMgr - MS-404 - Release index after search task done avoid memory increment continues - MS-405 - Add delete task support +- MS-407 - Reconstruct MetricsCollector - MS-408 - Add device_id in resource construct function ## New Feature diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index 107b4a10605f4c530e1d7232c4e39d7dfb1de33a..3441f1eaab4bed8f896d67cf374d80875bb535af 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -25,7 +25,6 @@ metric_config: is_startup: off # if monitoring start: on, off collector: prometheus # metrics collector: prometheus prometheus_config: # following are prometheus configure - collect_type: pull # prometheus collect data method port: 8080 # the port prometheus use to fetch metrics push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address push_gateway_port: 9091 # push method configure: push gateway port diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 6b0b87889ce208cf7b57208d2be729c23f2925ef..4299801cb9de33793ba09c7754259acb52784ffe 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace zilliz { namespace milvus { @@ -34,32 +35,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1; constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_ACTION_INTERVAL = 1; -void CollectInsertMetrics(double total_time, size_t n, bool succeed) { - double avg_time = total_time / n; - for (int i = 0; i < n; ++i) { - server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time); - } - -// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time)); - if (succeed) { - server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n); - server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n); - } - else { - server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n); - server::Metrics::GetInstance().AddVectorsFailGaugeSet(n); - } -} - -void CollectQueryMetrics(double total_time, size_t nq) { - for (int i = 0; i < nq; ++i) { - server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time); - } - auto average_time = total_time / nq; - server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq); - server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time); -} - } @@ -166,29 +141,23 @@ Status DBImpl::InsertVectors(const std::string& table_id_, uint64_t n, const float* vectors, IDNumbers& vector_ids_) { ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache"; - auto start_time = METRICS_NOW_TIME; - Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_); - auto end_time = METRICS_NOW_TIME; - double total_time = METRICS_MICROSECONDS(start_time,end_time); + Status status; + zilliz::milvus::server::CollectInsertMetrics metrics(n, status); + status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_); // std::chrono::microseconds time_span = std::chrono::duration_cast(end_time - start_time); // double average_time = double(time_span.count()) / n; ENGINE_LOG_DEBUG << "Insert vectors to cache finished"; - CollectInsertMetrics(total_time, n, status.ok()); return status; - } Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, QueryResults &results) { - auto start_time = METRICS_NOW_TIME; + server::CollectQueryMetrics metrics(nq); + meta::DatesT dates = {meta::Meta::GetDate()}; Status result = Query(table_id, k, nq, nprobe, vectors, dates, results); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time,end_time); - - CollectQueryMetrics(total_time, nq); return result; } @@ -255,7 +224,8 @@ Status DBImpl::Query(const std::string& table_id, const std::vector Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) { - auto start_time = METRICS_NOW_TIME; + server::CollectQueryMetrics metrics(nq); + server::TimeRecorder rc(""); //step 1: get files to search @@ -298,11 +268,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch results = context->GetResult(); rc.ElapseFromBegin("Engine query totally cost"); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time,end_time); - - CollectQueryMetrics(total_time, nq); - return Status::OK(); } @@ -422,14 +387,10 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, long index_size = 0; for (auto& file : files) { + server::CollectMergeFilesMetrics metrics; - auto start_time = METRICS_NOW_TIME; index->Merge(file.location_); auto file_schema = file; - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time,end_time); - server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time); - file_schema.file_type_ = meta::TableFileSchema::TO_DELETE; updated.push_back(file_schema); ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_; @@ -645,11 +606,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { std::shared_ptr index; try { - auto start_time = METRICS_NOW_TIME; + server::CollectBuildIndexMetrics metrics; index = to_index->BuildIndex(table_file.location_); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time); } catch (std::exception& ex) { //typical error: out of gpu memory std::string msg = "BuildIndex encounter exception" + std::string(ex.what()); diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index deaf864e94d7f35af4f39d94355183107ce8fee4..2952d70ef43b9203987bff7816ea98b9c7ca6c50 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -118,9 +118,10 @@ Status ExecutionEngineImpl::Serialize() { Status ExecutionEngineImpl::Load(bool to_cache) { index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index_ != nullptr); - auto start_time = METRICS_NOW_TIME; if (!index_) { try { + double physical_size = PhysicalSize(); + server::CollectExecutionEngineMetrics metrics(physical_size); index_ = read_index(location_); ENGINE_LOG_DEBUG << "Disk io from: " << location_; } catch (knowhere::KnowhereException &e) { @@ -133,14 +134,6 @@ Status ExecutionEngineImpl::Load(bool to_cache) { if (!already_in_cache && to_cache) { Cache(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - - server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time); - double physical_size = PhysicalSize(); - - server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size); - server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time)); } return Status::OK(); } @@ -148,7 +141,6 @@ Status ExecutionEngineImpl::Load(bool to_cache) { Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); bool already_in_cache = (index_ != nullptr); - auto start_time = METRICS_NOW_TIME; if (!index_) { try { index_ = index_->CopyToGpu(device_id); @@ -163,12 +155,6 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { if (!already_in_cache) { GpuCache(device_id); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - double physical_size = PhysicalSize(); - - server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time); - server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size); } return Status::OK(); @@ -177,7 +163,6 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { Status ExecutionEngineImpl::CopyToCpu() { index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index_ != nullptr); - auto start_time = METRICS_NOW_TIME; if (!index_) { try { index_ = index_->CopyToCpu(); @@ -192,14 +177,7 @@ Status ExecutionEngineImpl::CopyToCpu() { if(!already_in_cache) { Cache(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - double physical_size = PhysicalSize(); - - server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time); - server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size); } - return Status::OK(); } @@ -212,6 +190,8 @@ Status ExecutionEngineImpl::Merge(const std::string &location) { auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location); if (!to_merge) { try { + double physical_size = server::CommonUtil::GetFileSize(location); + server::CollectExecutionEngineMetrics metrics(physical_size); to_merge = read_index(location); } catch (knowhere::KnowhereException &e) { ENGINE_LOG_ERROR << e.what(); diff --git a/cpp/src/db/insert/MemTableFile.cpp b/cpp/src/db/insert/MemTableFile.cpp index f8f79c8618bcf1afafbc7258da24b720e89c13d1..99f2e19b35d0b24f5050d309af0d6a319652eee2 100644 --- a/cpp/src/db/insert/MemTableFile.cpp +++ b/cpp/src/db/insert/MemTableFile.cpp @@ -80,20 +80,13 @@ bool MemTableFile::IsFull() { } Status MemTableFile::Serialize() { - - auto start_time = METRICS_NOW_TIME; - - auto size = GetCurrentMem(); + size_t size = GetCurrentMem(); + server::CollectSerializeMetrics metrics(size); execution_engine_->Serialize(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - table_file_schema_.file_size_ = execution_engine_->PhysicalSize(); table_file_schema_.row_count_ = execution_engine_->Count(); - server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time); - table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ? meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; diff --git a/cpp/src/db/insert/VectorSource.cpp b/cpp/src/db/insert/VectorSource.cpp index 27385b4b230303bdf9a8c7877648410ce71c4f4a..f7d6a48297f89745f129caae1cbdec48d40472b9 100644 --- a/cpp/src/db/insert/VectorSource.cpp +++ b/cpp/src/db/insert/VectorSource.cpp @@ -24,7 +24,7 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine, size_t &num_vectors_added, IDNumbers &vector_ids) { - auto start_time = METRICS_NOW_TIME; + server::CollectAddMetrics metrics(n_, table_file_schema.dimension_); num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added; @@ -49,12 +49,6 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine, ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString(); } - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), - static_cast(table_file_schema.dimension_), - total_time); - return status; } diff --git a/cpp/src/db/meta/MySQLMetaImpl.cpp b/cpp/src/db/meta/MySQLMetaImpl.cpp index 3f85b3503f4600db2c32f27011dbc555fcd8efd3..6e952642bddb0cf4046e0dfad73c4b0aa54bd1ff 100644 --- a/cpp/src/db/meta/MySQLMetaImpl.cpp +++ b/cpp/src/db/meta/MySQLMetaImpl.cpp @@ -39,24 +39,6 @@ Status HandleException(const std::string &desc, std::exception &e) { return Status::DBTransactionError(desc, e.what()); } -class MetricCollector { - public: - MetricCollector() { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - start_time_ = METRICS_NOW_TIME; - } - - ~MetricCollector() { - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time_, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - } - - private: - using TIME_POINT = std::chrono::system_clock::time_point; - TIME_POINT start_time_; -}; - } Status MySQLMetaImpl::NextTableId(std::string &table_id) { @@ -273,7 +255,7 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id, Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) { try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -391,7 +373,7 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) { Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) { try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -458,7 +440,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -498,7 +480,7 @@ Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) { try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -545,7 +527,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) { try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -601,7 +583,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) { Status MySQLMetaImpl::DeleteTable(const std::string &table_id) { try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -644,7 +626,7 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) { Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) { try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -683,7 +665,7 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) { Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { try { - MetricCollector metric; + server::MetricCollector metric; StoreQueryResult res; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -738,7 +720,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { try { - MetricCollector metric; + server::MetricCollector metric; StoreQueryResult res; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -778,7 +760,7 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { Status MySQLMetaImpl::AllTables(std::vector &table_schema_array) { try { - MetricCollector metric; + server::MetricCollector metric; StoreQueryResult res; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -843,7 +825,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) { } try { - MetricCollector metric; + server::MetricCollector metric; NextFileId(file_schema.file_id_); file_schema.dimension_ = table_schema.dimension_; @@ -911,7 +893,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) { files.clear(); try { - MetricCollector metric; + server::MetricCollector metric; StoreQueryResult res; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -994,7 +976,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, files.clear(); try { - MetricCollector metric; + server::MetricCollector metric; StoreQueryResult res; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -1108,7 +1090,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, files.clear(); try { - MetricCollector metric; + server::MetricCollector metric; StoreQueryResult res; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -1218,7 +1200,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, files.clear(); try { - MetricCollector metric; + server::MetricCollector metric; //check table existence TableSchema table_schema; @@ -1498,7 +1480,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) { ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size; try { - MetricCollector metric; + server::MetricCollector metric; bool status; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -1570,7 +1552,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -1684,7 +1666,7 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) { Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) { try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -1774,7 +1756,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { //remove to_delete files try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -1853,7 +1835,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { //remove to_delete tables try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -1911,7 +1893,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { //remove deleted table folder //don't remove table folder until all its files has been deleted try { - MetricCollector metric; + server::MetricCollector metric; { ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); @@ -1996,7 +1978,7 @@ Status MySQLMetaImpl::CleanUp() { Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) { try { - MetricCollector metric; + server::MetricCollector metric; TableSchema table_schema; table_schema.table_id_ = table_id; diff --git a/cpp/src/db/meta/SqliteMetaImpl.cpp b/cpp/src/db/meta/SqliteMetaImpl.cpp index 0132fa87ceb73e471d5a8a7da178cb9b8a05d439..66346cf6c20a2d1d7af995047dcbe01b9df28268 100644 --- a/cpp/src/db/meta/SqliteMetaImpl.cpp +++ b/cpp/src/db/meta/SqliteMetaImpl.cpp @@ -34,24 +34,6 @@ Status HandleException(const std::string& desc, std::exception &e) { return Status::DBTransactionError(desc, e.what()); } -class MetricCollector { -public: - MetricCollector() { - server::Metrics::GetInstance().MetaAccessTotalIncrement(); - start_time_ = METRICS_NOW_TIME; - } - - ~MetricCollector() { - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time_, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - } - -private: - using TIME_POINT = std::chrono::system_clock::time_point; - TIME_POINT start_time_; -}; - } inline auto StoragePrototype(const std::string &path) { @@ -171,7 +153,7 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id, Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) { try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -213,7 +195,7 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) { Status SqliteMetaImpl::DeleteTable(const std::string& table_id) { try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -237,7 +219,7 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) { Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) { try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -262,7 +244,7 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) { Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { try { - MetricCollector metric; + server::MetricCollector metric; auto groups = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::state_, @@ -353,7 +335,7 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) { try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -405,7 +387,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { try { - MetricCollector metric; + server::MetricCollector metric; //set all backup file to raw ConnectorPtr->update_all( @@ -426,7 +408,7 @@ Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) { try { - MetricCollector metric; + server::MetricCollector metric; auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_, &TableSchema::nlist_, @@ -453,7 +435,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) { try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -491,7 +473,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { has_or_not = false; try { - MetricCollector metric; + server::MetricCollector metric; auto tables = ConnectorPtr->select(columns(&TableSchema::id_), where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); @@ -510,7 +492,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { Status SqliteMetaImpl::AllTables(std::vector& table_schema_array) { try { - MetricCollector metric; + server::MetricCollector metric; auto selected = ConnectorPtr->select(columns(&TableSchema::id_, &TableSchema::table_id_, @@ -556,7 +538,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) { } try { - MetricCollector metric; + server::MetricCollector metric; NextFileId(file_schema.file_id_); file_schema.dimension_ = table_schema.dimension_; @@ -587,7 +569,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) { files.clear(); try { - MetricCollector metric; + server::MetricCollector metric; auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, @@ -645,7 +627,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, files.clear(); try { - MetricCollector metric; + server::MetricCollector metric; if (partition.empty()) { std::vector file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX}; @@ -745,7 +727,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) { files.clear(); - MetricCollector metric; + server::MetricCollector metric; try { auto select_columns = columns(&TableFileSchema::id_, @@ -822,7 +804,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id, files.clear(); try { - MetricCollector metric; + server::MetricCollector metric; //check table existence TableSchema table_schema; @@ -996,7 +978,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) { ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size; try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -1053,7 +1035,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) { Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -1079,7 +1061,7 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -1101,7 +1083,7 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) { try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -1150,7 +1132,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { //remove to_delete files try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -1194,7 +1176,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { //remove to_delete tables try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -1224,7 +1206,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { //remove deleted table folder //don't remove table folder until all its files has been deleted try { - MetricCollector metric; + server::MetricCollector metric; for(auto& table_id : table_ids) { auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_), @@ -1243,7 +1225,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { Status SqliteMetaImpl::CleanUp() { try { - MetricCollector metric; + server::MetricCollector metric; //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); @@ -1274,7 +1256,7 @@ Status SqliteMetaImpl::CleanUp() { Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) { try { - MetricCollector metric; + server::MetricCollector metric; std::vector file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX}; auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_), diff --git a/cpp/src/db/scheduler/task/SearchTask.cpp b/cpp/src/db/scheduler/task/SearchTask.cpp index 70802e386d1bd523933846a7760058a640261ba4..061189c4f630d3671d4266debe8f21a78627fb6c 100644 --- a/cpp/src/db/scheduler/task/SearchTask.cpp +++ b/cpp/src/db/scheduler/task/SearchTask.cpp @@ -59,23 +59,6 @@ void ParallelReduce(std::function& reduce_function, size_t } } -void CollectDurationMetrics(int index_type, double total_time) { - switch(index_type) { - case meta::TableFileSchema::RAW: { - server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); - break; - } - case meta::TableFileSchema::TO_INDEX: { - server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); - break; - } - default: { - server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); - break; - } - } -} - } SearchTask::SearchTask() @@ -92,7 +75,7 @@ std::shared_ptr SearchTask::Execute() { server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_)); - auto start_time = METRICS_NOW_TIME; + server::CollectSearchTaskMetrics metrics(file_type_); bool metric_l2 = (index_engine_->IndexMetricType() == MetricType::L2); @@ -137,10 +120,6 @@ std::shared_ptr SearchTask::Execute() { context->IndexSearchDone(index_id_); } - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - CollectDurationMetrics(file_type_, total_time); - rc.ElapseFromBegin("totally cost"); return nullptr; diff --git a/cpp/src/metrics/Metrics.h b/cpp/src/metrics/Metrics.h index 65df7140cc6d4e8259a8b61d92fd278a8cdfa62c..48f9f2b1118409a1e403a0618bf74ba8013095df 100644 --- a/cpp/src/metrics/Metrics.h +++ b/cpp/src/metrics/Metrics.h @@ -6,6 +6,7 @@ #pragma once #include "MetricBase.h" +#include "db/meta/MetaTypes.h" namespace zilliz { @@ -29,6 +30,233 @@ class Metrics { static MetricsBase &CreateMetricsCollector(); }; +class CollectInsertMetrics { +public: + CollectInsertMetrics(size_t n, engine::Status& status) : n_(n), status_(status) { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectInsertMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + double avg_time = total_time / n_; + for (int i = 0; i < n_; ++i) { + Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time); + } + + // server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time)); + if (status_.ok()) { + server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_); + server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_); + } + else { + server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_); + server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_); + } + } + +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + size_t n_; + engine::Status& status_; +}; + +class CollectQueryMetrics { +public: + CollectQueryMetrics(size_t nq) : nq_(nq) { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectQueryMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + for (int i = 0; i < nq_; ++i) { + server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time); + } + auto average_time = total_time / nq_; + server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_); + server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq_) / total_time); + } + +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + size_t nq_; +}; + +class CollectMergeFilesMetrics { +public: + CollectMergeFilesMetrics() { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectMergeFilesMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time); + } + +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; +}; + +class CollectBuildIndexMetrics { +public: + CollectBuildIndexMetrics() { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectBuildIndexMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time); + } +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; +}; + +class CollectExecutionEngineMetrics { +public: + CollectExecutionEngineMetrics(double physical_size) : physical_size_(physical_size) { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectExecutionEngineMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + + server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time); + + server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size_); + server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size_ / double(total_time)); + } + +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + double physical_size_; +}; + +class CollectSerializeMetrics { +public: + CollectSerializeMetrics(size_t size) : size_(size) { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectSerializeMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size_ / total_time); + } +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + size_t size_; +}; + +class CollectAddMetrics { +public: + CollectAddMetrics(size_t n, uint16_t dimension) : n_(n), dimension_(dimension) { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectAddMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), + static_cast(dimension_), + total_time); + } +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + size_t n_; + uint16_t dimension_; +}; + +class CollectDurationMetrics { +public: + CollectDurationMetrics(int index_type) : index_type_(index_type) { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectDurationMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + switch (index_type_) { + case engine::meta::TableFileSchema::RAW: { + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + break; + } + case engine::meta::TableFileSchema::TO_INDEX: { + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + break; + } + default: { + server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); + break; + } + } + } +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + int index_type_; +}; + +class CollectSearchTaskMetrics { +public: + CollectSearchTaskMetrics(int index_type) : index_type_(index_type) { + start_time_ = METRICS_NOW_TIME; + } + + ~CollectSearchTaskMetrics() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + switch(index_type_) { + case engine::meta::TableFileSchema::RAW: { + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + break; + } + case engine::meta::TableFileSchema::TO_INDEX: { + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + break; + } + default: { + server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); + break; + } + } + } + +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + int index_type_; +}; + +class MetricCollector { +public: + MetricCollector() { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + start_time_ = METRICS_NOW_TIME; + } + + ~MetricCollector() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + } + +private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; +}; + + } } diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index 992e915f23e4b225a47e4c4524f4685f9bc995fa..b387e2de3b1a02e4ef976903b2b946a9bab768d2 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -81,24 +81,6 @@ CollectFileMetrics(int file_type, size_t file_size) { } } -void -CollectDurationMetrics(int index_type, double total_time) { - switch (index_type) { - case meta::TableFileSchema::RAW: { - server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); - break; - } - case meta::TableFileSchema::TO_INDEX: { - server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); - break; - } - default: { - server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); - break; - } - } -} - XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) { index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, @@ -159,7 +141,7 @@ XSearchTask::Execute() { server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_)); - auto start_time = METRICS_NOW_TIME; + server::CollectDurationMetrics metrics(index_type_); std::vector output_ids; std::vector output_distence; @@ -202,10 +184,6 @@ XSearchTask::Execute() { context->IndexSearchDone(index_id_); } - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - CollectDurationMetrics(index_type_, total_time); - rc.ElapseFromBegin("totally cost"); // release index in resource diff --git a/cpp/unittest/metrics/metrics_test.cpp b/cpp/unittest/metrics/metrics_test.cpp index a6301e05e1a95cf70fe08bd09ad840ad01c56379..c1c674f12b3e1436cd4bd8d83e28bd3805a8d80d 100644 --- a/cpp/unittest/metrics/metrics_test.cpp +++ b/cpp/unittest/metrics/metrics_test.cpp @@ -119,4 +119,33 @@ TEST_F(MetricTest, Metric_Tes) { delete [] qxb; }; +TEST_F(MetricTest, Collector_Metrics_Test){ + engine::Status status = engine::Status::OK(); + server::CollectInsertMetrics insert_metrics0(0, status); + status = engine::Status::Error("error"); + server::CollectInsertMetrics insert_metrics1(0, status); + + server::CollectQueryMetrics query_metrics(10); + + server::CollectMergeFilesMetrics merge_metrics(); + + server::CollectBuildIndexMetrics build_index_metrics(); + + server::CollectExecutionEngineMetrics execution_metrics(10); + + server::CollectSerializeMetrics serialize_metrics(10); + + server::CollectAddMetrics add_metrics(10, 128); + + server::CollectDurationMetrics duration_metrics_raw(engine::meta::TableFileSchema::RAW); + server::CollectDurationMetrics duration_metrics_index(engine::meta::TableFileSchema::TO_INDEX); + server::CollectDurationMetrics duration_metrics_delete(engine::meta::TableFileSchema::TO_DELETE); + + server::CollectSearchTaskMetrics search_metrics_raw(engine::meta::TableFileSchema::RAW); + server::CollectSearchTaskMetrics search_metrics_index(engine::meta::TableFileSchema::TO_INDEX); + server::CollectSearchTaskMetrics search_metrics_delete(engine::meta::TableFileSchema::TO_DELETE); + + server::MetricCollector metric_collector(); +} + diff --git a/cpp/unittest/server/appendix/server_config.yaml b/cpp/unittest/server/appendix/server_config.yaml index 0f09d570f165f9e5701252c6245b308d20dc9b40..7faabee2240e353f2cfcfb3efa0fc7419c93edb0 100644 --- a/cpp/unittest/server/appendix/server_config.yaml +++ b/cpp/unittest/server/appendix/server_config.yaml @@ -18,7 +18,6 @@ metric_config: is_startup: off # if monitoring start: on, off collector: prometheus # metrics collector: prometheus prometheus_config: # following are prometheus configure - collect_type: pull # prometheus collect data method port: 8080 # the port prometheus use to fetch metrics push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address push_gateway_port: 9091 # push method configure: push gateway port