From a95b29aa27512b723160633279fce17392eb4c9e Mon Sep 17 00:00:00 2001 From: yu yunfeng Date: Tue, 28 May 2019 15:06:17 +0800 Subject: [PATCH] add metrics without prometheus Former-commit-id: e158ea586681abb5ec44bdc9025ec27e3aa21567 --- cpp/src/cache/CacheMgr.cpp | 8 +- cpp/src/db/DBImpl.cpp | 34 +-- cpp/src/db/DBMetaImpl.cpp | 42 +-- cpp/src/db/FaissExecutionEngine.cpp | 6 +- cpp/src/metrics/Metrics.cpp | 35 ++- cpp/src/metrics/Metrics.h | 445 +--------------------------- cpp/src/metrics/PrometheusMetrics.h | 2 +- cpp/src/server/RocksIdMapper.cpp | 179 +++++++---- cpp/src/server/RocksIdMapper.h | 26 +- cpp/src/server/VecIdMapper.cpp | 14 + cpp/src/server/VecIdMapper.h | 6 + 11 files changed, 245 insertions(+), 552 deletions(-) diff --git a/cpp/src/cache/CacheMgr.cpp b/cpp/src/cache/CacheMgr.cpp index b90b059e..24b4bf59 100644 --- a/cpp/src/cache/CacheMgr.cpp +++ b/cpp/src/cache/CacheMgr.cpp @@ -38,7 +38,7 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) { if(cache_ == nullptr) { return nullptr; } - METRICS_INSTANCE.CacheAccessTotalIncrement(); + server::Metrics::GetInstance().CacheAccessTotalIncrement(); return cache_->get(key); } @@ -57,7 +57,7 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) { } cache_->insert(key, data); - METRICS_INSTANCE.CacheAccessTotalIncrement(); + server::Metrics::GetInstance().CacheAccessTotalIncrement(); } void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index) { @@ -67,7 +67,7 @@ void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index DataObjPtr obj = std::make_shared(index); cache_->insert(key, obj); - METRICS_INSTANCE.CacheAccessTotalIncrement(); + server::Metrics::GetInstance().CacheAccessTotalIncrement(); } void CacheMgr::EraseItem(const std::string& key) { @@ -76,7 +76,7 @@ void CacheMgr::EraseItem(const std::string& key) { } cache_->erase(key); - METRICS_INSTANCE.CacheAccessTotalIncrement(); + server::Metrics::GetInstance().CacheAccessTotalIncrement(); } void CacheMgr::PrintInfo() { diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 175feafd..9f2a6cfc 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -112,16 +112,16 @@ Status DBImpl::add_vectors(const std::string& group_id_, double total_time = METRICS_MICROSECONDS(start_time,end_time); double avg_time = total_time / n; for (int i = 0; i < n; ++i) { - METRICS_INSTANCE.AddVectorsDurationHistogramOberve(avg_time); + server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time); } // server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time)); if (!status.ok()) { - METRICS_INSTANCE.AddVectorsFailTotalIncrement(n); + server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n); return status; } - METRICS_INSTANCE.AddVectorsSuccessTotalIncrement(n); + server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n); } template @@ -202,22 +202,22 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq, auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); if(file.file_type == meta::GroupFileSchema::RAW) { - METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time); - METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024); - METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024); - METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024); + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024); + server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024); + server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024); } else if(file.file_type == meta::GroupFileSchema::TO_INDEX) { - METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time); - METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024); - METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024); - METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024); + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024); + server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024); + server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024); } else { - METRICS_INSTANCE.SearchIndexDataDurationSecondsHistogramObserve(total_time); - METRICS_INSTANCE.IndexFileSizeHistogramObserve(file_size*1024*1024); - METRICS_INSTANCE.IndexFileSizeTotalIncrement(file_size*1024*1024); - METRICS_INSTANCE.IndexFileSizeGaugeSet(file_size*1024*1024); + server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size*1024*1024); + server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size*1024*1024); + server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size*1024*1024); } cluster(output_ids, output_distence, inner_k); // cluster to each query memset(output_distence, 0, k * nq * sizeof(float)); @@ -357,7 +357,7 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::Dat auto file_schema = file; auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MemTableMergeDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time); file_schema.file_type = meta::GroupFileSchema::TO_DELETE; updated.push_back(file_schema); @@ -435,7 +435,7 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) { auto index = to_index.BuildIndex(group_file.location); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); - METRICS_INSTANCE.BuildIndexDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time); group_file.file_type = meta::GroupFileSchema::INDEX; group_file.size = index->Size(); diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 9f35bc09..fe02a4f2 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -150,14 +150,14 @@ Status DBMetaImpl::delete_group_partitions(const std::string& group_id, } Status DBMetaImpl::add_group(GroupSchema& group_info) { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); if (group_info.group_id == "") { NextGroupId(group_info.group_id); } group_info.files_cnt = 0; group_info.id = -1; group_info.created_on = utils::GetMicroSecTimeStamp(); - auto start_time = MERTICS_NOW_TIME; + auto start_time = METRICS_NOW_TIME; { try { auto id = ConnectorPtr->insert(group_info); @@ -168,7 +168,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) { } auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); auto group_path = GetGroupPath(group_info.group_id); @@ -189,7 +189,7 @@ Status DBMetaImpl::get_group(GroupSchema& group_info) { Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) { try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; auto groups = ConnectorPtr->select(columns(&GroupSchema::id, &GroupSchema::group_id, @@ -198,7 +198,7 @@ Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) { where(c(&GroupSchema::group_id) == group_info.group_id)); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); assert(groups.size() <= 1); if (groups.size() == 1) { group_info.id = std::get<0>(groups[0]); @@ -217,13 +217,13 @@ Status DBMetaImpl::get_group_no_lock(GroupSchema& group_info) { Status DBMetaImpl::has_group(const std::string& group_id, bool& has_or_not) { try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; auto groups = ConnectorPtr->select(columns(&GroupSchema::id), where(c(&GroupSchema::group_id) == group_id)); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); assert(groups.size() <= 1); if (groups.size() == 1) { has_or_not = true; @@ -258,12 +258,12 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) { { try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; auto id = ConnectorPtr->insert(group_file); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); group_file.id = id; } catch (...) { return Status::DBTransactionError("Add file Error"); @@ -287,7 +287,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { files.clear(); try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time =METRICS_NOW_TIME; auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, @@ -298,7 +298,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) { where(c(&GroupFileSchema::file_type) == (int)GroupFileSchema::TO_INDEX)); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); std::map groups; @@ -340,7 +340,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id, const DatesT& dates = (partition.empty() == true) ? today : partition; try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, @@ -355,7 +355,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id, c(&GroupFileSchema::file_type) == (int) GroupFileSchema::INDEX))); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); GroupSchema group_info; group_info.group_id = group_id; auto status = get_group_no_lock(group_info); @@ -392,7 +392,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id, files.clear(); try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; auto selected = ConnectorPtr->select(columns(&GroupFileSchema::id, &GroupFileSchema::group_id, @@ -404,7 +404,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id, c(&GroupFileSchema::group_id) == group_id)); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); GroupSchema group_info; group_info.group_id = group_id; auto status = get_group_no_lock(group_info); @@ -592,12 +592,12 @@ Status DBMetaImpl::discard_files_of_size(long to_discard_size) { Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { group_file.updated_time = utils::GetMicroSecTimeStamp(); try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; ConnectorPtr->update(group_file); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); } catch (std::exception & e) { LOG(DEBUG) << e.what(); LOG(DEBUG) << "id= " << group_file.id << " file_id=" << group_file.file_id; @@ -608,7 +608,7 @@ Status DBMetaImpl::update_group_file(GroupFileSchema& group_file) { Status DBMetaImpl::update_files(GroupFilesSchema& files) { try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; auto commited = ConnectorPtr->transaction([&] () mutable { for (auto& file : files) { @@ -617,7 +617,7 @@ Status DBMetaImpl::update_files(GroupFilesSchema& files) { } auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); return true; }); if (!commited) { @@ -706,7 +706,7 @@ Status DBMetaImpl::cleanup() { Status DBMetaImpl::count(const std::string& group_id, long& result) { try { - METRICS_INSTANCE.MetaAccessTotalIncrement(); + server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; auto selected = ConnectorPtr->select(columns(&GroupFileSchema::size, &GroupFileSchema::date), @@ -716,7 +716,7 @@ Status DBMetaImpl::count(const std::string& group_id, long& result) { c(&GroupFileSchema::group_id) == group_id)); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time,end_time); - METRICS_INSTANCE.MetaAccessDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); GroupSchema group_info; group_info.group_id = group_id; auto status = get_group_no_lock(group_info); diff --git a/cpp/src/db/FaissExecutionEngine.cpp b/cpp/src/db/FaissExecutionEngine.cpp index e8f21996..70c204be 100644 --- a/cpp/src/db/FaissExecutionEngine.cpp +++ b/cpp/src/db/FaissExecutionEngine.cpp @@ -79,12 +79,12 @@ Status FaissExecutionEngine::Load() { auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); - METRICS_INSTANCE.FaissDiskLoadDurationSecondsHistogramObserve(total_time); + server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time); double total_size = (pIndex_->d) * (pIndex_->ntotal) * 4; - METRICS_INSTANCE.FaissDiskLoadSizeBytesHistogramObserve(total_size); - METRICS_INSTANCE.FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time)); + server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(total_size); + server::Metrics::GetInstance().FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time)); } return Status::OK(); diff --git a/cpp/src/metrics/Metrics.cpp b/cpp/src/metrics/Metrics.cpp index 4d0dd507..ee823b3c 100644 --- a/cpp/src/metrics/Metrics.cpp +++ b/cpp/src/metrics/Metrics.cpp @@ -10,22 +10,27 @@ namespace zilliz { namespace vecwise { namespace server { -ServerError -PrometheusMetrics::Init() { - ConfigNode& configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC); - startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true:false; - // Following should be read from config file. - const std::string bind_address = "8080"; - const std::string uri = std::string("/metrics"); - const std::size_t num_threads = 2; - - // Init Exposer - exposer_ptr_ = std::make_shared(bind_address, uri, num_threads); - - // Exposer Registry - exposer_ptr_->RegisterCollectable(registry_); +MetricsBase & +Metrics::CreateMetricsCollector(MetricCollectorType collector_type) { + switch (collector_type) { + case MetricCollectorType::PROMETHEUS: +// static PrometheusMetrics instance = PrometheusMetrics::GetInstance(); + return MetricsBase::GetInstance(); + default:return MetricsBase::GetInstance(); + } +} - return SERVER_SUCCESS; +MetricsBase & +Metrics::GetInstance() { + ConfigNode &config = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC); + std::string collector_typr_str = config.GetValue(CONFIG_METRIC_COLLECTOR); + if (collector_typr_str == "prometheus") { + return CreateMetricsCollector(MetricCollectorType::PROMETHEUS); + } else if (collector_typr_str == "zabbix") { + return CreateMetricsCollector(MetricCollectorType::ZABBIX); + } else { + return CreateMetricsCollector(MetricCollectorType::INVALID); + } } } diff --git a/cpp/src/metrics/Metrics.h b/cpp/src/metrics/Metrics.h index 666a9793..cce69515 100644 --- a/cpp/src/metrics/Metrics.h +++ b/cpp/src/metrics/Metrics.h @@ -10,455 +10,34 @@ #include -#include -#include -#include "server/ServerConfig.h" - -#define METRICS_NOW_TIME std::chrono::system_clock::now() -#define METRICS_INSTANCE server::GetInstance() -#define METRICS_MICROSECONDS(a,b) (std::chrono::duration_cast (b-a)).count(); +#pragma once +#include "MetricBase.h" +//#include "PrometheusMetrics.h" namespace zilliz { namespace vecwise { namespace server { -class MetricsBase{ - public: - static MetricsBase& - GetInstance(){ - static MetricsBase instance; - return instance; - } - - virtual ServerError Init() {}; - virtual void AddGroupSuccessTotalIncrement(double value = 1) {}; - virtual void AddGroupFailTotalIncrement(double value = 1) {}; - virtual void HasGroupSuccessTotalIncrement(double value = 1) {}; - virtual void HasGroupFailTotalIncrement(double value = 1) {}; - virtual void GetGroupSuccessTotalIncrement(double value = 1) {}; - virtual void GetGroupFailTotalIncrement(double value = 1) {}; - virtual void GetGroupFilesSuccessTotalIncrement(double value = 1) {}; - virtual void GetGroupFilesFailTotalIncrement(double value = 1) {}; - virtual void AddVectorsSuccessTotalIncrement(double value = 1) {}; - virtual void AddVectorsFailTotalIncrement(double value = 1) {}; - virtual void AddVectorsDurationHistogramOberve(double value) {}; - virtual void SearchSuccessTotalIncrement(double value = 1) {}; - virtual void SearchFailTotalIncrement(double value = 1) {}; - virtual void SearchDurationHistogramObserve(double value) {}; - virtual void RawFileSizeHistogramObserve(double value) {}; - virtual void IndexFileSizeHistogramObserve(double value) {}; - virtual void BuildIndexDurationSecondsHistogramObserve(double value) {}; - virtual void AllBuildIndexDurationSecondsHistogramObserve(double value) {}; - virtual void CacheUsageGaugeIncrement(double value = 1) {}; - virtual void CacheUsageGaugeDecrement(double value = 1) {}; - virtual void CacheUsageGaugeSet(double value) {}; - virtual void MetaVisitTotalIncrement(double value = 1) {}; - virtual void MetaVisitDurationSecondsHistogramObserve(double value) {}; - virtual void MemUsagePercentGaugeSet(double value) {}; - virtual void MemUsagePercentGaugeIncrement(double value = 1) {}; - virtual void MemUsagePercentGaugeDecrement(double value = 1) {}; - virtual void MemUsageTotalGaugeSet(double value) {}; - virtual void MemUsageTotalGaugeIncrement(double value = 1) {}; - virtual void MemUsageTotalGaugeDecrement(double value = 1) {}; - virtual void MetaAccessTotalIncrement(double value = 1) {}; - virtual void MetaAccessDurationSecondsHistogramObserve(double value) {}; - virtual void FaissDiskLoadDurationSecondsHistogramObserve(double value) {}; - virtual void FaissDiskLoadSizeBytesHistogramObserve(double value) {}; - virtual void FaissDiskLoadIOSpeedHistogramObserve(double value) {}; - virtual void CacheAccessTotalIncrement(double value = 1) {}; - virtual void MemTableMergeDurationSecondsHistogramObserve(double value) {}; - virtual void SearchIndexDataDurationSecondsHistogramObserve(double value) {}; - virtual void SearchRawDataDurationSecondsHistogramObserve(double value) {}; - virtual void IndexFileSizeTotalIncrement(double value = 1) {}; - virtual void RawFileSizeTotalIncrement(double value = 1) {}; - virtual void IndexFileSizeGaugeSet(double value) {}; - virtual void RawFileSizeGaugeSet(double value) {}; - - - -}; +#define METRICS_NOW_TIME std::chrono::system_clock::now() +//#define server::Metrics::GetInstance() server::Metrics::GetInstance() +#define METRICS_MICROSECONDS(a, b) (std::chrono::duration_cast (b-a)).count(); -enum class MetricCollectorType{ +enum class MetricCollectorType { INVALID, PROMETHEUS, ZABBIX }; - - -class PrometheusMetrics: public MetricsBase { - - public: - static PrometheusMetrics & - GetInstance() { -// switch(MetricCollectorType) { -// case: prometheus:: -// static -// } - static PrometheusMetrics instance; - return instance; - } - - ServerError - Init(); - - private: - std::shared_ptr exposer_ptr_; - std::shared_ptr registry_ = std::make_shared(); - bool startup_ = false; +class Metrics { public: + static MetricsBase & + CreateMetricsCollector(MetricCollectorType collector_type); - void AddGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) add_group_success_total_.Increment(value);}; - void AddGroupFailTotalIncrement(double value = 1.0) override { if(startup_) add_group_fail_total_.Increment(value);}; - void HasGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) has_group_success_total_.Increment(value);}; - void HasGroupFailTotalIncrement(double value = 1.0) override { if(startup_) has_group_fail_total_.Increment(value);}; - void GetGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) get_group_success_total_.Increment(value);}; - void GetGroupFailTotalIncrement(double value = 1.0) override { if(startup_) get_group_fail_total_.Increment(value);}; - void GetGroupFilesSuccessTotalIncrement(double value = 1.0) override { if(startup_) get_group_files_success_total_.Increment(value);}; - void GetGroupFilesFailTotalIncrement(double value = 1.0) override { if(startup_) get_group_files_fail_total_.Increment(value);}; - void AddVectorsSuccessTotalIncrement(double value = 1.0) override { if(startup_) add_vectors_success_total_.Increment(value);}; - void AddVectorsFailTotalIncrement(double value = 1.0) override { if(startup_) add_vectors_fail_total_.Increment(value);}; - void AddVectorsDurationHistogramOberve(double value) override { if(startup_) add_vectors_duration_histogram_.Observe(value);}; - void SearchSuccessTotalIncrement(double value = 1.0) override { if(startup_) search_success_total_.Increment(value);}; - void SearchFailTotalIncrement(double value = 1.0) override { if(startup_) search_fail_total_.Increment(value); }; - void SearchDurationHistogramObserve(double value) override { if(startup_) search_duration_histogram_.Observe(value);}; - void RawFileSizeHistogramObserve(double value) override { if(startup_) raw_files_size_histogram_.Observe(value);}; - void IndexFileSizeHistogramObserve(double value) override { if(startup_) index_files_size_histogram_.Observe(value);}; - void BuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) build_index_duration_seconds_histogram_.Observe(value);}; - void AllBuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) all_build_index_duration_seconds_histogram_.Observe(value);}; - void CacheUsageGaugeIncrement(double value = 1.0) override { if(startup_) cache_usage_gauge_.Increment(value);}; - void CacheUsageGaugeDecrement(double value = 1.0) override { if(startup_) cache_usage_gauge_.Decrement(value);}; - void CacheUsageGaugeSet(double value) override { if(startup_) cache_usage_gauge_.Set(value);}; -// void MetaVisitTotalIncrement(double value = 1.0) override { meta_visit_total_.Increment(value);}; -// void MetaVisitDurationSecondsHistogramObserve(double value) override { meta_visit_duration_seconds_histogram_.Observe(value);}; - void MemUsagePercentGaugeSet(double value) override { if(startup_) mem_usage_percent_gauge_.Set(value);}; - void MemUsagePercentGaugeIncrement(double value = 1.0) override { if(startup_) mem_usage_percent_gauge_.Increment(value);}; - void MemUsagePercentGaugeDecrement(double value = 1.0) override { if(startup_) mem_usage_percent_gauge_.Decrement(value);}; - void MemUsageTotalGaugeSet(double value) override { if(startup_) mem_usage_total_gauge_.Set(value);}; - void MemUsageTotalGaugeIncrement(double value = 1.0) override { if(startup_) mem_usage_total_gauge_.Increment(value);}; - void MemUsageTotalGaugeDecrement(double value = 1.0) override { if(startup_) mem_usage_total_gauge_.Decrement(value);}; - - void MetaAccessTotalIncrement(double value = 1) { if(startup_) meta_access_total_.Increment(value);}; - void MetaAccessDurationSecondsHistogramObserve(double value) { if(startup_) meta_access_duration_seconds_histogram_.Observe(value);}; - - void FaissDiskLoadDurationSecondsHistogramObserve(double value) { if(startup_) faiss_disk_load_duration_seconds_histogram_.Observe(value);}; - void FaissDiskLoadSizeBytesHistogramObserve(double value) { if(startup_) faiss_disk_load_size_bytes_histogram_.Observe(value);}; - void FaissDiskLoadIOSpeedHistogramObserve(double value) { if(startup_) faiss_disk_load_IO_speed_histogram_.Observe(value);}; - - void CacheAccessTotalIncrement(double value = 1) { if(startup_) cache_access_total_.Increment(value);}; - void MemTableMergeDurationSecondsHistogramObserve(double value) { if(startup_) mem_table_merge_duration_seconds_histogram_.Observe(value);}; - void SearchIndexDataDurationSecondsHistogramObserve(double value) { if(startup_) search_index_data_duration_seconds_histogram_.Observe(value);}; - void SearchRawDataDurationSecondsHistogramObserve(double value) { if(startup_) search_raw_data_duration_seconds_histogram_.Observe(value);}; - void IndexFileSizeTotalIncrement(double value = 1) { if(startup_) index_file_size_total_.Increment(value);}; - void RawFileSizeTotalIncrement(double value = 1) { if(startup_) raw_file_size_total_.Increment(value);}; - void IndexFileSizeGaugeSet(double value) { if(startup_) index_file_size_gauge_.Set(value);}; - void RawFileSizeGaugeSet(double value) { if(startup_) raw_file_size_gauge_.Set(value);}; - - - - - -// prometheus::Counter &connection_total() {return connection_total_; } -// -// prometheus::Counter &add_group_success_total() { return add_group_success_total_; } -// prometheus::Counter &add_group_fail_total() { return add_group_fail_total_; } -// -// prometheus::Counter &get_group_success_total() { return get_group_success_total_;} -// prometheus::Counter &get_group_fail_total() { return get_group_fail_total_;} -// -// prometheus::Counter &has_group_success_total() { return has_group_success_total_;} -// prometheus::Counter &has_group_fail_total() { return has_group_fail_total_;} -// -// prometheus::Counter &get_group_files_success_total() { return get_group_files_success_total_;}; -// prometheus::Counter &get_group_files_fail_total() { return get_group_files_fail_total_;} -// -// prometheus::Counter &add_vectors_success_total() { return add_vectors_success_total_; } -// prometheus::Counter &add_vectors_fail_total() { return add_vectors_fail_total_; } -// -// prometheus::Histogram &add_vectors_duration_histogram() { return add_vectors_duration_histogram_;} -// -// prometheus::Counter &search_success_total() { return search_success_total_; } -// prometheus::Counter &search_fail_total() { return search_fail_total_; } -// -// prometheus::Histogram &search_duration_histogram() { return search_duration_histogram_; } -// prometheus::Histogram &raw_files_size_histogram() { return raw_files_size_histogram_; } -// prometheus::Histogram &index_files_size_histogram() { return index_files_size_histogram_; } -// -// prometheus::Histogram &build_index_duration_seconds_histogram() { return build_index_duration_seconds_histogram_; } -// -// prometheus::Histogram &all_build_index_duration_seconds_histogram() { return all_build_index_duration_seconds_histogram_; } -// -// prometheus::Gauge &cache_usage_gauge() { return cache_usage_gauge_; } -// -// prometheus::Counter &meta_visit_total() { return meta_visit_total_; } -// -// prometheus::Histogram &meta_visit_duration_seconds_histogram() { return meta_visit_duration_seconds_histogram_; } -// -// prometheus::Gauge &mem_usage_percent_gauge() { return mem_usage_percent_gauge_; } -// -// prometheus::Gauge &mem_usage_total_gauge() { return mem_usage_total_gauge_; } - - - - - std::shared_ptr &exposer_ptr() {return exposer_ptr_; } -// prometheus::Exposer& exposer() { return exposer_;} - std::shared_ptr ®istry_ptr() {return registry_; } - - // ..... - private: - ////all from db_connection.cpp -// prometheus::Family &connect_request_ = prometheus::BuildCounter() -// .Name("connection_total") -// .Help("total number of connection has been made") -// .Register(*registry_); -// prometheus::Counter &connection_total_ = connect_request_.Add({}); - - - - ////all from DBImpl.cpp - using BucketBoundaries = std::vector; - //record add_group request - prometheus::Family &add_group_request_ = prometheus::BuildCounter() - .Name("add_group_request_total") - .Help("the number of add_group request") - .Register(*registry_); - - prometheus::Counter &add_group_success_total_ = add_group_request_.Add({{"outcome", "success"}}); - prometheus::Counter &add_group_fail_total_ = add_group_request_.Add({{"outcome", "fail"}}); - - - //record get_group request - prometheus::Family &get_group_request_ = prometheus::BuildCounter() - .Name("get_group_request_total") - .Help("the number of get_group request") - .Register(*registry_); - - prometheus::Counter &get_group_success_total_ = get_group_request_.Add({{"outcome", "success"}}); - prometheus::Counter &get_group_fail_total_ = get_group_request_.Add({{"outcome", "fail"}}); - - - //record has_group request - prometheus::Family &has_group_request_ = prometheus::BuildCounter() - .Name("has_group_request_total") - .Help("the number of has_group request") - .Register(*registry_); - - prometheus::Counter &has_group_success_total_ = has_group_request_.Add({{"outcome", "success"}}); - prometheus::Counter &has_group_fail_total_ = has_group_request_.Add({{"outcome", "fail"}}); - - - //record get_group_files - prometheus::Family &get_group_files_request_ = prometheus::BuildCounter() - .Name("get_group_files_request_total") - .Help("the number of get_group_files request") - .Register(*registry_); - - prometheus::Counter &get_group_files_success_total_ = get_group_files_request_.Add({{"outcome", "success"}}); - prometheus::Counter &get_group_files_fail_total_ = get_group_files_request_.Add({{"outcome", "fail"}}); - - - //record add_vectors count and average time - //need to be considered - prometheus::Family &add_vectors_request_ = prometheus::BuildCounter() - .Name("add_vectors_request_total") - .Help("the number of vectors added") - .Register(*registry_); - prometheus::Counter &add_vectors_success_total_ = add_vectors_request_.Add({{"outcome", "success"}}); - prometheus::Counter &add_vectors_fail_total_ = add_vectors_request_.Add({{"outcome", "fail"}}); - - prometheus::Family &add_vectors_duration_seconds_ = prometheus::BuildHistogram() - .Name("add_vector_duration_seconds") - .Help("average time of adding every vector") - .Register(*registry_); - prometheus::Histogram &add_vectors_duration_histogram_ = add_vectors_duration_seconds_.Add({}, BucketBoundaries{0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.08, 0.1, 0.5, 1}); - - - //record search count and average time - prometheus::Family &search_request_ = prometheus::BuildCounter() - .Name("search_request_total") - .Help("the number of search request") - .Register(*registry_); - prometheus::Counter &search_success_total_ = search_request_.Add({{"outcome","success"}}); - prometheus::Counter &search_fail_total_ = search_request_.Add({{"outcome","fail"}}); - - prometheus::Family &search_request_duration_seconds_ = prometheus::BuildHistogram() - .Name("search_request_duration_second") - .Help("histogram of processing time for each search") - .Register(*registry_); - prometheus::Histogram &search_duration_histogram_ = search_request_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); - - //record raw_files size histogram - prometheus::Family &raw_files_size_ = prometheus::BuildHistogram() - .Name("search_raw_files_bytes") - .Help("histogram of raw files size by bytes") - .Register(*registry_); - prometheus::Histogram &raw_files_size_histogram_ = raw_files_size_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); - - //record index_files size histogram - prometheus::Family &index_files_size_ = prometheus::BuildHistogram() - .Name("search_index_files_bytes") - .Help("histogram of index files size by bytes") - .Register(*registry_); - prometheus::Histogram &index_files_size_histogram_ = index_files_size_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); - - //record index and raw files size counter - prometheus::Family &file_size_total_ = prometheus::BuildCounter() - .Name("search_file_size_total") - .Help("searched index and raw file size") - .Register(*registry_); - prometheus::Counter &index_file_size_total_ = file_size_total_.Add({{"type", "index"}}); - prometheus::Counter &raw_file_size_total_ = file_size_total_.Add({{"type", "raw"}}); - - //record index and raw files size counter - prometheus::Family &file_size_gauge_ = prometheus::BuildGauge() - .Name("search_file_size_gauge") - .Help("searched current index and raw file size") - .Register(*registry_); - prometheus::Gauge &index_file_size_gauge_ = file_size_gauge_.Add({{"type", "index"}}); - prometheus::Gauge &raw_file_size_gauge_ = file_size_gauge_.Add({{"type", "raw"}}); - - //record processing time for building index - prometheus::Family &build_index_duration_seconds_ = prometheus::BuildHistogram() - .Name("build_index_duration_seconds") - .Help("histogram of processing time for building index") - .Register(*registry_); - prometheus::Histogram &build_index_duration_seconds_histogram_ = build_index_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); - - - //record processing time for all building index - prometheus::Family &all_build_index_duration_seconds_ = prometheus::BuildHistogram() - .Name("all_build_index_duration_seconds") - .Help("histogram of processing time for building index") - .Register(*registry_); - prometheus::Histogram &all_build_index_duration_seconds_histogram_ = all_build_index_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); - - //record duration of merging mem table - prometheus::Family &mem_table_merge_duration_seconds_ = prometheus::BuildHistogram() - .Name("mem_table_merge_duration_seconds") - .Help("histogram of processing time for merging mem tables") - .Register(*registry_); - prometheus::Histogram &mem_table_merge_duration_seconds_histogram_ = mem_table_merge_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); - - //record search index and raw data duration - prometheus::Family &search_data_duration_seconds_ = prometheus::BuildHistogram() - .Name("search_data_duration_seconds") - .Help("histograms of processing time for search index and raw data") - .Register(*registry_); - prometheus::Histogram &search_index_data_duration_seconds_histogram_ = search_data_duration_seconds_.Add({{"type", "index"}}, BucketBoundaries{0.1, 1.0, 10.0}); - prometheus::Histogram &search_raw_data_duration_seconds_histogram_ = search_data_duration_seconds_.Add({{"type", "raw"}}, BucketBoundaries{0.1, 1.0, 10.0}); - - - ////all form Cache.cpp - //record cache usage, when insert/erase/clear/free - prometheus::Family &cache_usage_ = prometheus::BuildGauge() - .Name("cache_usage") - .Help("total bytes that cache used") - .Register(*registry_); - prometheus::Gauge &cache_usage_gauge_ = cache_usage_.Add({}); - - - ////all from Meta.cpp - //record meta visit count and time -// prometheus::Family &meta_visit_ = prometheus::BuildCounter() -// .Name("meta_visit_total") -// .Help("the number of accessing Meta") -// .Register(*registry_); -// prometheus::Counter &meta_visit_total_ = meta_visit_.Add({{}}); -// -// prometheus::Family &meta_visit_duration_seconds_ = prometheus::BuildHistogram() -// .Name("meta_visit_duration_seconds") -// .Help("histogram of processing time to get data from mata") -// .Register(*registry_); -// prometheus::Histogram &meta_visit_duration_seconds_histogram_ = meta_visit_duration_seconds_.Add({{}}, BucketBoundaries{0.1, 1.0, 10.0}); - - - ////all from MemManager.cpp - //record memory usage percent - prometheus::Family &mem_usage_percent_ = prometheus::BuildGauge() - .Name("memory_usage_percent") - .Help("memory usage percent") - .Register(*registry_); - prometheus::Gauge &mem_usage_percent_gauge_ = mem_usage_percent_.Add({}); - - //record memory usage toal - prometheus::Family &mem_usage_total_ = prometheus::BuildGauge() - .Name("memory_usage_total") - .Help("memory usage total") - .Register(*registry_); - prometheus::Gauge &mem_usage_total_gauge_ = mem_usage_total_.Add({}); - - - - ////all from DBMetaImpl.cpp - //record meta access count - prometheus::Family &meta_access_ = prometheus::BuildCounter() - .Name("meta_access_total") - .Help("the number of meta accessing") - .Register(*registry_); - prometheus::Counter &meta_access_total_ = meta_access_.Add({}); - - //record meta access duration - prometheus::Family &meta_access_duration_seconds_ = prometheus::BuildHistogram() - .Name("meta_access_duration_seconds") - .Help("histogram of processing time for accessing mata") - .Register(*registry_); - prometheus::Histogram &meta_access_duration_seconds_histogram_ = meta_access_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0}); - - - - ////all from FaissExecutionEngine.cpp - //record data loading from disk count, size, duration, IO speed - prometheus::Family &disk_load_duration_second_ = prometheus::BuildHistogram() - .Name("disk_load_duration_seconds") - .Help("Histogram of processing time for loading data from disk") - .Register(*registry_); - prometheus::Histogram &faiss_disk_load_duration_seconds_histogram_ = disk_load_duration_second_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0}); - - prometheus::Family &disk_load_size_bytes_ = prometheus::BuildHistogram() - .Name("disk_load_size_bytes") - .Help("Histogram of data size by bytes for loading data from disk") - .Register(*registry_); - prometheus::Histogram &faiss_disk_load_size_bytes_histogram_ = disk_load_size_bytes_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0}); - - prometheus::Family &disk_load_IO_speed_ = prometheus::BuildHistogram() - .Name("disk_load_IO_speed_byte_per_sec") - .Help("Histogram of IO speed for loading data from disk") - .Register(*registry_); - prometheus::Histogram &faiss_disk_load_IO_speed_histogram_ = disk_load_IO_speed_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0}); - - ////all from CacheMgr.cpp - //record cache access count - prometheus::Family &cache_access_ = prometheus::BuildCounter() - .Name("cache_access_total") - .Help("the count of accessing cache ") - .Register(*registry_); - prometheus::Counter &cache_access_total_ = cache_access_.Add({}); - + static MetricsBase & + GetInstance(); }; -static MetricsBase& CreateMetricsCollector(MetricCollectorType collector_type) { - switch(collector_type) { - case MetricCollectorType::PROMETHEUS: - static PrometheusMetrics instance = PrometheusMetrics::GetInstance(); - return instance; - default: - return MetricsBase::GetInstance(); - } -} - -static MetricsBase& GetInstance(){ - ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC); - std::string collector_typr_str = config.GetValue(CONFIG_METRIC_COLLECTOR); - if(collector_typr_str == "prometheus") { - return CreateMetricsCollector(MetricCollectorType::PROMETHEUS); - } else if(collector_typr_str == "zabbix"){ - return CreateMetricsCollector(MetricCollectorType::ZABBIX); - } else { - return CreateMetricsCollector(MetricCollectorType::INVALID); - } -} } diff --git a/cpp/src/metrics/PrometheusMetrics.h b/cpp/src/metrics/PrometheusMetrics.h index a619a794..06fa64bf 100644 --- a/cpp/src/metrics/PrometheusMetrics.h +++ b/cpp/src/metrics/PrometheusMetrics.h @@ -17,7 +17,7 @@ #define METRICS_NOW_TIME std::chrono::system_clock::now() -#define METRICS_INSTANCE server::GetInstance() +#define server::Metrics::GetInstance() server::GetInstance() #define METRICS_MICROSECONDS(a,b) (std::chrono::duration_cast (b-a)).count(); diff --git a/cpp/src/server/RocksIdMapper.cpp b/cpp/src/server/RocksIdMapper.cpp index 2dba5442..6eec10c6 100644 --- a/cpp/src/server/RocksIdMapper.cpp +++ b/cpp/src/server/RocksIdMapper.cpp @@ -18,8 +18,10 @@ namespace zilliz { namespace vecwise { namespace server { +static const std::string ROCKSDB_DEFAULT_GROUP = "default"; + RocksIdMapper::RocksIdMapper() -: db_(nullptr) { + : db_(nullptr) { OpenDb(); } @@ -28,6 +30,8 @@ RocksIdMapper::~RocksIdMapper() { } void RocksIdMapper::OpenDb() { + std::lock_guard lck(db_mutex_); + if(db_) { return; } @@ -79,6 +83,8 @@ void RocksIdMapper::OpenDb() { } void RocksIdMapper::CloseDb() { + std::lock_guard lck(db_mutex_); + for(auto& iter : column_handles_) { delete iter.second; } @@ -90,7 +96,117 @@ void RocksIdMapper::CloseDb() { } } +ServerError RocksIdMapper::AddGroup(const std::string& group) { + std::lock_guard lck(db_mutex_); + + return AddGroupInternal(group); +} + +bool RocksIdMapper::IsGroupExist(const std::string& group) const { + std::lock_guard lck(db_mutex_); + + return IsGroupExistInternal(group); +} + + ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) { + std::lock_guard lck(db_mutex_); + + return PutInternal(nid, sid, group); +} + +ServerError RocksIdMapper::Put(const std::vector& nid, const std::vector& sid, const std::string& group) { + if(nid.size() != sid.size()) { + return SERVER_INVALID_ARGUMENT; + } + + std::lock_guard lck(db_mutex_); + ServerError err = SERVER_SUCCESS; + for(size_t i = 0; i < nid.size(); i++) { + err = PutInternal(nid[i], sid[i], group); + if(err != SERVER_SUCCESS) { + return err; + } + } + + return err; +} + +ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const { + std::lock_guard lck(db_mutex_); + + return GetInternal(nid, sid, group); +} + +ServerError RocksIdMapper::Get(const std::vector& nid, std::vector& sid, const std::string& group) const { + sid.clear(); + + std::lock_guard lck(db_mutex_); + + ServerError err = SERVER_SUCCESS; + for(size_t i = 0; i < nid.size(); i++) { + std::string str_id; + ServerError temp_err = GetInternal(nid[i], str_id, group); + if(temp_err != SERVER_SUCCESS) { + sid.push_back(""); + SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i]; + err = temp_err; + continue; + } + + sid.push_back(str_id); + } + + return err; +} + +ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) { + std::lock_guard lck(db_mutex_); + + return DeleteInternal(nid, group); +} + +ServerError RocksIdMapper::DeleteGroup(const std::string& group) { + std::lock_guard lck(db_mutex_); + + return DeleteGroupInternal(group); +} + +//internal methods(whitout lock) +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +ServerError RocksIdMapper::AddGroupInternal(const std::string& group) { + if(!IsGroupExistInternal(group)) { + if(db_ == nullptr) { + return SERVER_NULL_POINTER; + } + + try {//add group + rocksdb::ColumnFamilyHandle *cfh = nullptr; + rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh); + if (!s.ok()) { + SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString(); + return SERVER_UNEXPECTED_ERROR; + } else { + column_handles_.insert(std::make_pair(group, cfh)); + } + } catch(std::exception& ex) { + SERVER_LOG_ERROR << "ID mapper failed to create group: " << ex.what(); + return SERVER_UNEXPECTED_ERROR; + } + } + + return SERVER_SUCCESS; +} + +bool RocksIdMapper::IsGroupExistInternal(const std::string& group) const { + std::string group_name = group; + if(group_name.empty()){ + group_name = ROCKSDB_DEFAULT_GROUP; + } + return (column_handles_.count(group_name) > 0 && column_handles_[group_name] != nullptr); +} + +ServerError RocksIdMapper::PutInternal(const std::string& nid, const std::string& sid, const std::string& group) { if(db_ == nullptr) { return SERVER_NULL_POINTER; } @@ -104,22 +220,12 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, c return SERVER_UNEXPECTED_ERROR; } } else { - rocksdb::ColumnFamilyHandle *cfh = nullptr; - if(column_handles_.count(group) == 0) { - try {//add group - rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh); - if (!s.ok()) { - SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString(); - } else { - column_handles_.insert(std::make_pair(group, cfh)); - } - } catch(std::exception& ex) { - std::cout << ex.what() << std::endl; - } - } else { - cfh = column_handles_[group]; + //try create group + if(AddGroupInternal(group) != SERVER_SUCCESS){ + return SERVER_UNEXPECTED_ERROR; } + rocksdb::ColumnFamilyHandle *cfh = column_handles_[group]; rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), cfh, key, value); if (!s.ok()) { SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString(); @@ -130,23 +236,7 @@ ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, c return SERVER_SUCCESS; } -ServerError RocksIdMapper::Put(const std::vector& nid, const std::vector& sid, const std::string& group) { - if(nid.size() != sid.size()) { - return SERVER_INVALID_ARGUMENT; - } - - ServerError err = SERVER_SUCCESS; - for(size_t i = 0; i < nid.size(); i++) { - err = Put(nid[i], sid[i], group); - if(err != SERVER_SUCCESS) { - return err; - } - } - - return err; -} - -ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const { +ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid, const std::string& group) const { sid = ""; if(db_ == nullptr) { return SERVER_NULL_POINTER; @@ -173,27 +263,7 @@ ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const s return SERVER_SUCCESS; } -ServerError RocksIdMapper::Get(const std::vector& nid, std::vector& sid, const std::string& group) const { - sid.clear(); - - ServerError err = SERVER_SUCCESS; - for(size_t i = 0; i < nid.size(); i++) { - std::string str_id; - ServerError temp_err = Get(nid[i], str_id, group); - if(temp_err != SERVER_SUCCESS) { - sid.push_back(""); - SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i]; - err = temp_err; - continue; - } - - sid.push_back(str_id); - } - - return err; -} - -ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) { +ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::string& group) { if(db_ == nullptr) { return SERVER_NULL_POINTER; } @@ -218,7 +288,7 @@ ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& gro return SERVER_SUCCESS; } -ServerError RocksIdMapper::DeleteGroup(const std::string& group) { +ServerError RocksIdMapper::DeleteGroupInternal(const std::string& group) { if(db_ == nullptr) { return SERVER_NULL_POINTER; } @@ -237,6 +307,7 @@ ServerError RocksIdMapper::DeleteGroup(const std::string& group) { return SERVER_SUCCESS; } + } } } \ No newline at end of file diff --git a/cpp/src/server/RocksIdMapper.h b/cpp/src/server/RocksIdMapper.h index 8c731559..1ffee7f3 100644 --- a/cpp/src/server/RocksIdMapper.h +++ b/cpp/src/server/RocksIdMapper.h @@ -13,16 +13,20 @@ #include #include #include +#include namespace zilliz { namespace vecwise { namespace server { class RocksIdMapper : public IVecIdMapper{ -public: + public: RocksIdMapper(); ~RocksIdMapper(); + ServerError AddGroup(const std::string& group) override; + bool IsGroupExist(const std::string& group) const override; + ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override; ServerError Put(const std::vector& nid, const std::vector& sid, const std::string& group = "") override; @@ -32,15 +36,29 @@ public: ServerError Delete(const std::string& nid, const std::string& group = "") override; ServerError DeleteGroup(const std::string& group) override; -private: + private: void OpenDb(); void CloseDb(); -private: + ServerError AddGroupInternal(const std::string& group); + + bool IsGroupExistInternal(const std::string& group) const; + + ServerError PutInternal(const std::string& nid, const std::string& sid, const std::string& group); + + ServerError GetInternal(const std::string& nid, std::string& sid, const std::string& group) const; + + ServerError DeleteInternal(const std::string& nid, const std::string& group); + + ServerError DeleteGroupInternal(const std::string& group); + + private: rocksdb::DB* db_; - std::unordered_map column_handles_; + mutable std::unordered_map column_handles_; + mutable std::mutex db_mutex_; }; + } } } diff --git a/cpp/src/server/VecIdMapper.cpp b/cpp/src/server/VecIdMapper.cpp index ecf5058a..d9de2ca3 100644 --- a/cpp/src/server/VecIdMapper.cpp +++ b/cpp/src/server/VecIdMapper.cpp @@ -39,6 +39,20 @@ SimpleIdMapper::~SimpleIdMapper() { } +ServerError +SimpleIdMapper::AddGroup(const std::string& group) { + if(id_groups_.count(group) == 0) { + id_groups_.insert(std::make_pair(group, ID_MAPPING())); + } +} + +//not thread-safe +bool +SimpleIdMapper::IsGroupExist(const std::string& group) const { + return id_groups_.count(group) > 0; +} + + //not thread-safe ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) { ID_MAPPING& mapping = id_groups_[group]; diff --git a/cpp/src/server/VecIdMapper.h b/cpp/src/server/VecIdMapper.h index 9bb6d500..f3c2bdde 100644 --- a/cpp/src/server/VecIdMapper.h +++ b/cpp/src/server/VecIdMapper.h @@ -25,6 +25,9 @@ public: virtual ~IVecIdMapper(){} + virtual ServerError AddGroup(const std::string& group) = 0; + virtual bool IsGroupExist(const std::string& group) const = 0; + virtual ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") = 0; virtual ServerError Put(const std::vector& nid, const std::vector& sid, const std::string& group = "") = 0; @@ -41,6 +44,9 @@ public: SimpleIdMapper(); ~SimpleIdMapper(); + ServerError AddGroup(const std::string& group) override; + bool IsGroupExist(const std::string& group) const override; + ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override; ServerError Put(const std::vector& nid, const std::vector& sid, const std::string& group = "") override; -- GitLab