Commit 1bc7db12 authored by: Z zhiru

fix merge conflict


Former-commit-id: 693ae97543b8240f3bbcdbb54699fbf01fc1161c
......@@ -17,6 +17,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-26 - cmake. Add thirdparty packages
- MS-31 - cmake: add prometheus
- MS-33 - cmake: add -j4 to make third party packages build faster
- MS-27 - support gpu config and disable license build config in cmake
- MS-47 - Add query vps metrics
- MS-54 - cmake: Change Thrift third party URL to github.com
### Task
......@@ -27,3 +29,4 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-30 - Use faiss v1.5.2
- MS-32 - Fix thrift error
- MS-34 - Fix prometheus-cpp thirdparty
- MS-37 - Add query, cache usage, disk write speed and file data size metrics
......@@ -58,10 +58,10 @@ endif()
if(CMAKE_BUILD_TYPE STREQUAL "Release")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -fPIC -DELPP_THREAD_SAFE -fopenmp")
if (GPU_VERSION STREQUAL "ON")
set(ENABLE_LICENSE "ON")
add_definitions("-DENABLE_LICENSE")
endif ()
# if (GPU_VERSION STREQUAL "ON")
# set(ENABLE_LICENSE "ON")
# add_definitions("-DENABLE_LICENSE")
# endif ()
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0 -g -fPIC -DELPP_THREAD_SAFE -fopenmp")
endif()
......
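With the block above commented out, GPU builds no longer define ENABLE_LICENSE, so any license-checking code guarded by that macro drops out at compile time. A minimal sketch of what such a guard looks like on the C++ side (the function names are illustrative, not from the codebase):

    #ifdef ENABLE_LICENSE
    // Only compiled when CMake adds -DENABLE_LICENSE, which the change above disables.
    void CheckLicenseOrAbort();
    #endif

    void StartEngine() {
    #ifdef ENABLE_LICENSE
        CheckLicenseOrAbort();
    #endif
        // normal startup path runs in every build
    }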
......@@ -71,17 +71,28 @@ Status DBImpl<EngineT>::InsertVectors(const std::string& table_id_,
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (!status.ok()) {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
return status;
}
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
}
template<typename EngineT>
Status DBImpl<EngineT>::Query(const std::string &table_id, size_t k, size_t nq,
const float *vectors, QueryResults &results) {
auto start_time = METRICS_NOW_TIME;
meta::DatesT dates = {meta::Meta::GetDate()};
return Query(table_id, k, nq, vectors, dates, results);
Status result = Query(table_id, k, nq, vectors, dates, results);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
auto average_time = total_time / nq;
for (int i = 0; i < nq; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
return result;
}
template<typename EngineT>
......@@ -250,7 +261,12 @@ void DBImpl<EngineT>::BackgroundTimerTask(int interval) {
if (shutting_down_.load(std::memory_order_acquire)) break;
std::this_thread::sleep_for(std::chrono::seconds(interval));
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheUsage();
LOG(DEBUG) << "Cache usage " << cache_total;
server::Metrics::GetInstance().CacheUsageGaugeSet(static_cast<double>(cache_total));
long size;
Size(size);
server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
TrySchedule();
}
}
......
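The instrumentation added above follows a single pattern: take a timestamp before and after the work, convert the interval with METRICS_MICROSECONDS, and push the result into the Metrics singleton. A minimal self-contained sketch of that pattern, assuming the METRICS_* macros wrap std::chrono (the helper names below are illustrative, not the project's):

    #include <chrono>
    #include <cstddef>

    using Clock = std::chrono::steady_clock;

    // Hypothetical stand-ins for METRICS_NOW_TIME / METRICS_MICROSECONDS.
    inline Clock::time_point Now() { return Clock::now(); }
    inline double Microseconds(Clock::time_point s, Clock::time_point e) {
        return std::chrono::duration_cast<std::chrono::duration<double, std::micro>>(e - s).count();
    }

    void TimedQuery(std::size_t nq) {
        auto start = Now();
        // ... run the actual search ...
        auto end = Now();
        double total_us = Microseconds(start, end);
        double avg_us = total_us / nq;          // per-vector latency, fed to the summary
        double vec_per_us = nq / total_us;      // throughput, fed to the per-second gauge
        (void)avg_us; (void)vec_per_us;         // would be passed to server::Metrics::GetInstance()
    }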
......@@ -8,6 +8,7 @@
#include "MemManager.h"
#include "Meta.h"
#include "MetaConsts.h"
#include "metrics/Metrics.h"
#include <iostream>
#include <sstream>
......@@ -48,8 +49,14 @@ template<typename EngineT>
Status MemVectors<EngineT>::Serialize(std::string& table_id) {
table_id = schema_.table_id;
auto size = ApproximateSize();
auto start_time = METRICS_NOW_TIME;
pEE_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
schema_.size = size;
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size/total_time);
schema_.file_type = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
......
......@@ -11,6 +11,7 @@
#include <string>
#include <signal.h>
#include <easylogging++.h>
#include "metrics/Metrics.h"
#include "utils/SignalUtil.h"
#include "utils/CommonUtil.h"
......@@ -25,7 +26,6 @@ using namespace zilliz::vecwise;
int
main(int argc, char *argv[]) {
printf("Vecwise engine server start...\n");
// zilliz::lib::gpu::InitMemoryAllocator();
signal(SIGINT, server::SignalUtil::HandleSignal);
......
......@@ -64,7 +64,13 @@ class MetricsBase{
virtual void IndexFileSizeGaugeSet(double value) {};
virtual void RawFileSizeGaugeSet(double value) {};
virtual void FaissDiskLoadIOSpeedGaugeSet(double value) {};
virtual void QueryResponseSummaryObserve(double value) {};
virtual void DiskStoreIOSpeedGaugeSet(double value) {};
virtual void DataFileSizeGaugeSet(double value) {};
virtual void AddVectorsSuccessGaugeSet(double value) {};
virtual void AddVectorsFailGaugeSet(double value) {};
virtual void QueryVectorResponseSummaryObserve(double value, int count = 1) {};
virtual void QueryVectorResponsePerSecondGaugeSet(double value) {};
};
......
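MetricBase.h declares every hook as a virtual with an empty body, so a build without a metrics backend can run against the base class as a no-op collector, and PrometheusMetrics overrides only what it actually reports. A minimal sketch of that pattern (LoggingMetrics is a hypothetical backend, not part of the codebase):

    class MetricsBase {
     public:
        virtual ~MetricsBase() = default;
        virtual void DataFileSizeGaugeSet(double /*value*/) {}  // no-op by default
        virtual void CacheUsageGaugeSet(double /*value*/) {}
    };

    // Hypothetical backend that overrides only the hooks it cares about.
    class LoggingMetrics : public MetricsBase {
     public:
        void DataFileSizeGaugeSet(double value) override {
            // e.g. LOG(DEBUG) << "data file size: " << value;
        }
    };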
......@@ -4,7 +4,6 @@
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "Metrics.h"
#include "PrometheusMetrics.h"
......
......@@ -81,23 +81,29 @@ class PrometheusMetrics: public MetricsBase {
void MemUsageTotalGaugeIncrement(double value = 1.0) override { if(startup_) mem_usage_total_gauge_.Increment(value);};
void MemUsageTotalGaugeDecrement(double value = 1.0) override { if(startup_) mem_usage_total_gauge_.Decrement(value);};
void MetaAccessTotalIncrement(double value = 1) { if(startup_) meta_access_total_.Increment(value);};
void MetaAccessDurationSecondsHistogramObserve(double value) { if(startup_) meta_access_duration_seconds_histogram_.Observe(value);};
void MetaAccessTotalIncrement(double value = 1) override { if(startup_) meta_access_total_.Increment(value);};
void MetaAccessDurationSecondsHistogramObserve(double value) override { if(startup_) meta_access_duration_seconds_histogram_.Observe(value);};
void FaissDiskLoadDurationSecondsHistogramObserve(double value) { if(startup_) faiss_disk_load_duration_seconds_histogram_.Observe(value);};
void FaissDiskLoadSizeBytesHistogramObserve(double value) { if(startup_) faiss_disk_load_size_bytes_histogram_.Observe(value);};
void FaissDiskLoadDurationSecondsHistogramObserve(double value) override { if(startup_) faiss_disk_load_duration_seconds_histogram_.Observe(value);};
void FaissDiskLoadSizeBytesHistogramObserve(double value) override { if(startup_) faiss_disk_load_size_bytes_histogram_.Observe(value);};
// void FaissDiskLoadIOSpeedHistogramObserve(double value) { if(startup_) faiss_disk_load_IO_speed_histogram_.Observe(value);};
void FaissDiskLoadIOSpeedGaugeSet(double value) { if(startup_) faiss_disk_load_IO_speed_gauge_.Set(value);};
void CacheAccessTotalIncrement(double value = 1) { if(startup_) cache_access_total_.Increment(value);};
void MemTableMergeDurationSecondsHistogramObserve(double value) { if(startup_) mem_table_merge_duration_seconds_histogram_.Observe(value);};
void SearchIndexDataDurationSecondsHistogramObserve(double value) { if(startup_) search_index_data_duration_seconds_histogram_.Observe(value);};
void SearchRawDataDurationSecondsHistogramObserve(double value) { if(startup_) search_raw_data_duration_seconds_histogram_.Observe(value);};
void IndexFileSizeTotalIncrement(double value = 1) { if(startup_) index_file_size_total_.Increment(value);};
void RawFileSizeTotalIncrement(double value = 1) { if(startup_) raw_file_size_total_.Increment(value);};
void IndexFileSizeGaugeSet(double value) { if(startup_) index_file_size_gauge_.Set(value);};
void RawFileSizeGaugeSet(double value) { if(startup_) raw_file_size_gauge_.Set(value);};
void FaissDiskLoadIOSpeedGaugeSet(double value) override { if(startup_) faiss_disk_load_IO_speed_gauge_.Set(value);};
void CacheAccessTotalIncrement(double value = 1) override { if(startup_) cache_access_total_.Increment(value);};
void MemTableMergeDurationSecondsHistogramObserve(double value) override { if(startup_) mem_table_merge_duration_seconds_histogram_.Observe(value);};
void SearchIndexDataDurationSecondsHistogramObserve(double value) override { if(startup_) search_index_data_duration_seconds_histogram_.Observe(value);};
void SearchRawDataDurationSecondsHistogramObserve(double value) override { if(startup_) search_raw_data_duration_seconds_histogram_.Observe(value);};
void IndexFileSizeTotalIncrement(double value = 1) override { if(startup_) index_file_size_total_.Increment(value);};
void RawFileSizeTotalIncrement(double value = 1) override { if(startup_) raw_file_size_total_.Increment(value);};
void IndexFileSizeGaugeSet(double value) override { if(startup_) index_file_size_gauge_.Set(value);};
void RawFileSizeGaugeSet(double value) override { if(startup_) raw_file_size_gauge_.Set(value);};
void QueryResponseSummaryObserve(double value) override {if(startup_) query_response_summary_.Observe(value);};
void DiskStoreIOSpeedGaugeSet(double value) override { if(startup_) disk_store_IO_speed_gauge_.Set(value);};
void DataFileSizeGaugeSet(double value) override { if(startup_) data_file_size_gauge_.Set(value);};
void AddVectorsSuccessGaugeSet(double value) override { if(startup_) add_vectors_success_gauge_.Set(value);};
void AddVectorsFailGaugeSet(double value) override { if(startup_) add_vectors_fail_gauge_.Set(value);};
void QueryVectorResponseSummaryObserve(double value, int count = 1) override { if (startup_) for(int i = 0 ; i < count ; ++i) query_vector_response_summary_.Observe(value);};
void QueryVectorResponsePerSecondGaugeSet(double value) override {if (startup_) query_vector_response_per_second_gauge_.Set(value);};
......@@ -295,11 +301,6 @@ class PrometheusMetrics: public MetricsBase {
////all form Cache.cpp
//record cache usage, when insert/erase/clear/free
prometheus::Family<prometheus::Gauge> &cache_usage_ = prometheus::BuildGauge()
.Name("cache_usage")
.Help("total bytes that cache used")
.Register(*registry_);
prometheus::Gauge &cache_usage_gauge_ = cache_usage_.Add({});
////all from Meta.cpp
......@@ -386,6 +387,53 @@ class PrometheusMetrics: public MetricsBase {
.Register(*registry_);
prometheus::Counter &cache_access_total_ = cache_access_.Add({});
// record cache usage and %
prometheus::Family<prometheus::Gauge> &cache_usage_ = prometheus::BuildGauge()
.Name("cache_usage_bytes")
.Help("current cache usage by bytes")
.Register(*registry_);
prometheus::Gauge &cache_usage_gauge_ = cache_usage_.Add({});
// record query response
using Quantiles = std::vector<prometheus::detail::CKMSQuantiles::Quantile>;
prometheus::Family<prometheus::Summary> &query_response_ = prometheus::BuildSummary()
.Name("query_response_summary")
.Help("query response summary")
.Register(*registry_);
prometheus::Summary &query_response_summary_ = query_response_.Add({}, Quantiles{{0.95,0.00},{0.9,0.05},{0.8,0.1}});
prometheus::Family<prometheus::Summary> &query_vector_response_ = prometheus::BuildSummary()
.Name("query_vector_response_summary")
.Help("query each vector response summary")
.Register(*registry_);
prometheus::Summary &query_vector_response_summary_ = query_vector_response_.Add({}, Quantiles{{0.95,0.00},{0.9,0.05},{0.8,0.1}});
prometheus::Family<prometheus::Gauge> &query_vector_response_per_second_ = prometheus::BuildGauge()
.Name("query_vector_response_per_microsecond")
.Help("the number of vectors can be queried every second ")
.Register(*registry_);
prometheus::Gauge &query_vector_response_per_second_gauge_ = query_vector_response_per_second_.Add({});
prometheus::Family<prometheus::Gauge> &disk_store_IO_speed_ = prometheus::BuildGauge()
.Name("disk_store_IO_speed_bytes_per_microseconds")
.Help("disk_store_IO_speed")
.Register(*registry_);
prometheus::Gauge &disk_store_IO_speed_gauge_ = disk_store_IO_speed_.Add({});
prometheus::Family<prometheus::Gauge> &data_file_size_ = prometheus::BuildGauge()
.Name("data_file_size_bytes")
.Help("data file size by bytes")
.Register(*registry_);
prometheus::Gauge &data_file_size_gauge_ = data_file_size_.Add({});
prometheus::Family<prometheus::Gauge> &add_vectors_ = prometheus::BuildGauge()
.Name("add_vectors")
.Help("current added vectors")
.Register(*registry_);
prometheus::Gauge &add_vectors_success_gauge_ = add_vectors_.Add({{"outcome", "success"}});
prometheus::Gauge &add_vectors_fail_gauge_ = add_vectors_.Add({{"outcome", "fail"}});
};
......
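The families above are built against a prometheus-cpp Registry and only become scrapable once that registry is registered with an Exposer, which is what Metrics::GetInstance().Init() is expected to do in Server::Start() below. A minimal standalone prometheus-cpp sketch (the listen address is illustrative):

    #include <prometheus/exposer.h>
    #include <prometheus/gauge.h>
    #include <prometheus/registry.h>
    #include <memory>

    int main() {
        prometheus::Exposer exposer{"127.0.0.1:8080"};             // serves /metrics
        auto registry = std::make_shared<prometheus::Registry>();

        auto &family = prometheus::BuildGauge()
                           .Name("data_file_size_bytes")
                           .Help("data file size in bytes")
                           .Register(*registry);
        auto &gauge = family.Add({});
        gauge.Set(123456.0);

        exposer.RegisterCollectable(registry);                     // expose everything in the registry
        // A real server keeps running here so the endpoint stays up.
        return 0;
    }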
......@@ -136,9 +136,6 @@ Server::Daemonize() {
int
Server::Start() {
// server::Metrics::GetInstance().Init();
// server::Metrics::GetInstance().exposer_ptr()->RegisterCollectable(server::Metrics::GetInstance().registry_ptr());
server::Metrics::GetInstance().Init();
if (daemonized_) {
Daemonize();
......@@ -177,7 +174,7 @@ Server::Start() {
signal(SIGINT, SignalUtil::HandleSignal);
signal(SIGHUP, SignalUtil::HandleSignal);
signal(SIGTERM, SignalUtil::HandleSignal);
server::Metrics::GetInstance().Init();
SERVER_LOG_INFO << "Vecwise server is running...";
StartService();
......
......@@ -6,14 +6,19 @@
#include "mutex"
#ifdef GPU_VERSION
#include <faiss/gpu/StandardGpuResources.h>
#include "faiss/gpu/GpuIndexIVFFlat.h"
#include "faiss/gpu/GpuAutoTune.h"
#include <faiss/gpu/GpuIndexIVFFlat.h>
#include <faiss/gpu/GpuAutoTune.h>
#endif
#include "faiss/IndexFlat.h"
#include <faiss/IndexFlat.h>
#include <easylogging++.h>
#include "server/ServerConfig.h"
#include "IndexBuilder.h"
......@@ -21,6 +26,31 @@ namespace zilliz {
namespace vecwise {
namespace engine {
class GpuResources {
public:
static GpuResources &GetInstance() {
static GpuResources instance;
return instance;
}
void SelectGpu() {
using namespace zilliz::vecwise::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
gpu_num = server_config.GetInt32Value("gpu_index", 0);
}
int32_t GetGpu() {
return gpu_num;
}
private:
GpuResources() : gpu_num(0) { SelectGpu(); }
private:
int32_t gpu_num;
};
using std::vector;
static std::mutex gpu_resource;
......@@ -44,7 +74,7 @@ Index_ptr IndexBuilder::build_all(const long &nb,
std::lock_guard<std::mutex> lk(gpu_resource);
faiss::gpu::StandardGpuResources res;
auto device_index = faiss::gpu::index_cpu_to_gpu(&res, 0, ori_index);
auto device_index = faiss::gpu::index_cpu_to_gpu(&res, GpuResources::GetInstance().GetGpu(), ori_index);
if (!device_index->is_trained) {
nt == 0 || xt == nullptr ? device_index->train(nb, xb)
: device_index->train(nt, xt);
......
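The new GpuResources singleton reads gpu_index from the server config once and hands that device number to faiss when the CPU index is cloned onto the GPU. A minimal sketch of that call with an explicit device (the dataset setup is illustrative; gpu_num would come from GpuResources::GetInstance().GetGpu()):

    #include <faiss/IndexFlat.h>
    #include <faiss/gpu/GpuAutoTune.h>          // declares index_cpu_to_gpu in faiss 1.5.x
    #include <faiss/gpu/StandardGpuResources.h>

    void BuildOnConfiguredGpu(int d, long nb, const float *xb, int gpu_num) {
        faiss::IndexFlatL2 cpu_index(d);
        faiss::gpu::StandardGpuResources res;   // must stay alive while the GPU index is used
        auto *device_index = faiss::gpu::index_cpu_to_gpu(&res, gpu_num, &cpu_index);
        device_index->add(nb, xb);              // the work runs on the configured GPU
        delete device_index;
    }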
......@@ -32,14 +32,14 @@ class IndexBuilder {
const long &nt = 0,
const std::vector<float> &xt = std::vector<float>());
void train(const long &nt,
const std::vector<float> &xt);
Index_ptr add(const long &nb,
const std::vector<float> &xb,
const std::vector<long> &ids);
void set_build_option(const Operand_ptr &opd);
//void train(const long &nt,
// const std::vector<float> &xt);
//
//Index_ptr add(const long &nb,
// const std::vector<float> &xb,
// const std::vector<long> &ids);
//
//void set_build_option(const Operand_ptr &opd);
protected:
......
......@@ -31,7 +31,8 @@ set(require_files
../../src/metrics/Metrics.cpp
# ../../src/cache/CacheMgr.cpp
# ../../src/metrics/PrometheusMetrics.cpp
../../src/metrics/PrometheusMetrics.cpp
../../src/metrics/MetricBase.h
../../src/server/ServerConfig.cpp
../../src/utils/CommonUtil.cpp
../../src/utils/TimeRecorder.cpp
......
......@@ -32,7 +32,7 @@ TEST_F(DBTest, Metric_Tes) {
// server::Metrics::GetInstance().exposer_ptr()->RegisterCollectable(server::Metrics::GetInstance().registry_ptr());
server::Metrics::GetInstance().Init();
// server::PrometheusMetrics::GetInstance().exposer_ptr()->RegisterCollectable(server::PrometheusMetrics::GetInstance().registry_ptr());
zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->SetCapacity(1*1024*1024*1024);
zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->SetCapacity(2UL*1024*1024*1024);
std::cout<<zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->CacheCapacity()<<std::endl;
static const std::string group_name = "test_group";
static const int group_dim = 256;
......@@ -109,7 +109,7 @@ TEST_F(DBTest, Metric_Tes) {
} else {
db_->InsertVectors(group_name, nb, xb, vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
std::this_thread::sleep_for(std::chrono::microseconds(2000));
}
search.join();
......