Commit de8d6d6c authored by peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-407: Reconstruct MetricsCollector

See merge request megasearch/milvus!414

Former-commit-id: 816cd0063cfc296b22b2f1ef1115719118881679
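
This MR applies one pattern throughout: every manual METRICS_NOW_TIME / METRICS_MICROSECONDS start/end pair at a call site is replaced by a scope-based (RAII) collector whose constructor captures the start timestamp and whose destructor computes the elapsed time and reports it to the server::Metrics singleton. Below is a minimal standalone sketch of the pattern, simplified to print instead of calling the real Metrics observers (which appear in the Metrics.h hunk further down):

#include <chrono>
#include <cstdio>

// Simplified, self-contained illustration of the RAII collector pattern.
// The real collectors report to server::Metrics::GetInstance(); the printf
// below is a stand-in so the sketch compiles on its own.
class ScopedDurationCollector {
 public:
    ScopedDurationCollector() : start_time_(std::chrono::system_clock::now()) {}

    ~ScopedDurationCollector() {
        auto end_time = std::chrono::system_clock::now();
        auto total_time = std::chrono::duration_cast<std::chrono::microseconds>(
                              end_time - start_time_).count();
        // The real code would call e.g. ...DurationSecondsHistogramObserve(total_time).
        std::printf("elapsed: %lld us\n", static_cast<long long>(total_time));
    }

 private:
    std::chrono::system_clock::time_point start_time_;
};

void TimedOperation() {
    ScopedDurationCollector collector;  // duration is recorded when this scope exits
    // ... work being measured ...
}
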
......@@ -44,6 +44,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-403 - Add GpuCacheMgr
- MS-404 - Release index after search task done avoid memory increment continues
- MS-405 - Add delete task support
- MS-407 - Reconstruct MetricsCollector
- MS-408 - Add device_id in resource construct function
## New Feature
......
......@@ -25,7 +25,6 @@ metric_config:
is_startup: off # if monitoring start: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # following are prometheus configure
collect_type: pull # prometheus collect data method
port: 8080 # the port prometheus use to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address
push_gateway_port: 9091 # push method configure: push gateway port
......
......@@ -23,6 +23,7 @@
#include <cstring>
#include <cache/CpuCacheMgr.h>
#include <boost/filesystem.hpp>
#include <src/cache/GpuCacheMgr.h>
namespace zilliz {
namespace milvus {
......@@ -34,32 +35,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (succeed) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
}
}
void CollectQueryMetrics(double total_time, size_t nq) {
for (int i = 0; i < nq; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
}
......@@ -166,29 +141,23 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
auto start_time = METRICS_NOW_TIME;
Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
Status status;
zilliz::milvus::server::CollectInsertMetrics metrics(n, status);
status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
CollectInsertMetrics(total_time, n, status.ok());
return status;
}
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float *vectors, QueryResults &results) {
auto start_time = METRICS_NOW_TIME;
server::CollectQueryMetrics metrics(nq);
meta::DatesT dates = {meta::Meta::GetDate()};
Status result = Query(table_id, k, nq, nprobe, vectors, dates, results);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return result;
}
......@@ -255,7 +224,8 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
auto start_time = METRICS_NOW_TIME;
server::CollectQueryMetrics metrics(nq);
server::TimeRecorder rc("");
//step 1: get files to search
......@@ -298,11 +268,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
results = context->GetResult();
rc.ElapseFromBegin("Engine query totally cost");
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return Status::OK();
}
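
For context on the timing code deleted in these hunks: judging from the TIME_POINT alias (std::chrono::system_clock::time_point) used by the collector classes, METRICS_NOW_TIME and METRICS_MICROSECONDS are presumably thin wrappers over std::chrono, roughly as follows (an assumption for illustration, not the project's actual definitions):

#include <chrono>

// Hypothetical expansions of the timing macros used throughout this diff.
#define METRICS_NOW_TIME std::chrono::system_clock::now()
#define METRICS_MICROSECONDS(a, b) \
    (double(std::chrono::duration_cast<std::chrono::microseconds>((b) - (a)).count()))
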
......@@ -422,14 +387,10 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
long index_size = 0;
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
auto start_time = METRICS_NOW_TIME;
index->Merge(file.location_);
auto file_schema = file;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
......@@ -645,11 +606,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
std::shared_ptr<ExecutionEngine> index;
try {
auto start_time = METRICS_NOW_TIME;
server::CollectBuildIndexMetrics metrics;
index = to_index->BuildIndex(table_file.location_);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
} catch (std::exception& ex) {
//typical error: out of gpu memory
std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
......
......@@ -118,9 +118,10 @@ Status ExecutionEngineImpl::Serialize() {
Status ExecutionEngineImpl::Load(bool to_cache) {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
double physical_size = PhysicalSize();
server::CollectExecutionEngineMetrics metrics(physical_size);
index_ = read_index(location_);
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
} catch (knowhere::KnowhereException &e) {
......@@ -133,14 +134,6 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
if (!already_in_cache && to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time));
}
return Status::OK();
}
......@@ -148,7 +141,6 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToGpu(device_id);
......@@ -163,12 +155,6 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
if (!already_in_cache) {
GpuCache(device_id);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
}
return Status::OK();
......@@ -177,7 +163,6 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
Status ExecutionEngineImpl::CopyToCpu() {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToCpu();
......@@ -192,14 +177,7 @@ Status ExecutionEngineImpl::CopyToCpu() {
if(!already_in_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
}
return Status::OK();
}
......@@ -212,6 +190,8 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
try {
double physical_size = server::CommonUtil::GetFileSize(location);
server::CollectExecutionEngineMetrics metrics(physical_size);
to_merge = read_index(location);
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
......
......@@ -80,20 +80,13 @@ bool MemTableFile::IsFull() {
}
Status MemTableFile::Serialize() {
auto start_time = METRICS_NOW_TIME;
auto size = GetCurrentMem();
size_t size = GetCurrentMem();
server::CollectSerializeMetrics metrics(size);
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.file_size_ = execution_engine_->PhysicalSize();
table_file_schema_.row_count_ = execution_engine_->Count();
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time);
table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
......
......@@ -24,7 +24,7 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
size_t &num_vectors_added,
IDNumbers &vector_ids) {
auto start_time = METRICS_NOW_TIME;
server::CollectAddMetrics metrics(n_, table_file_schema.dimension_);
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
num_vectors_to_add : n_ - current_num_vectors_added;
......@@ -49,12 +49,6 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(table_file_schema.dimension_),
total_time);
return status;
}
......
......@@ -39,24 +39,6 @@ Status HandleException(const std::string &desc, std::exception &e) {
return Status::DBTransactionError(desc, e.what());
}
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
......@@ -273,7 +255,7 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -391,7 +373,7 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -458,7 +440,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -498,7 +480,7 @@ Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag)
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -545,7 +527,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -601,7 +583,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -644,7 +626,7 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -683,7 +665,7 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -738,7 +720,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -778,7 +760,7 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -843,7 +825,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
try {
MetricCollector metric;
server::MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_;
......@@ -911,7 +893,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -994,7 +976,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1108,7 +1090,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1218,7 +1200,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
//check table existence
TableSchema table_schema;
......@@ -1498,7 +1480,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
server::MetricCollector metric;
bool status;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1570,7 +1552,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1684,7 +1666,7 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1774,7 +1756,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1853,7 +1835,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1911,7 +1893,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder
//don't remove table folder until all its files has been deleted
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1996,7 +1978,7 @@ Status MySQLMetaImpl::CleanUp() {
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::MetricCollector metric;
TableSchema table_schema;
table_schema.table_id_ = table_id;
......
......@@ -34,24 +34,6 @@ Status HandleException(const std::string& desc, std::exception &e) {
return Status::DBTransactionError(desc, e.what());
}
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
inline auto StoragePrototype(const std::string &path) {
......@@ -171,7 +153,7 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -213,7 +195,7 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -237,7 +219,7 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -262,7 +244,7 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_,
......@@ -353,7 +335,7 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -405,7 +387,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
MetricCollector metric;
server::MetricCollector metric;
//set all backup file to raw
ConnectorPtr->update_all(
......@@ -426,7 +408,7 @@ Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag
Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_,
&TableSchema::nlist_,
......@@ -453,7 +435,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -491,7 +473,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
has_or_not = false;
try {
MetricCollector metric;
server::MetricCollector metric;
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
where(c(&TableSchema::table_id_) == table_id
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
......@@ -510,7 +492,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_,
......@@ -556,7 +538,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
try {
MetricCollector metric;
server::MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_;
......@@ -587,7 +569,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
......@@ -645,7 +627,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
if (partition.empty()) {
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
......@@ -745,7 +727,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
files.clear();
MetricCollector metric;
server::MetricCollector metric;
try {
auto select_columns = columns(&TableFileSchema::id_,
......@@ -822,7 +804,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
//check table existence
TableSchema table_schema;
......@@ -996,7 +978,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1053,7 +1035,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1079,7 +1061,7 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1101,7 +1083,7 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1150,7 +1132,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1194,7 +1176,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1224,7 +1206,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder
//don't remove table folder until all its files has been deleted
try {
MetricCollector metric;
server::MetricCollector metric;
for(auto& table_id : table_ids) {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
......@@ -1243,7 +1225,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status SqliteMetaImpl::CleanUp() {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1274,7 +1256,7 @@ Status SqliteMetaImpl::CleanUp() {
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::MetricCollector metric;
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
......
......@@ -59,23 +59,6 @@ void ParallelReduce(std::function<void(size_t, size_t)>& reduce_function, size_t
}
}
void CollectDurationMetrics(int index_type, double total_time) {
switch(index_type) {
case meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
}
SearchTask::SearchTask()
......@@ -92,7 +75,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
server::CollectSearchTaskMetrics metrics(file_type_);
bool metric_l2 = (index_engine_->IndexMetricType() == MetricType::L2);
......@@ -137,10 +120,6 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(file_type_, total_time);
rc.ElapseFromBegin("totally cost");
return nullptr;
......
......@@ -6,6 +6,7 @@
#pragma once
#include "MetricBase.h"
#include "db/meta/MetaTypes.h"
namespace zilliz {
......@@ -29,6 +30,233 @@ class Metrics {
static MetricsBase &CreateMetricsCollector();
};
class CollectInsertMetrics {
public:
CollectInsertMetrics(size_t n, engine::Status& status) : n_(n), status_(status) {
start_time_ = METRICS_NOW_TIME;
}
~CollectInsertMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
double avg_time = total_time / n_;
for (int i = 0; i < n_; ++i) {
Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (status_.ok()) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t n_;
engine::Status& status_;
};
class CollectQueryMetrics {
public:
CollectQueryMetrics(size_t nq) : nq_(nq) {
start_time_ = METRICS_NOW_TIME;
}
~CollectQueryMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
for (int i = 0; i < nq_; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq_;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq_) / total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t nq_;
};
class CollectMergeFilesMetrics {
public:
CollectMergeFilesMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectMergeFilesMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
class CollectBuildIndexMetrics {
public:
CollectBuildIndexMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectBuildIndexMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
class CollectExecutionEngineMetrics {
public:
CollectExecutionEngineMetrics(double physical_size) : physical_size_(physical_size) {
start_time_ = METRICS_NOW_TIME;
}
~CollectExecutionEngineMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size_);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size_ / double(total_time));
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
double physical_size_;
};
class CollectSerializeMetrics {
public:
CollectSerializeMetrics(size_t size) : size_(size) {
start_time_ = METRICS_NOW_TIME;
}
~CollectSerializeMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size_ / total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t size_;
};
class CollectAddMetrics {
public:
CollectAddMetrics(size_t n, uint16_t dimension) : n_(n), dimension_(dimension) {
start_time_ = METRICS_NOW_TIME;
}
~CollectAddMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(dimension_),
total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t n_;
uint16_t dimension_;
};
class CollectDurationMetrics {
public:
CollectDurationMetrics(int index_type) : index_type_(index_type) {
start_time_ = METRICS_NOW_TIME;
}
~CollectDurationMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
switch (index_type_) {
case engine::meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case engine::meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
int index_type_;
};
class CollectSearchTaskMetrics {
public:
CollectSearchTaskMetrics(int index_type) : index_type_(index_type) {
start_time_ = METRICS_NOW_TIME;
}
~CollectSearchTaskMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
switch(index_type_) {
case engine::meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case engine::meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
int index_type_;
};
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
}
......
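
One practical gain from these collectors, beyond shorter call sites: the metric is recorded in the destructor, so it fires on every exit path, early returns and exceptions included, which the old paired start/end timestamps could miss. A hedged usage sketch (Precondition and DoLoad are hypothetical helpers; CollectExecutionEngineMetrics is the class defined above):

Status Load() {
    server::CollectExecutionEngineMetrics metrics(PhysicalSize());
    if (!Precondition()) {
        return Status::Error("...");  // metric is still recorded here
    }
    DoLoad();  // may throw; the metric is recorded during stack unwinding
    return Status::OK();
}
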
......@@ -81,24 +81,6 @@ CollectFileMetrics(int file_type, size_t file_size) {
}
}
void
CollectDurationMetrics(int index_type, double total_time) {
switch (index_type) {
case meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) {
index_engine_ = EngineFactory::Build(file_->dimension_,
file_->location_,
......@@ -159,7 +141,7 @@ XSearchTask::Execute() {
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
server::CollectDurationMetrics metrics(index_type_);
std::vector<long> output_ids;
std::vector<float> output_distence;
......@@ -202,10 +184,6 @@ XSearchTask::Execute() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(index_type_, total_time);
rc.ElapseFromBegin("totally cost");
// release index in resource
......
......@@ -119,4 +119,33 @@ TEST_F(MetricTest, Metric_Tes) {
delete [] qxb;
};
TEST_F(MetricTest, Collector_Metrics_Test) {
engine::Status status = engine::Status::OK();
server::CollectInsertMetrics insert_metrics0(0, status);
status = engine::Status::Error("error");
server::CollectInsertMetrics insert_metrics1(0, status);
server::CollectQueryMetrics query_metrics(10);
server::CollectMergeFilesMetrics merge_metrics;
server::CollectBuildIndexMetrics build_index_metrics;
server::CollectExecutionEngineMetrics execution_metrics(10);
server::CollectSerializeMetrics serialize_metrics(10);
server::CollectAddMetrics add_metrics(10, 128);
server::CollectDurationMetrics duration_metrics_raw(engine::meta::TableFileSchema::RAW);
server::CollectDurationMetrics duration_metrics_index(engine::meta::TableFileSchema::TO_INDEX);
server::CollectDurationMetrics duration_metrics_delete(engine::meta::TableFileSchema::TO_DELETE);
server::CollectSearchTaskMetrics search_metrics_raw(engine::meta::TableFileSchema::RAW);
server::CollectSearchTaskMetrics search_metrics_index(engine::meta::TableFileSchema::TO_INDEX);
server::CollectSearchTaskMetrics search_metrics_delete(engine::meta::TableFileSchema::TO_DELETE);
server::MetricCollector metric_collector;
}
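
Note on the zero-argument collectors in this test: they must be declared without parentheses. With trailing parentheses, C++ parses the statement as a function declaration (the "most vexing parse"), so no object is constructed and the destructor never runs to record the metric:

server::MetricCollector metric_collector;     // object; metric recorded at scope exit
server::MetricCollector metric_collector2();  // function declaration; records nothing
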
......@@ -18,7 +18,6 @@ metric_config:
is_startup: off # if monitoring start: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # following are prometheus configure
collect_type: pull # prometheus collect data method
port: 8080 # the port prometheus use to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address
push_gateway_port: 9091 # push method configure: push gateway port
......