Commit 06e64755 authored by starlord

refine code


Former-commit-id: 144ba2695e0574b059481bdaef0f362d4e990f1d
......@@ -44,6 +44,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-403 - Add GpuCacheMgr
- MS-404 - Release index after search task done avoid memory increment continues
- MS-405 - Add delete task support
- MS-407 - Reconstruct MetricsCollector
- MS-408 - Add device_id in resource construct function
## New Feature
- MS-343 - Implement ResourceMgr
......
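MS-407 replaces the hand-rolled start/end timing at each call site with scoped collector objects. A minimal usage sketch of that pattern, assuming a hypothetical caller (MetricCollector itself is defined in the Metrics.h hunk further down):

// Before MS-407 every call site paired METRICS_NOW_TIME with
// METRICS_MICROSECONDS by hand; now a scoped collector starts the timer in
// its constructor and observes the duration histogram in its destructor.
Status SomeMetaOperation() {                  // hypothetical caller, for illustration only
    server::MetricCollector metric;           // increments the meta-access counter, records start time
    // ... perform the meta operation; the duration is reported when
    // 'metric' goes out of scope, including on early return or exception ...
    return Status::OK();
}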
......@@ -18,6 +18,7 @@ db_config:
archive_days_threshold: 0 # files older than x days will be archived, 0 means no limit, unit: day
insert_buffer_size: 4 # maximum insert buffer size allowed, default: 4, unit: GB, should be at least 1 GB.
# the sum of insert_buffer_size and cpu_cache_capacity should be less than total memory, unit: GB
metric_config:
is_startup: off # if monitoring start: on, off
collector: prometheus # metrics collector: prometheus
......
......@@ -23,6 +23,7 @@
#include <cstring>
#include <cache/CpuCacheMgr.h>
#include <boost/filesystem.hpp>
#include <src/cache/GpuCacheMgr.h>
namespace zilliz {
namespace milvus {
......@@ -34,32 +35,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (succeed) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
}
}
void CollectQueryMetrics(double total_time, size_t nq) {
for (int i = 0; i < nq; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
}
......@@ -166,29 +141,23 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
auto start_time = METRICS_NOW_TIME;
Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
Status status;
zilliz::milvus::server::CollectInsertMetrics metrics(n, status);
status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
CollectInsertMetrics(total_time, n, status.ok());
return status;
}
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
const float *vectors, QueryResults &results) {
auto start_time = METRICS_NOW_TIME;
server::CollectQueryMetrics metrics(nq);
meta::DatesT dates = {meta::Meta::GetDate()};
Status result = Query(table_id, k, nq, nprobe, vectors, dates, results);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return result;
}
......@@ -255,7 +224,8 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
auto start_time = METRICS_NOW_TIME;
server::CollectQueryMetrics metrics(nq);
server::TimeRecorder rc("");
//step 1: get files to search
......@@ -298,11 +268,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
results = context->GetResult();
rc.ElapseFromBegin("Engine query totally cost");
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
CollectQueryMetrics(total_time, nq);
return Status::OK();
}
......@@ -422,14 +387,10 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
long index_size = 0;
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
auto start_time = METRICS_NOW_TIME;
index->Merge(file.location_);
auto file_schema = file;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
......@@ -585,7 +546,7 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
//step 2: drop old index files
DropIndex(table_id);
if(index.metric_type_ == (int)EngineType::FAISS_IDMAP) {
if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
ENGINE_LOG_DEBUG << "index type = IDMAP, no need to build index";
return Status::OK();
}
......@@ -650,11 +611,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
std::shared_ptr<ExecutionEngine> index;
try {
auto start_time = METRICS_NOW_TIME;
index = to_index->BuildIndex(table_file.location_);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
server::CollectBuildIndexMetrics metrics;
index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
} catch (std::exception& ex) {
//typical error: out of gpu memory
std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
......
......@@ -60,7 +60,7 @@ public:
float *distances,
long *labels) const = 0;
virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string&) = 0;
virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string& location, EngineType engine_type) = 0;
virtual Status Cache() = 0;
......
......@@ -118,9 +118,10 @@ Status ExecutionEngineImpl::Serialize() {
Status ExecutionEngineImpl::Load(bool to_cache) {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
double physical_size = PhysicalSize();
server::CollectExecutionEngineMetrics metrics(physical_size);
index_ = read_index(location_);
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
} catch (knowhere::KnowhereException &e) {
......@@ -133,14 +134,6 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
if (!already_in_cache && to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time));
}
return Status::OK();
}
......@@ -148,7 +141,6 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToGpu(device_id);
......@@ -163,12 +155,6 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
if (!already_in_cache) {
GpuCache(device_id);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
}
return Status::OK();
......@@ -177,7 +163,6 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
Status ExecutionEngineImpl::CopyToCpu() {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = index_->CopyToCpu();
......@@ -192,14 +177,7 @@ Status ExecutionEngineImpl::CopyToCpu() {
if(!already_in_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
}
return Status::OK();
}
......@@ -212,6 +190,8 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
try {
double physical_size = server::CommonUtil::GetFileSize(location);
server::CollectExecutionEngineMetrics metrics(physical_size);
to_merge = read_index(location);
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
......@@ -234,11 +214,11 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
}
ExecutionEnginePtr
ExecutionEngineImpl::BuildIndex(const std::string &location) {
ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_type) {
ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;
auto from_index = std::dynamic_pointer_cast<BFIndex>(index_);
auto to_index = CreatetVecIndex(index_type_);
auto to_index = CreatetVecIndex(engine_type);
if (!to_index) {
throw Exception("Create Empty VecIndex");
}
......@@ -256,7 +236,7 @@ ExecutionEngineImpl::BuildIndex(const std::string &location) {
build_cfg);
if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); }
return std::make_shared<ExecutionEngineImpl>(to_index, location, index_type_, metric_type_, nlist_);
return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, nlist_);
}
Status ExecutionEngineImpl::Search(long n,
......
......@@ -59,7 +59,7 @@ public:
float *distances,
long *labels) const override;
ExecutionEnginePtr BuildIndex(const std::string &) override;
ExecutionEnginePtr BuildIndex(const std::string &location, EngineType engine_type) override;
Status Cache() override;
......
......@@ -80,20 +80,13 @@ bool MemTableFile::IsFull() {
}
Status MemTableFile::Serialize() {
auto start_time = METRICS_NOW_TIME;
auto size = GetCurrentMem();
size_t size = GetCurrentMem();
server::CollectSerializeMetrics metrics(size);
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.file_size_ = execution_engine_->PhysicalSize();
table_file_schema_.row_count_ = execution_engine_->Count();
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time);
table_file_schema_.file_type_ = (size >= table_file_schema_.index_file_size_) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
......
......@@ -24,7 +24,7 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
size_t &num_vectors_added,
IDNumbers &vector_ids) {
auto start_time = METRICS_NOW_TIME;
server::CollectAddMetrics metrics(n_, table_file_schema.dimension_);
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
num_vectors_to_add : n_ - current_num_vectors_added;
......@@ -49,12 +49,6 @@ Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(table_file_schema.dimension_),
total_time);
return status;
}
......
......@@ -39,24 +39,6 @@ Status HandleException(const std::string &desc, std::exception &e) {
return Status::DBTransactionError(desc, e.what());
}
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
......@@ -273,7 +255,7 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -391,7 +373,7 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -458,7 +440,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -498,7 +480,7 @@ Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag)
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -545,7 +527,7 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -601,7 +583,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -644,7 +626,7 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -683,7 +665,7 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -738,7 +720,7 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -778,7 +760,7 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -843,7 +825,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
try {
MetricCollector metric;
server::MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_;
......@@ -912,7 +894,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -996,7 +978,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1112,7 +1094,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1224,7 +1206,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
//check table existence
TableSchema table_schema;
......@@ -1508,7 +1490,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
server::MetricCollector metric;
bool status;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1580,7 +1562,7 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1694,7 +1676,7 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1784,7 +1766,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1863,7 +1845,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -1921,7 +1903,7 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder
//don't remove table folder until all its files has been deleted
try {
MetricCollector metric;
server::MetricCollector metric;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
......@@ -2006,7 +1988,7 @@ Status MySQLMetaImpl::CleanUp() {
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::MetricCollector metric;
TableSchema table_schema;
table_schema.table_id_ = table_id;
......
......@@ -34,24 +34,6 @@ Status HandleException(const std::string& desc, std::exception &e) {
return Status::DBTransactionError(desc, e.what());
}
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
inline auto StoragePrototype(const std::string &path) {
......@@ -171,7 +153,7 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -213,7 +195,7 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -237,7 +219,7 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -262,7 +244,7 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::state_,
......@@ -353,7 +335,7 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -405,7 +387,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
try {
MetricCollector metric;
server::MetricCollector metric;
//set all backup file to raw
ConnectorPtr->update_all(
......@@ -426,7 +408,7 @@ Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag
Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto groups = ConnectorPtr->select(columns(&TableSchema::engine_type_,
&TableSchema::nlist_,
......@@ -453,7 +435,7 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -491,7 +473,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
has_or_not = false;
try {
MetricCollector metric;
server::MetricCollector metric;
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
where(c(&TableSchema::table_id_) == table_id
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
......@@ -510,7 +492,7 @@ Status SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
try {
MetricCollector metric;
server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableSchema::id_,
&TableSchema::table_id_,
......@@ -556,7 +538,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
try {
MetricCollector metric;
server::MetricCollector metric;
NextFileId(file_schema.file_id_);
file_schema.dimension_ = table_schema.dimension_;
......@@ -588,7 +570,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
......@@ -647,7 +629,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
if (partition.empty()) {
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
......@@ -749,7 +731,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
files.clear();
MetricCollector metric;
server::MetricCollector metric;
try {
auto select_columns = columns(&TableFileSchema::id_,
......@@ -828,7 +810,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
files.clear();
try {
MetricCollector metric;
server::MetricCollector metric;
//check table existence
TableSchema table_schema;
......@@ -1005,7 +987,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1062,7 +1044,7 @@ Status SqliteMetaImpl::DiscardFiles(long to_discard_size) {
Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1088,7 +1070,7 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1110,7 +1092,7 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1159,7 +1141,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete files
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1203,7 +1185,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove to_delete tables
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1233,7 +1215,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
//remove deleted table folder
//don't remove table folder until all its files has been deleted
try {
MetricCollector metric;
server::MetricCollector metric;
for(auto& table_id : table_ids) {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
......@@ -1252,7 +1234,7 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status SqliteMetaImpl::CleanUp() {
try {
MetricCollector metric;
server::MetricCollector metric;
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
......@@ -1283,7 +1265,7 @@ Status SqliteMetaImpl::CleanUp() {
Status SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
try {
MetricCollector metric;
server::MetricCollector metric;
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
auto selected = ConnectorPtr->select(columns(&TableFileSchema::row_count_),
......
......@@ -59,23 +59,6 @@ void ParallelReduce(std::function<void(size_t, size_t)>& reduce_function, size_t
}
}
void CollectDurationMetrics(int index_type, double total_time) {
switch(index_type) {
case meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
}
SearchTask::SearchTask()
......@@ -92,7 +75,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
server::CollectSearchTaskMetrics metrics(file_type_);
bool metric_l2 = (index_engine_->IndexMetricType() == MetricType::L2);
......@@ -137,10 +120,6 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(file_type_, total_time);
rc.ElapseFromBegin("totally cost");
return nullptr;
......
......@@ -6,6 +6,7 @@
#pragma once
#include "MetricBase.h"
#include "db/meta/MetaTypes.h"
namespace zilliz {
......@@ -29,6 +30,233 @@ class Metrics {
static MetricsBase &CreateMetricsCollector();
};
class CollectInsertMetrics {
public:
CollectInsertMetrics(size_t n, engine::Status& status) : n_(n), status_(status) {
start_time_ = METRICS_NOW_TIME;
}
~CollectInsertMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
double avg_time = total_time / n_;
for (int i = 0; i < n_; ++i) {
Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (status_.ok()) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t n_;
engine::Status& status_;
};
class CollectQueryMetrics {
public:
CollectQueryMetrics(size_t nq) : nq_(nq) {
start_time_ = METRICS_NOW_TIME;
}
~CollectQueryMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
for (int i = 0; i < nq_; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq_;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq_) / total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t nq_;
};
class CollectMergeFilesMetrics {
public:
CollectMergeFilesMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectMergeFilesMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
class CollectBuildIndexMetrics {
public:
CollectBuildIndexMetrics() {
start_time_ = METRICS_NOW_TIME;
}
~CollectBuildIndexMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
class CollectExecutionEngineMetrics {
public:
CollectExecutionEngineMetrics(double physical_size) : physical_size_(physical_size) {
start_time_ = METRICS_NOW_TIME;
}
~CollectExecutionEngineMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size_);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size_ / double(total_time));
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
double physical_size_;
};
class CollectSerializeMetrics {
public:
CollectSerializeMetrics(size_t size) : size_(size) {
start_time_ = METRICS_NOW_TIME;
}
~CollectSerializeMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size_ / total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t size_;
};
class CollectAddMetrics {
public:
CollectAddMetrics(size_t n, uint16_t dimension) : n_(n), dimension_(dimension) {
start_time_ = METRICS_NOW_TIME;
}
~CollectAddMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(dimension_),
total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
size_t n_;
uint16_t dimension_;
};
class CollectDurationMetrics {
public:
CollectDurationMetrics(int index_type) : index_type_(index_type) {
start_time_ = METRICS_NOW_TIME;
}
~CollectDurationMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
switch (index_type_) {
case engine::meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case engine::meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
int index_type_;
};
class CollectSearchTaskMetrics {
public:
CollectSearchTaskMetrics(int index_type) : index_type_(index_type) {
start_time_ = METRICS_NOW_TIME;
}
~CollectSearchTaskMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
switch(index_type_) {
case engine::meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case engine::meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
int index_type_;
};
class MetricCollector {
public:
MetricCollector() {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
start_time_ = METRICS_NOW_TIME;
}
~MetricCollector() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
}
private:
using TIME_POINT = std::chrono::system_clock::time_point;
TIME_POINT start_time_;
};
}
}
......
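A hedged usage sketch for CollectInsertMetrics above: it keeps a reference to the caller's Status, so the success/fail counters are chosen by whatever value the status holds when the collector is destroyed. The surrounding function and DoInsert are illustrative, not part of this change:

engine::Status InsertWithMetrics(size_t n) {
    engine::Status status;
    server::CollectInsertMetrics metrics(n, status);   // records start time, keeps a reference to status
    status = DoInsert(n);                              // hypothetical insert call
    return status;                                     // on scope exit the per-vector duration and
                                                       // success/fail counters are observed
}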
......@@ -13,15 +13,16 @@ namespace engine {
std::shared_ptr<Resource>
ResourceFactory::Create(const std::string &name,
const std::string &alias,
const std::string &type,
uint64_t device_id,
bool enable_loader,
bool enable_executor) {
if (name == "disk") {
return std::make_shared<DiskResource>(alias, enable_loader, enable_executor);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias, enable_loader, enable_executor);
} else if (name == "gpu") {
return std::make_shared<GpuResource>(alias, enable_loader, enable_executor);
if (type == "DISK") {
return std::make_shared<DiskResource>(name, device_id, enable_loader, enable_executor);
} else if (type == "CPU") {
return std::make_shared<CpuResource>(name, device_id, enable_loader, enable_executor);
} else if (type == "GPU") {
return std::make_shared<GpuResource>(name, device_id, enable_loader, enable_executor);
} else {
return nullptr;
}
......
......@@ -22,7 +22,8 @@ class ResourceFactory {
public:
static std::shared_ptr<Resource>
Create(const std::string &name,
const std::string &alias = "",
const std::string &type,
uint64_t device_id,
bool enable_loader = true,
bool enable_executor = true);
};
......
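A hedged example of the reworked factory: a resource is now created from a type string plus a device id, while the first argument becomes its name. The names below are illustrative; enable_loader and enable_executor keep their defaults of true:

auto disk = ResourceFactory::Create("disk", "DISK", 0);
auto cpu  = ResourceFactory::Create("cpu",  "CPU",  0);
auto gpu0 = ResourceFactory::Create("gpu0", "GPU",  0);   // device_id selects the GPU device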
......@@ -48,6 +48,16 @@ ResourceMgr::Add(ResourcePtr &&resource) {
return ret;
}
void
ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connection &connection) {
auto res1 = get_resource_by_name(name1);
auto res2 = get_resource_by_name(name2);
if (res1 && res2) {
res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
}
}
void
ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) {
if (auto observe_a = res1.lock()) {
......@@ -116,6 +126,16 @@ ResourceMgr::DumpTaskTables() {
return ss.str();
}
ResourcePtr
ResourceMgr::get_resource_by_name(const std::string &name) {
for (auto &res : resources_) {
if (res->Name() == name) {
return res;
}
}
return nullptr;
}
void
ResourceMgr::event_process() {
while (running_) {
......
......@@ -49,6 +49,9 @@ public:
ResourceWPtr
Add(ResourcePtr &&resource);
void
Connect(const std::string &res1, const std::string &res2, Connection &connection);
/*
* Create connection between A and B;
*/
......@@ -80,6 +83,9 @@ public:
DumpTaskTables();
private:
ResourcePtr
get_resource_by_name(const std::string &name);
void
event_process();
......
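A hedged sketch of the new name-based Connect: once both resources have been added to the manager, they can be linked by name, and the neighbour link is added in both directions; if either name is unknown the call is a no-op. The manager instance, resource names, and the ready-made Connection are illustrative (its constructor is not shown in this change):

void LinkByName(ResourceMgr &mgr, Connection &conn) {
    mgr.Connect("cpu", "gpu0", conn);   // looks up both resources by name, then adds mutual neighbours
}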
......@@ -48,6 +48,7 @@ ToString(const TaskTimestamp &timestamp) {
ss << ", executed=" << timestamp.executed;
ss << ", move=" << timestamp.move;
ss << ", moved=" << timestamp.moved;
ss << ", finish=" << timestamp.finish;
ss << ">";
return ss.str();
}
......@@ -92,6 +93,7 @@ TaskTableItem::Executed() {
state = TaskTableItemState::EXECUTED;
lock.unlock();
timestamp.executed = get_now_timestamp();
timestamp.finish = get_now_timestamp();
return true;
}
return false;
......@@ -114,6 +116,7 @@ TaskTableItem::Moved() {
state = TaskTableItemState::MOVED;
lock.unlock();
timestamp.moved = get_now_timestamp();
timestamp.finish = get_now_timestamp();
return true;
}
return false;
......
......@@ -36,6 +36,7 @@ struct TaskTimestamp {
uint64_t loaded = 0;
uint64_t execute = 0;
uint64_t executed = 0;
uint64_t finish = 0;
};
struct TaskTableItem {
......
......@@ -20,7 +20,7 @@ next(std::list<ResourcePtr> &neighbours, std::list<ResourcePtr>::iterator &it) {
}
}
// TODO: this function is called with only one task, so it will always push the task to the first neighbour
void
push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighbours) {
CacheMgr cache;
......@@ -31,7 +31,7 @@ push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighb
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
task = task->Clone();
// task = task->Clone();
(*it)->task_table().Put(task);
next(neighbours, it);
}
......
......@@ -16,8 +16,8 @@ std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
return out;
}
CpuResource::CpuResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::CPU, enable_loader, enable_executor) {}
CpuResource::CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) {}
void CpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::DISK2CPU, 0);
......
......@@ -17,7 +17,7 @@ namespace engine {
class CpuResource : public Resource {
public:
explicit
CpuResource(std::string name, bool enable_loader, bool enable_executor);
CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -15,8 +15,8 @@ std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
return out;
}
DiskResource::DiskResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::DISK, enable_loader, enable_executor) {
DiskResource::DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::DISK, device_id, enable_loader, enable_executor) {
}
void DiskResource::LoadFile(TaskPtr task) {
......
......@@ -16,7 +16,7 @@ namespace engine {
class DiskResource : public Resource {
public:
explicit
DiskResource(std::string name, bool enable_loader, bool enable_executor);
DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -16,11 +16,11 @@ std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
return out;
}
GpuResource::GpuResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::GPU, enable_loader, enable_executor) {}
GpuResource::GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) {}
void GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, 0);
task->Load(LoadType::CPU2GPU, device_id_);
}
void GpuResource::Process(TaskPtr task) {
......
......@@ -16,7 +16,7 @@ namespace engine {
class GpuResource : public Resource {
public:
explicit
GpuResource(std::string name, bool enable_loader, bool enable_executor);
GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -18,10 +18,12 @@ std::ostream &operator<<(std::ostream &out, const Resource &resource) {
Resource::Resource(std::string name,
ResourceType type,
uint64_t device_id,
bool enable_loader,
bool enable_executor)
: name_(std::move(name)),
type_(type),
device_id_(device_id),
running_(false),
enable_loader_(enable_loader),
enable_executor_(enable_executor),
......
......@@ -82,6 +82,11 @@ public:
subscriber_ = std::move(subscriber);
}
inline std::string
Name() const {
return name_;
}
inline ResourceType
Type() const {
return type_;
......@@ -112,6 +117,7 @@ public:
protected:
Resource(std::string name,
ResourceType type,
uint64_t device_id,
bool enable_loader,
bool enable_executor);
......@@ -163,6 +169,9 @@ private:
void
executor_function();
protected:
uint64_t device_id_;
private:
std::string name_;
ResourceType type_;
......
......@@ -81,24 +81,6 @@ CollectFileMetrics(int file_type, size_t file_size) {
}
}
void
CollectDurationMetrics(int index_type, double total_time) {
switch (index_type) {
case meta::TableFileSchema::RAW: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
break;
}
}
}
XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) {
index_engine_ = EngineFactory::Build(file_->dimension_,
file_->location_,
......@@ -159,7 +141,7 @@ XSearchTask::Execute() {
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
server::CollectDurationMetrics metrics(index_type_);
std::vector<long> output_ids;
std::vector<float> output_distence;
......@@ -202,10 +184,6 @@ XSearchTask::Execute() {
context->IndexSearchDone(index_id_);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(index_type_, total_time);
rc.ElapseFromBegin("totally cost");
// release index in resource
......
......@@ -119,4 +119,33 @@ TEST_F(MetricTest, Metric_Tes) {
delete [] qxb;
};
TEST_F(MetricTest, Collector_Metrics_Test){
engine::Status status = engine::Status::OK();
server::CollectInsertMetrics insert_metrics0(0, status);
status = engine::Status::Error("error");
server::CollectInsertMetrics insert_metrics1(0, status);
server::CollectQueryMetrics query_metrics(10);
server::CollectMergeFilesMetrics merge_metrics;        // no '()': that would declare a function, not construct the collector
server::CollectBuildIndexMetrics build_index_metrics;
server::CollectExecutionEngineMetrics execution_metrics(10);
server::CollectSerializeMetrics serialize_metrics(10);
server::CollectAddMetrics add_metrics(10, 128);
server::CollectDurationMetrics duration_metrics_raw(engine::meta::TableFileSchema::RAW);
server::CollectDurationMetrics duration_metrics_index(engine::meta::TableFileSchema::TO_INDEX);
server::CollectDurationMetrics duration_metrics_delete(engine::meta::TableFileSchema::TO_DELETE);
server::CollectSearchTaskMetrics search_metrics_raw(engine::meta::TableFileSchema::RAW);
server::CollectSearchTaskMetrics search_metrics_index(engine::meta::TableFileSchema::TO_INDEX);
server::CollectSearchTaskMetrics search_metrics_delete(engine::meta::TableFileSchema::TO_DELETE);
server::MetricCollector metric_collector;
}