Commit 338e0320 authored by Yu Kun

reconstruct MetricsCollector


Former-commit-id: 4bb27e69d00974f5d21824e233f885a88a7249cb
Parent b4eeed0a
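This commit replaces the scattered start/end timestamp bookkeeping with small RAII collector classes (defined in Metrics.h below) whose destructors observe the elapsed time when the enclosing scope exits. A minimal, self-contained sketch of that pattern is shown here; the `ScopedTimer` name and the `std::cout` sink are illustrative stand-ins, not code from this commit:

```cpp
#include <chrono>
#include <iostream>
#include <string>
#include <utility>

// Illustrative scope-based timer: the constructor records the start time and
// the destructor reports the elapsed microseconds, mirroring how the new
// Collect*Metrics classes in Metrics.h observe histograms on destruction.
class ScopedTimer {
 public:
    explicit ScopedTimer(std::string label)
        : label_(std::move(label)), start_(std::chrono::system_clock::now()) {}

    ~ScopedTimer() {
        auto end = std::chrono::system_clock::now();
        auto us = std::chrono::duration_cast<std::chrono::microseconds>(end - start_).count();
        std::cout << label_ << " took " << us << " us" << std::endl;  // real code observes a metric here
    }

 private:
    std::string label_;
    std::chrono::system_clock::time_point start_;
};

int main() {
    ScopedTimer timer("insert");  // destructor fires when the scope exits,
                                  // even on early return or exception
    // ... do the timed work here ...
    return 0;
}
```

Because the measurement runs in the destructor, early returns and exception paths are timed automatically, which is what lets the diffs below delete the explicit `METRICS_NOW_TIME` / `METRICS_MICROSECONDS` pairs at every call site.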
@@ -41,6 +41,7 @@ Please mark all change in change log and use the ticket from JIRA.
 - MS-400 - Add timestamp record in task state change function
 - MS-402 - Add dump implementation for TaskTableItem
 - MS-403 - Add GpuCacheMgr
+- MS-407 - Reconstruct MetricsCollector

 ## New Feature
 - MS-343 - Implement ResourceMgr
......
@@ -25,7 +25,6 @@ metric_config:
     is_startup: off              # if monitoring start: on, off
     collector: prometheus        # metrics collector: prometheus
     prometheus_config:           # following are prometheus configure
-        collect_type: pull       # prometheus collect data method
         port: 8080               # the port prometheus use to fetch metrics
         push_gateway_ip_address: 127.0.0.1    # push method configure: push gateway ip address
         push_gateway_port: 9091               # push method configure: push gateway port
......
@@ -23,6 +23,7 @@
 #include <cstring>
 #include <cache/CpuCacheMgr.h>
 #include <boost/filesystem.hpp>
+#include <src/cache/GpuCacheMgr.h>

 namespace zilliz {
 namespace milvus {
@@ -34,32 +35,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
 constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
 constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

-void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
-    double avg_time = total_time / n;
-    for (int i = 0; i < n; ++i) {
-        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
-    }
-    // server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
-    if (succeed) {
-        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
-        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
-    }
-    else {
-        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
-        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
-    }
-}
-
-void CollectQueryMetrics(double total_time, size_t nq) {
-    for (int i = 0; i < nq; ++i) {
-        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
-    }
-    auto average_time = total_time / nq;
-    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
-    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double(nq) / total_time);
-}

 }
@@ -164,27 +139,21 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
     auto start_time = METRICS_NOW_TIME;
     Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
-    auto end_time = METRICS_NOW_TIME;
-    double total_time = METRICS_MICROSECONDS(start_time,end_time);
+    zilliz::milvus::server::CollectInsertMetrics metrics(start_time, n, status.ok());
     // std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
     // double average_time = double(time_span.count()) / n;

     ENGINE_LOG_DEBUG << "Insert vectors to cache finished";

-    CollectInsertMetrics(total_time, n, status.ok());
     return status;
 }

 Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
-    auto start_time = METRICS_NOW_TIME;
+    server::CollectQueryMetrics metrics(nq);
     meta::DatesT dates = {meta::Meta::GetDate()};
     Status result = Query(table_id, k, nq, nprobe, vectors, dates, results);
-    auto end_time = METRICS_NOW_TIME;
-    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
-    CollectQueryMetrics(total_time, nq);
     return result;
 }
@@ -251,7 +220,8 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
 Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                           uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                           const meta::DatesT& dates, QueryResults& results) {
-    auto start_time = METRICS_NOW_TIME;
+    server::CollectQueryMetrics metrics(nq);
     server::TimeRecorder rc("");

     //step 1: get files to search
@@ -294,11 +264,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
     results = context->GetResult();
     rc.ElapseFromBegin("Engine query totally cost");

-    auto end_time = METRICS_NOW_TIME;
-    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
-    CollectQueryMetrics(total_time, nq);
-
     return Status::OK();
 }
@@ -418,14 +383,10 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
     long index_size = 0;

     for (auto& file : files) {
+        server::CollectMergeFilesMetrics metrics;
-        auto start_time = METRICS_NOW_TIME;
         index->Merge(file.location_);
         auto file_schema = file;
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
-        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
         file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
         updated.push_back(file_schema);

         ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
@@ -641,11 +602,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
     std::shared_ptr<ExecutionEngine> index;

     try {
-        auto start_time = METRICS_NOW_TIME;
+        server::CollectBuildIndexMetrics metrics;
         index = to_index->BuildIndex(table_file.location_);
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
     } catch (std::exception& ex) {
         //typical error: out of gpu memory
         std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
......
@@ -116,9 +116,11 @@ Status ExecutionEngineImpl::Serialize() {
 }

 Status ExecutionEngineImpl::Load(bool to_cache) {
+    double physical_size;
+    server::CollectExecutionEngineMetrics metrics(physical_size);
     index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
     bool already_in_cache = (index_ != nullptr);
-    auto start_time = METRICS_NOW_TIME;
     if (!index_) {
         try {
             index_ = read_index(location_);
@@ -133,22 +135,16 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
     if (!already_in_cache && to_cache) {
         Cache();
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
-        double physical_size = PhysicalSize();
-        server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size);
-        server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time));
+        physical_size = PhysicalSize();
     }

     return Status::OK();
 }
 Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
+    double physical_size;
+    server::CollectExecutionEngineMetrics metrics(physical_size);
     index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
     bool already_in_cache = (index_ != nullptr);
-    auto start_time = METRICS_NOW_TIME;
     if (!index_) {
         try {
             index_ = index_->CopyToGpu(device_id);
@@ -163,21 +159,17 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
     if (!already_in_cache) {
         GpuCache(device_id);
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        double physical_size = PhysicalSize();
-        server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
-        server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
+        physical_size = PhysicalSize();
     }

     return Status::OK();
 }
 Status ExecutionEngineImpl::CopyToCpu() {
+    double physical_size;
+    server::CollectExecutionEngineMetrics metrics(physical_size);
     index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
     bool already_in_cache = (index_ != nullptr);
-    auto start_time = METRICS_NOW_TIME;
     if (!index_) {
         try {
             index_ = index_->CopyToCpu();
@@ -192,14 +184,8 @@ Status ExecutionEngineImpl::CopyToCpu() {
     if(!already_in_cache) {
         Cache();
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-        double physical_size = PhysicalSize();
-        server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
-        server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
+        physical_size = PhysicalSize();
     }

     return Status::OK();
 }
......
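The `CollectExecutionEngineMetrics` guard used above takes a reference to a `physical_size` variable that the function fills in later; the destructor reads whatever value the variable holds when the scope ends. A standalone sketch of that idiom, with an illustrative `SizeReporter` class standing in for the real collector:

```cpp
#include <iostream>

// Illustrative reference-capturing collector: the caller declares the variable
// first, hands a reference to the guard, and assigns the real value later; the
// destructor then reads the final value when the scope ends.
class SizeReporter {
 public:
    explicit SizeReporter(double& size) : size_(size) {}
    ~SizeReporter() { std::cout << "loaded " << size_ << " bytes" << std::endl; }

 private:
    double& size_;
};

int main() {
    double physical_size = 0.0;   // must outlive the reporter
    SizeReporter reporter(physical_size);
    physical_size = 4096.0;       // e.g. the result of PhysicalSize() after caching
    return 0;                     // ~SizeReporter() sees 4096
}
```

Note that in the diff `physical_size` is only assigned on the cache-miss branch, so when the index was already cached the destructor reads an uninitialized double; initializing the variable to zero, as in the sketch, would avoid that.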
@@ -80,20 +80,15 @@ bool MemTableFile::IsFull() {
 }

 Status MemTableFile::Serialize() {
-    auto start_time = METRICS_NOW_TIME;
-    auto size = GetCurrentMem();
+    size_t size;
+    server::CollectSerializeMetrics metrics(size);
+    size = GetCurrentMem();
     execution_engine_->Serialize();
-    auto end_time = METRICS_NOW_TIME;
-    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
     table_file_schema_.file_size_ = execution_engine_->PhysicalSize();
     table_file_schema_.row_count_ = execution_engine_->Count();
-    server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time);

     table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ?
                                     meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
......
@@ -24,7 +24,7 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
                          size_t &num_vectors_added,
                          IDNumbers &vector_ids) {
-    auto start_time = METRICS_NOW_TIME;
+    server::CollectorAddMetrics metrics(n_, table_file_schema.dimension_);
     num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
                         num_vectors_to_add : n_ - current_num_vectors_added;
@@ -49,12 +49,6 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
         ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
     }

-    auto end_time = METRICS_NOW_TIME;
-    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-    server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
-                                                               static_cast<int>(table_file_schema.dimension_),
-                                                               total_time);
-
     return status;
 }
......
@@ -39,24 +39,6 @@ Status HandleException(const std::string &desc, std::exception &e) {
     return Status::DBTransactionError(desc, e.what());
 }

-class MetricCollector {
- public:
-    MetricCollector() {
-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        start_time_ = METRICS_NOW_TIME;
-    }
-
-    ~MetricCollector() {
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
-        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
-    }
-
- private:
-    using TIME_POINT = std::chrono::system_clock::time_point;
-    TIME_POINT start_time_;
-};

 }
Status MySQLMetaImpl::NextTableId(std::string &table_id) { Status MySQLMetaImpl::NextTableId(std::string &table_id) {
...@@ -272,7 +254,7 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id, ...@@ -272,7 +254,7 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) { Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -390,7 +372,7 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) { ...@@ -390,7 +372,7 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) { Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -457,7 +439,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T ...@@ -457,7 +439,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) { Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -504,7 +486,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex ...@@ -504,7 +486,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) { Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -560,7 +542,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) { ...@@ -560,7 +542,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) { Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -603,7 +585,7 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) { ...@@ -603,7 +585,7 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) { Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -642,7 +624,7 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) { ...@@ -642,7 +624,7 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
StoreQueryResult res; StoreQueryResult res;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -697,7 +679,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { ...@@ -697,7 +679,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
StoreQueryResult res; StoreQueryResult res;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -737,7 +719,7 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { ...@@ -737,7 +719,7 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) { Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
StoreQueryResult res; StoreQueryResult res;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -802,7 +784,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) { ...@@ -802,7 +784,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
} }
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
NextFileId(file_schema.file_id_); NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_; file_schema.dimension_ = table_schema.dimension_;
...@@ -870,7 +852,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) { ...@@ -870,7 +852,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear(); files.clear();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
StoreQueryResult res; StoreQueryResult res;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -953,7 +935,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, ...@@ -953,7 +935,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear(); files.clear();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
StoreQueryResult res; StoreQueryResult res;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -1067,7 +1049,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, ...@@ -1067,7 +1049,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear(); files.clear();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
StoreQueryResult res; StoreQueryResult res;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -1177,7 +1159,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, ...@@ -1177,7 +1159,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
files.clear(); files.clear();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//check table existence //check table existence
TableSchema table_schema; TableSchema table_schema;
...@@ -1457,7 +1439,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) { ...@@ -1457,7 +1439,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size; ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
bool status; bool status;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -1529,7 +1511,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { ...@@ -1529,7 +1511,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -1643,7 +1625,7 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) { ...@@ -1643,7 +1625,7 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) { Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -1733,7 +1715,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -1733,7 +1715,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files //remove to_delete files
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -1812,7 +1794,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -1812,7 +1794,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables //remove to_delete tables
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -1870,7 +1852,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -1870,7 +1852,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder //remove deleted table folder
//don't remove table folder until all its files has been deleted //don't remove table folder until all its files has been deleted
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -1955,7 +1937,7 @@ Status MySQLMetaImpl::CleanUp() { ...@@ -1955,7 +1937,7 @@ Status MySQLMetaImpl::CleanUp() {
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) { Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
TableSchema table_schema; TableSchema table_schema;
table_schema.table_id_ = table_id; table_schema.table_id_ = table_id;
......
@@ -34,24 +34,6 @@ Status HandleException(const std::string& desc, std::exception &e) {
     return Status::DBTransactionError(desc, e.what());
 }

-class MetricCollector {
- public:
-    MetricCollector() {
-        server::Metrics::GetInstance().MetaAccessTotalIncrement();
-        start_time_ = METRICS_NOW_TIME;
-    }
-
-    ~MetricCollector() {
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
-        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
-    }
-
- private:
-    using TIME_POINT = std::chrono::system_clock::time_point;
-    TIME_POINT start_time_;
-};

 }
inline auto StoragePrototype(const std::string &path) { inline auto StoragePrototype(const std::string &path) {
...@@ -170,7 +152,7 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id, ...@@ -170,7 +152,7 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) { Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -212,7 +194,7 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) { ...@@ -212,7 +194,7 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) { Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -236,7 +218,7 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) { ...@@ -236,7 +218,7 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) { Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -261,7 +243,7 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) { ...@@ -261,7 +243,7 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::id_, auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_, &TableSchema::state_,
...@@ -350,7 +332,7 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) ...@@ -350,7 +332,7 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) { Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -399,7 +381,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const ...@@ -399,7 +381,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) { Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_, auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_,
&TableSchema::nlist_, &TableSchema::nlist_,
...@@ -426,7 +408,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde ...@@ -426,7 +408,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) { Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -464,7 +446,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { ...@@ -464,7 +446,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
has_or_not = false; has_or_not = false;
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
auto tables = ConnectorPtr->select(columns(&TableSchema::id_), auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
where(c(&TableSchema::table_id_) == table_id where(c(&TableSchema::table_id_) == table_id
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE)); and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
...@@ -483,7 +465,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { ...@@ -483,7 +465,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) { Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableSchema::id_, auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_, &TableSchema::table_id_,
...@@ -527,7 +509,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) { ...@@ -527,7 +509,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
} }
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
NextFileId(file_schema.file_id_); NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_; file_schema.dimension_ = table_schema.dimension_;
...@@ -558,7 +540,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) { ...@@ -558,7 +540,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear(); files.clear();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_, &TableFileSchema::table_id_,
...@@ -616,7 +598,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, ...@@ -616,7 +598,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
files.clear(); files.clear();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
if (partition.empty()) { if (partition.empty()) {
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX}; std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
...@@ -716,7 +698,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, ...@@ -716,7 +698,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT &partition, const DatesT &partition,
DatePartionedTableFilesSchema &files) { DatePartionedTableFilesSchema &files) {
files.clear(); files.clear();
-    MetricCollector metric;
+    server::MetricCollector metric;
try { try {
auto select_columns = columns(&TableFileSchema::id_, auto select_columns = columns(&TableFileSchema::id_,
...@@ -793,7 +775,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id, ...@@ -793,7 +775,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
files.clear(); files.clear();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//check table existence //check table existence
TableSchema table_schema; TableSchema table_schema;
...@@ -967,7 +949,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) { ...@@ -967,7 +949,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size; ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -1024,7 +1006,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) { ...@@ -1024,7 +1006,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -1050,7 +1032,7 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { ...@@ -1050,7 +1032,7 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -1072,7 +1054,7 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) { ...@@ -1072,7 +1054,7 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) { Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -1121,7 +1103,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -1121,7 +1103,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files //remove to_delete files
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -1165,7 +1147,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -1165,7 +1147,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables //remove to_delete tables
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -1195,7 +1177,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -1195,7 +1177,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder //remove deleted table folder
//don't remove table folder until all its files has been deleted //don't remove table folder until all its files has been deleted
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
for(auto& table_id : table_ids) { for(auto& table_id : table_ids) {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_), auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
...@@ -1214,7 +1196,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { ...@@ -1214,7 +1196,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status SqliteMetaImpl::CleanUp() { Status SqliteMetaImpl::CleanUp() {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_); std::lock_guard<std::mutex> meta_lock(meta_mutex_);
...@@ -1245,7 +1227,7 @@ Status SqliteMetaImpl::CleanUp() { ...@@ -1245,7 +1227,7 @@ Status SqliteMetaImpl::CleanUp() {
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) { Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try { try {
-        MetricCollector metric;
+        server::MetricCollector metric;
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX}; std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_), auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
......
@@ -59,23 +59,6 @@ void ParallelReduce(std::function<void(size_t, size_t)>& reduce_function, size_t
     }
 }

-void CollectDurationMetrics(int index_type, double total_time) {
-    switch(index_type) {
-        case meta::TableFileSchema::RAW: {
-            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
-            break;
-        }
-        case meta::TableFileSchema::TO_INDEX: {
-            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
-            break;
-        }
-        default: {
-            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
-            break;
-        }
-    }
-}

 }

 SearchTask::SearchTask()
@@ -92,7 +75,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
     server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));

-    auto start_time = METRICS_NOW_TIME;
+    server::CollectSearchTaskMetrics metrics(file_type_);

     bool metric_l2 = (index_engine_->IndexMetricType() == MetricType::L2);
@@ -137,10 +120,6 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
         context->IndexSearchDone(index_id_);
     }

-    auto end_time = METRICS_NOW_TIME;
-    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-    CollectDurationMetrics(file_type_, total_time);
-
     rc.ElapseFromBegin("totally cost");

     return nullptr;
......
@@ -6,6 +6,7 @@
 #pragma once

 #include "MetricBase.h"
+#include "db/meta/MetaTypes.h"

 namespace zilliz {
@@ -29,6 +30,233 @@ class Metrics {
     static MetricsBase &CreateMetricsCollector();
 };
class CollectInsertMetrics {
public:
CollectInsertMetrics(std::chrono::system_clock::time_point start_time,
size_t n, bool succeed) : start_time_(start_time), n_(n), succeed_(succeed) {
}
~CollectInsertMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
double avg_time = total_time / n_;
for (int i = 0; i < n_; ++i) {
Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (succeed_) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t n_;
bool succeed_;
};
class CollectQueryMetrics {
public:
CollectQueryMetrics(size_t nq) : nq_(nq) {
start_time_ = METRICS_NOW_TIME;
}
~CollectQueryMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
for (int i = 0; i < nq_; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq_;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq_) / total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t nq_;
};
class CollectMergeFilesMetrics {
public:
CollectMergeFilesMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectMergeFilesMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
class CollectBuildIndexMetrics {
public:
CollectBuildIndexMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectBuildIndexMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
class CollectExecutionEngineMetrics {
public:
CollectExecutionEngineMetrics(double& physical_size) : physical_size_(physical_size) {
start_time_ = METRICS_NOW_TIME;
}
~CollectExecutionEngineMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size_);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size_ / double(total_time));
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
double& physical_size_;
};
class CollectSerializeMetrics {
public:
CollectSerializeMetrics(size_t& size) : size_(size) {
start_time_ = METRICS_NOW_TIME;
}
~CollectSerializeMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size_ / total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t& size_;
};
class CollectorAddMetrics {
public:
CollectorAddMetrics(size_t n, uint16_t dimension) : n_(n), dimension_(dimension) {
start_time_ = METRICS_NOW_TIME;
}
~CollectorAddMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(dimension_),
total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t n_;
uint16_t dimension_;
};
class CollectorDurationMetrics {
public:
CollectorDurationMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectorDurationMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
switch (index_type_) {
case engine::meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case engine::meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
int index_type_;
};
class CollectSearchTaskMetrics {
public:
CollectSearchTaskMetrics(int index_type) : index_type_(index_type) {
start_time_ = METRICS_NOW_TIME;
}
~CollectSearchTaskMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
switch(index_type_) {
case engine::meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case engine::meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
int index_type_;
};
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
} }
} }
......
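The meta implementations above use the new `server::MetricCollector` simply by declaring a named local at the top of each `try` block. The guard must be a named object, not a temporary; a temporary would be destroyed at the end of the full expression and record essentially nothing. A standalone illustration, with an `AccessGuard` stand-in for `server::MetricCollector`:

```cpp
#include <chrono>
#include <iostream>

// Stand-in for server::MetricCollector (illustrative only): "counts" an access
// on construction and reports the elapsed time on destruction.
struct AccessGuard {
    AccessGuard() : start_(std::chrono::system_clock::now()) { std::cout << "meta access\n"; }
    ~AccessGuard() {
        auto us = std::chrono::duration_cast<std::chrono::microseconds>(
                      std::chrono::system_clock::now() - start_).count();
        std::cout << "meta access took " << us << " us\n";
    }
    std::chrono::system_clock::time_point start_;
};

int main() {
    AccessGuard metric;   // correct: named local lives until the end of the scope
    // AccessGuard();     // wrong: a temporary is destroyed immediately and times nothing
    // ... the database work being measured goes here ...
    return 0;
}
```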
@@ -81,24 +81,6 @@ CollectFileMetrics(int file_type, size_t file_size) {
     }
 }

-void
-CollectDurationMetrics(int index_type, double total_time) {
-    switch (index_type) {
-        case meta::TableFileSchema::RAW: {
-            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
-            break;
-        }
-        case meta::TableFileSchema::TO_INDEX: {
-            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
-            break;
-        }
-        default: {
-            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
-            break;
-        }
-    }
-}

 XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) {
     index_engine_ = EngineFactory::Build(file_->dimension_,
                                          file_->location_,
@@ -159,7 +141,7 @@ XSearchTask::Execute() {
     server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));

-    auto start_time = METRICS_NOW_TIME;
+    server::CollectorDurationMetrics metrics(index_type_);

     std::vector<long> output_ids;
     std::vector<float> output_distence;
@@ -202,10 +184,6 @@ XSearchTask::Execute() {
         context->IndexSearchDone(index_id_);
     }

-    auto end_time = METRICS_NOW_TIME;
-    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-    CollectDurationMetrics(index_type_, total_time);
-
     rc.ElapseFromBegin("totally cost");
 }
......
@@ -21,7 +21,7 @@ TEST(MetricbaseTest, METRICBASE_TEST){
     instance.RawFileSizeHistogramObserve(1.0);
     instance.IndexFileSizeHistogramObserve(1.0);
     instance.BuildIndexDurationSecondsHistogramObserve(1.0);
-    instance.CacheUsageGaugeSet(1.0);
+    instance.CpuCacheUsageGaugeSet(1.0);
     instance.MetaAccessTotalIncrement();
     instance.MetaAccessDurationSecondsHistogramObserve(1.0);
     instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0);
......
@@ -22,7 +22,7 @@ TEST(PrometheusTest, PROMETHEUS_TEST){
     instance.RawFileSizeHistogramObserve(1.0);
     instance.IndexFileSizeHistogramObserve(1.0);
    instance.BuildIndexDurationSecondsHistogramObserve(1.0);
-    instance.CacheUsageGaugeSet(1.0);
+    instance.CpuCacheUsageGaugeSet(1.0);
     instance.MetaAccessTotalIncrement();
     instance.MetaAccessDurationSecondsHistogramObserve(1.0);
     instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0);
......
@@ -18,7 +18,6 @@ metric_config:
     is_startup: off              # if monitoring start: on, off
     collector: prometheus        # metrics collector: prometheus
     prometheus_config:           # following are prometheus configure
-        collect_type: pull       # prometheus collect data method
         port: 8080               # the port prometheus use to fetch metrics
         push_gateway_ip_address: 127.0.0.1    # push method configure: push gateway ip address
        push_gateway_port: 9091               # push method configure: push gateway port
......