提交 2cd29687 编写于 作者: Y Yu Kun

reconstruct MetricsCollector


Former-commit-id: 4f7a219b9eb10d98068a5488967319b039fd695a
上级 ed037699
......@@ -41,6 +41,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-400 - Add timestamp record in task state change function
- MS-402 - Add dump implementation for TaskTableItem
- MS-403 - Add GpuCacheMgr
- MS-407 - Reconstruct MetricsCollector
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -25,7 +25,6 @@ metric_config:
is_startup: off # if monitoring start: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # following are prometheus configure
collect_type: pull # prometheus collect data method
port: 8080 # the port prometheus use to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address
push_gateway_port: 9091 # push method configure: push gateway port
......
......@@ -23,6 +23,7 @@
#include <cstring>
#include <cache/CpuCacheMgr.h>
#include <boost/filesystem.hpp>
#include <src/cache/GpuCacheMgr.h>
namespace zilliz {
namespace milvus {
......@@ -34,32 +35,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (succeed) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
}
}
void CollectQueryMetrics(double total_time, size_t nq) {
for (int i = 0; i < nq; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
}
......@@ -164,27 +139,21 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
auto start_time = METRICS_NOW_TIME;
Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
zilliz::milvus::server::CollectInsertMetrics metrics(start_time, n, status.ok());
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
CollectInsertMetrics(total_time, n, status.ok());
return status;
}
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float *vectors, QueryResults &results) {
auto start_time = METRICS_NOW_TIME;
server::CollectQueryMetrics metrics(nq);
meta::DatesT dates = {meta::Meta::GetDate()};
Status result = Query(table_id, k, nq, nprobe, vectors, dates, results);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return result;
}
......@@ -251,7 +220,8 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
auto start_time = METRICS_NOW_TIME;
server::CollectQueryMetrics metrics(nq);
server::TimeRecorder rc("");
//step 1: get files to search
......@@ -294,11 +264,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
results = context->GetResult();
rc.ElapseFromBegin("Engine query totally cost");
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return Status::OK();
}
......@@ -418,14 +383,10 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
long index_size = 0;
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
auto start_time = METRICS_NOW_TIME;
index->Merge(file.location_);
auto file_schema = file;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
......@@ -641,11 +602,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
std::shared_ptr<ExecutionEngine> index;
try {
auto start_time = METRICS_NOW_TIME;
server::CollectBuildIndexMetrics metrics;
index = to_index->BuildIndex(table_file.location_);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
} catch (std::exception& ex) {
//typical error: out of gpu memory
std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
......
......@@ -116,9 +116,11 @@ Status ExecutionEngineImpl::Serialize() {
}
Status ExecutionEngineImpl::Load(bool to_cache) {
double physical_size;
server::CollectExecutionEngineMetrics metrics(physical_size);
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = read_index(location_);
......@@ -133,22 +135,16 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
if (!already_in_cache && to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time));
physical_size = PhysicalSize();
}
return Status::OK();
}
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
double physical_size;
server::CollectExecutionEngineMetrics metrics(physical_size);
index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToGpu(device_id);
......@@ -163,21 +159,17 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
if (!already_in_cache) {
GpuCache(device_id);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
physical_size = PhysicalSize();
}
return Status::OK();
}
Status ExecutionEngineImpl::CopyToCpu() {
double physical_size;
server::CollectExecutionEngineMetrics metrics(physical_size);
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToCpu();
......@@ -192,14 +184,8 @@ Status ExecutionEngineImpl::CopyToCpu() {
if(!already_in_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
physical_size = PhysicalSize();
}
return Status::OK();
}
......
......@@ -80,20 +80,15 @@ bool MemTableFile::IsFull() {
}
Status MemTableFile::Serialize() {
auto start_time = METRICS_NOW_TIME;
auto size = GetCurrentMem();
size_t size;
server::CollectSerializeMetrics metrics(size);
size = GetCurrentMem();
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.file_size_ = execution_engine_->PhysicalSize();
table_file_schema_.row_count_ = execution_engine_->Count();
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time);
table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
......
......@@ -24,7 +24,7 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
size_t &num_vectors_added,
IDNumbers &vector_ids) {
auto start_time = METRICS_NOW_TIME;
server::CollectorAddMetrics metrics(n_, table_file_schema.dimension_);
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
num_vectors_to_add : n_ - current_num_vectors_added;
......@@ -49,12 +49,6 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(table_file_schema.dimension_),
total_time);
return status;
}
......
......@@ -39,24 +39,6 @@ Status HandleException(const std::string &desc, std::exception &e) {
return Status::DBTransactionError(desc, e.what());
}
// RAII helper: counts one meta access on construction and records the
// elapsed wall time into the meta-access duration histogram on destruction.
class MetricCollector {
 public:
    MetricCollector() {
        server::Metrics::GetInstance().MetaAccessTotalIncrement();
        begin_ = METRICS_NOW_TIME;
    }

    ~MetricCollector() {
        auto finish = METRICS_NOW_TIME;
        server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(
            METRICS_MICROSECONDS(begin_, finish));
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT begin_;
};
}
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
......@@ -272,7 +254,7 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -390,7 +372,7 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -457,7 +439,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -504,7 +486,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -560,7 +542,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -603,7 +585,7 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -642,7 +624,7 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -697,7 +679,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -737,7 +719,7 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -802,7 +784,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
try {
MetricCollector metric;
server::MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_;
......@@ -870,7 +852,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -953,7 +935,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1067,7 +1049,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1177,7 +1159,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
//check table existence
TableSchema table_schema;
......@@ -1457,7 +1439,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
server::MetricCollector metric;
bool status;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1529,7 +1511,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1643,7 +1625,7 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1733,7 +1715,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1812,7 +1794,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1870,7 +1852,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder
//don't remove table folder until all its files has been deleted
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1955,7 +1937,7 @@ Status MySQLMetaImpl::CleanUp() {
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::MetricCollector metric;
TableSchema table_schema;
table_schema.table_id_ = table_id;
......
......@@ -34,24 +34,6 @@ Status HandleException(const std::string& desc, std::exception &e) {
return Status::DBTransactionError(desc, e.what());
}
// RAII helper: increments the meta-access counter when constructed and, when
// destroyed, observes the elapsed time (computed via METRICS_MICROSECONDS)
// in the meta-access duration histogram. Placed on the stack at the top of
// each meta operation so every exit path is timed.
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
inline auto StoragePrototype(const std::string &path) {
......@@ -170,7 +152,7 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -212,7 +194,7 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -236,7 +218,7 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -261,7 +243,7 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_,
......@@ -350,7 +332,7 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -399,7 +381,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_,
&TableSchema::nlist_,
......@@ -426,7 +408,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -464,7 +446,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
has_or_not = false;
try {
MetricCollector metric;
server::MetricCollector metric;
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
where(c(&TableSchema::table_id_) == table_id
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
......@@ -483,7 +465,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_,
......@@ -527,7 +509,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
try {
MetricCollector metric;
server::MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_;
......@@ -558,7 +540,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
......@@ -616,7 +598,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
if (partition.empty()) {
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
......@@ -716,7 +698,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
files.clear();
MetricCollector metric;
server::MetricCollector metric;
try {
auto select_columns = columns(&TableFileSchema::id_,
......@@ -793,7 +775,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
//check table existence
TableSchema table_schema;
......@@ -967,7 +949,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1024,7 +1006,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1050,7 +1032,7 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1072,7 +1054,7 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1121,7 +1103,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1165,7 +1147,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1195,7 +1177,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder
//don't remove table folder until all its files has been deleted
try {
MetricCollector metric;
server::MetricCollector metric;
for(auto& table_id : table_ids) {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
......@@ -1214,7 +1196,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status SqliteMetaImpl::CleanUp() {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1245,7 +1227,7 @@ Status SqliteMetaImpl::CleanUp() {
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::MetricCollector metric;
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
......
......@@ -59,23 +59,6 @@ void ParallelReduce(std::function<void(size_t, size_t)>& reduce_function, size_t
}
}
// Observe a search duration in the histogram matching the file's index
// type. RAW and TO_INDEX files are both counted as raw-data searches; any
// other type counts as an index-data search.
void CollectDurationMetrics(int index_type, double total_time) {
    switch (index_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX:
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            break;
        default:
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            break;
    }
}
}
SearchTask::SearchTask()
......@@ -92,7 +75,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
server::CollectSearchTaskMetrics metrics(file_type_);
bool metric_l2 = (index_engine_->IndexMetricType() == MetricType::L2);
......@@ -137,10 +120,6 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(file_type_, total_time);
rc.ElapseFromBegin("totally cost");
return nullptr;
......
......@@ -6,6 +6,7 @@
#pragma once
#include "MetricBase.h"
#include "db/meta/MetaTypes.h"
namespace zilliz {
......@@ -29,6 +30,233 @@ class Metrics {
static MetricsBase &CreateMetricsCollector();
};
// RAII metrics helper for vector inserts. Construct it with the insert
// start time, the vector count and the success flag; the destructor records
// per-vector latency and success/failure counters when the scope ends.
class CollectInsertMetrics {
 public:
    CollectInsertMetrics(std::chrono::system_clock::time_point start_time,
                         size_t n, bool succeed)
        : start_time_(start_time), n_(n), succeed_(succeed) {
    }

    ~CollectInsertMetrics() {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
        if (n_ > 0) {  // guard the average: zero vectors records no per-vector latency
            double avg_time = total_time / n_;
            for (size_t i = 0; i < n_; ++i) {
                // consistently qualified with server:: like the other calls here
                server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
            }
        }
        if (succeed_) {
            server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
            server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
        } else {
            server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
            server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
        }
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT start_time_;
    size_t n_;
    bool succeed_;
};
// RAII metrics helper for queries: captures the start time at construction
// and, on destruction, records per-query response time, the average
// response per vector and the query throughput. nq is the number of query
// vectors.
class CollectQueryMetrics {
 public:
    explicit CollectQueryMetrics(size_t nq) : nq_(nq) {
        start_time_ = METRICS_NOW_TIME;
    }

    ~CollectQueryMetrics() {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
        if (nq_ == 0) {  // nothing queried: avoid the divisions by zero below
            return;
        }
        for (size_t i = 0; i < nq_; ++i) {
            server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
        }
        auto average_time = total_time / nq_;
        server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
        server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double(nq_) / total_time);
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT start_time_;
    size_t nq_;
};
// Scope guard that times a file-merge operation and feeds the elapsed time
// into the mem-table merge duration histogram.
class CollectMergeFilesMetrics {
 public:
    CollectMergeFilesMetrics() {
        begin_ = METRICS_NOW_TIME;
    }

    ~CollectMergeFilesMetrics() {
        auto finish = METRICS_NOW_TIME;
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(
            METRICS_MICROSECONDS(begin_, finish));
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT begin_;
};
// RAII timer: records the duration of an index build into the build-index
// duration histogram when the guard goes out of scope (see
// DBImpl::BuildIndex, which wraps the BuildIndex call with this guard).
class CollectBuildIndexMetrics {
public:
CollectBuildIndexMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectBuildIndexMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
// RAII metrics helper for execution-engine load/copy operations. Holds a
// REFERENCE to the caller's physical_size variable: the caller constructs
// the guard first, assigns the loaded size to that variable during the
// operation, and the destructor reads its final value when recording load
// duration, size and IO speed. NOTE(review): if the caller never assigns
// physical_size, the destructor reads an indeterminate value — callers must
// initialize it.
class CollectExecutionEngineMetrics {
 public:
    explicit CollectExecutionEngineMetrics(double& physical_size)
        : physical_size_(physical_size) {
        start_time_ = METRICS_NOW_TIME;
    }

    ~CollectExecutionEngineMetrics() {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
        server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
        server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size_);
        if (total_time > 0.0) {  // avoid division by zero on a sub-resolution interval
            server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size_ / double(total_time));
        }
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT start_time_;
    double& physical_size_;
};
// RAII metrics helper for serialization. The caller binds a size_t by
// reference, fills it in before the guard is destroyed, and the destructor
// records the disk-store IO speed (size — presumably bytes, confirm with
// GetCurrentMem — divided by the elapsed time).
class CollectSerializeMetrics {
 public:
    explicit CollectSerializeMetrics(size_t& size) : size_(size) {
        start_time_ = METRICS_NOW_TIME;
    }

    ~CollectSerializeMetrics() {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
        if (total_time > 0) {  // avoid division by zero on a sub-resolution interval
            server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size_ / total_time);
        }
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT start_time_;
    size_t& size_;
};
// Scope guard that reports the add-vectors throughput gauge (vector count,
// dimension and elapsed time) once the enclosing add operation finishes.
class CollectorAddMetrics {
 public:
    CollectorAddMetrics(size_t n, uint16_t dimension) : n_(n), dimension_(dimension) {
        begin_ = METRICS_NOW_TIME;
    }

    ~CollectorAddMetrics() {
        auto finish = METRICS_NOW_TIME;
        auto elapsed = METRICS_MICROSECONDS(begin_, finish);
        server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(
            static_cast<int>(n_), static_cast<int>(dimension_), elapsed);
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT begin_;
    size_t n_;
    uint16_t dimension_;
};
// RAII search-duration recorder. Call sites construct it with the file's
// index type (e.g. `server::CollectorDurationMetrics metrics(index_type_)`
// in XSearchTask::Execute), but the original class only had a default
// constructor and never initialized index_type_, so it could not compile
// against its caller and its destructor switched on an indeterminate value.
// Fixed by giving it the constructor the call sites expect.
class CollectorDurationMetrics {
 public:
    explicit CollectorDurationMetrics(int index_type) : index_type_(index_type) {
        start_time_ = METRICS_NOW_TIME;
    }

    ~CollectorDurationMetrics() {
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
        switch (index_type_) {
            // RAW and TO_INDEX files are both searched as raw data.
            case engine::meta::TableFileSchema::RAW:
            case engine::meta::TableFileSchema::TO_INDEX: {
                server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
                break;
            }
            default: {
                server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
                break;
            }
        }
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT start_time_;
    int index_type_;
};
// Scope guard that times a search task and records the duration in the
// histogram matching the file's index type (RAW and TO_INDEX both count as
// raw-data searches; anything else counts as an index-data search).
class CollectSearchTaskMetrics {
 public:
    CollectSearchTaskMetrics(int index_type) : index_type_(index_type) {
        begin_ = METRICS_NOW_TIME;
    }

    ~CollectSearchTaskMetrics() {
        auto finish = METRICS_NOW_TIME;
        auto elapsed = METRICS_MICROSECONDS(begin_, finish);
        switch (index_type_) {
            case engine::meta::TableFileSchema::RAW:
            case engine::meta::TableFileSchema::TO_INDEX:
                server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(elapsed);
                break;
            default:
                server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(elapsed);
                break;
        }
    }

 private:
    using TIME_POINT = std::chrono::system_clock::time_point;
    TIME_POINT begin_;
    int index_type_;
};
// RAII helper shared by the meta implementations: increments the
// meta-access counter on construction and observes the elapsed time
// (via METRICS_MICROSECONDS) in the meta-access duration histogram on
// destruction, so every exit path of a meta operation is timed.
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
}
......
......@@ -81,24 +81,6 @@ CollectFileMetrics(int file_type, size_t file_size) {
}
}
// Observe a search duration in the histogram matching the file's index
// type. RAW and TO_INDEX deliberately share the raw-data histogram; any
// other type is treated as an index-data search.
void
CollectDurationMetrics(int index_type, double total_time) {
switch (index_type) {
case meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case meta::TableFileSchema::TO_INDEX: {
// same histogram as RAW: a to-be-indexed file is still searched as raw data
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) {
index_engine_ = EngineFactory::Build(file_->dimension_,
file_->location_,
......@@ -159,7 +141,7 @@ XSearchTask::Execute() {
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
server::CollectorDurationMetrics metrics(index_type_);
std::vector<long> output_ids;
std::vector<float> output_distence;
......@@ -202,10 +184,6 @@ XSearchTask::Execute() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(index_type_, total_time);
rc.ElapseFromBegin("totally cost");
}
......
......@@ -21,7 +21,7 @@ TEST(MetricbaseTest, METRICBASE_TEST){
instance.RawFileSizeHistogramObserve(1.0);
instance.IndexFileSizeHistogramObserve(1.0);
instance.BuildIndexDurationSecondsHistogramObserve(1.0);
instance.CacheUsageGaugeSet(1.0);
instance.CpuCacheUsageGaugeSet(1.0);
instance.MetaAccessTotalIncrement();
instance.MetaAccessDurationSecondsHistogramObserve(1.0);
instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0);
......
......@@ -22,7 +22,7 @@ TEST(PrometheusTest, PROMETHEUS_TEST){
instance.RawFileSizeHistogramObserve(1.0);
instance.IndexFileSizeHistogramObserve(1.0);
instance.BuildIndexDurationSecondsHistogramObserve(1.0);
instance.CacheUsageGaugeSet(1.0);
instance.CpuCacheUsageGaugeSet(1.0);
instance.MetaAccessTotalIncrement();
instance.MetaAccessDurationSecondsHistogramObserve(1.0);
instance.FaissDiskLoadDurationSecondsHistogramObserve(1.0);
......
......@@ -18,7 +18,6 @@ metric_config:
is_startup: off # if monitoring start: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # following are prometheus configure
collect_type: pull # prometheus collect data method
port: 8080 # the port prometheus use to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address
push_gateway_port: 9091 # push method configure: push gateway port
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册