提交 6373c486 编写于 作者: Y yinghao.zou

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

add GpuCacheMgr and metrics

See merge request megasearch/milvus!407

Former-commit-id: cd5639c8be8d6075a0447df5572004205f57cc3c
......@@ -36,8 +36,11 @@ license_config: # license configure
cache_config: # cache configure
cpu_cache_capacity: 16 # how much memory is used as cache, unit: GB, range: 0 ~ less than total memory
cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specifies how much memory should be kept, range: greater than zero ~ 1.0
cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specifies how much memory should be kept, range: greater than zero ~ 1.0
insert_cache_immediately: false # inserted data will be loaded into cache immediately for hot query
gpu_cache_capacity: 5 # how much memory is used as cache in gpu, unit: GB, range: 0 ~ less than total memory
gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specifies how much memory should be kept, range: greater than zero ~ 1.0
gpu_ids: 0,1 # comma-separated list of gpu ids
engine_config:
nprobe: 10
......
......@@ -46,6 +46,9 @@ public:
double freemem_percent() const { return freemem_percent_; };
void set_freemem_percent(double percent) { freemem_percent_ = percent; }
// Replaces the stored GPU id list with a copy of gpu_ids. The argument is only
// read, so it is taken by const reference (also accepts temporaries).
void set_gpu_ids(const std::vector<uint64_t>& gpu_ids) { gpu_ids_ = gpu_ids; }
std::vector<uint64_t> gpu_ids() const { return gpu_ids_; }
size_t size() const;
bool exists(const std::string& key);
......@@ -60,6 +63,7 @@ private:
int64_t usage_;
int64_t capacity_;
double freemem_percent_;
std::vector<uint64_t> gpu_ids_;
LRU<std::string, CacheObjPtr> lru_;
mutable std::mutex mutex_;
......
......@@ -56,6 +56,7 @@ engine::VecIndexPtr CacheMgr::GetIndex(const std::string& key) {
}
void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
std::cout << "dashalk\n";
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -130,6 +131,24 @@ void CacheMgr::SetCapacity(int64_t capacity) {
cache_->set_capacity(capacity);
}
// Returns the GPU id list stored in the underlying cache.
// Logs an error and returns an empty list when no cache has been created.
std::vector<uint64_t> CacheMgr::GpuIds() const {
    if (cache_ != nullptr) {
        return cache_->gpu_ids();
    }

    SERVER_LOG_ERROR << "Cache doesn't exist";
    return std::vector<uint64_t>();
}
// Forwards gpu_ids to the underlying cache.
// Logs an error and does nothing when no cache has been created.
void CacheMgr::SetGpuIds(std::vector<uint64_t> gpu_ids){
    if (cache_ != nullptr) {
        cache_->set_gpu_ids(gpu_ids);
        return;
    }

    SERVER_LOG_ERROR << "Cache doesn't exist";
}
}
}
}
......@@ -33,6 +33,8 @@ public:
int64_t CacheUsage() const;
int64_t CacheCapacity() const;
void SetCapacity(int64_t capacity);
std::vector<uint64_t > GpuIds() const;
void SetGpuIds(std::vector<uint64_t> gpu_ids);
protected:
CacheMgr();
......
......@@ -16,6 +16,7 @@ private:
CpuCacheMgr();
public:
//TODO: use smart pointer instead
static CacheMgr* GetInstance() {
static CpuCacheMgr s_mgr;
return &s_mgr;
......
......@@ -4,6 +4,7 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "utils/Log.h"
#include "GpuCacheMgr.h"
#include "server/ServerConfig.h"
......@@ -11,19 +12,57 @@ namespace zilliz {
namespace milvus {
namespace cache {
std::mutex GpuCacheMgr::mutex_;
std::unordered_map<uint64_t, GpuCacheMgrPtr> GpuCacheMgr::instance_;
namespace {
constexpr int64_t unit = 1024 * 1024 * 1024;
// Parses a list of non-negative decimal GPU ids (e.g. "0,1") and appends each
// id to gpu_ids in the order it appears.
//
// gpu_ids_str: text containing runs of decimal digits; every maximal run of
//              digits becomes one id.
// gpu_ids:     output vector; parsed ids are appended, existing entries are kept.
//
// Any character that is not a digit (commas, spaces, stray text) is treated as
// a separator and skipped. Skipping unconditionally guarantees the scan always
// advances; previously a non-digit, non-comma character such as the space in
// "0, 1" left the position unchanged and the loop never terminated.
void parse_gpu_ids(const std::string& gpu_ids_str, std::vector<uint64_t>& gpu_ids) {
    const std::size_t length = gpu_ids_str.length();
    std::size_t pos = 0;

    while (pos < length) {
        const char ch = gpu_ids_str[pos];
        if (ch < '0' || ch > '9') {
            ++pos;
            continue;
        }

        // Accumulate directly in the element type of gpu_ids instead of a signed int.
        uint64_t id = 0;
        while (pos < length && gpu_ids_str[pos] >= '0' && gpu_ids_str[pos] <= '9') {
            id = id * 10 + static_cast<uint64_t>(gpu_ids_str[pos] - '0');
            ++pos;
        }
        gpu_ids.push_back(id);
    }
}
}
// Builds the GPU cache from the cache section of the server configuration:
//  - capacity: CONFIG_GPU_CACHE_CAPACITY (default 2), multiplied by `unit`
//  - GPU ids:  CONFIG_GPU_IDS (default "0,1"), parsed with parse_gpu_ids
//  - free percent: GPU_CACHE_FREE_PERCENT (default 0.85), applied only when it
//    lies in (0, 1]; otherwise the cache keeps its own value and an error is logged.
GpuCacheMgr::GpuCacheMgr() {
    server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);

    // `cap` is declared exactly once; the stale duplicate declaration is removed.
    int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 2);
    cap *= unit;
    cache_ = std::make_shared<Cache>(cap, 1UL<<32);

    std::string gpu_ids_str = config.GetValue(server::CONFIG_GPU_IDS, "0,1");
    std::vector<uint64_t> gpu_ids;
    parse_gpu_ids(gpu_ids_str, gpu_ids);
    cache_->set_gpu_ids(gpu_ids);

    double free_percent = config.GetDoubleValue(server::GPU_CACHE_FREE_PERCENT, 0.85);
    if (free_percent > 0.0 && free_percent <= 1.0) {
        cache_->set_freemem_percent(free_percent);
    } else {
        SERVER_LOG_ERROR << "Invalid gpu_cache_free_percent: " << free_percent <<
                         ", defaultly set to " << cache_->freemem_percent();
    }
}
// Inserts data under key into the GPU cache.
// Logs an error and does nothing when no cache has been created.
void GpuCacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
    //TODO: copy data to gpu
    if (cache_ != nullptr) {
        cache_->insert(key, data);
        return;
    }

    SERVER_LOG_ERROR << "Cache doesn't exist";
}
}
......
......@@ -5,22 +5,35 @@
////////////////////////////////////////////////////////////////////////////////
#include "CacheMgr.h"
#include <unordered_map>
#include <memory>
namespace zilliz {
namespace milvus {
namespace cache {
class GpuCacheMgr;
using GpuCacheMgrPtr = std::shared_ptr<GpuCacheMgr>;
class GpuCacheMgr : public CacheMgr {
private:
public:
GpuCacheMgr();
public:
static CacheMgr* GetInstance() {
static GpuCacheMgr s_mgr;
return &s_mgr;
static CacheMgr* GetInstance(uint64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
std::lock_guard<std::mutex> lock(mutex_);
instance_.insert(std::pair<uint64_t, GpuCacheMgrPtr>(gpu_id, std::make_shared<GpuCacheMgr>()));
// instance_[gpu_id] = std::make_shared<GpuCacheMgr>();
}
return instance_[gpu_id].get();
}
void InsertItem(const std::string& key, const DataObjPtr& data) override;
private:
static std::mutex mutex_;
static std::unordered_map<uint64_t, GpuCacheMgrPtr> instance_;
};
}
......
......@@ -342,7 +342,7 @@ void DBImpl::StartMetricTask() {
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
uint64_t size;
Size(size);
server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
......
......@@ -64,6 +64,8 @@ public:
virtual Status Cache() = 0;
virtual Status GpuCache(uint64_t gpu_id) = 0;
virtual Status Init() = 0;
};
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <stdexcept>
#include "src/cache/GpuCacheMgr.h"
#include "src/server/ServerConfig.h"
#include "src/metrics/Metrics.h"
......@@ -144,28 +145,60 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
}
// Makes index_ refer to a copy of the index on GPU `device_id`.
//
// If the GPU cache for device_id already holds an entry for location_, that
// entry is adopted. Otherwise the current index_ is copied to the GPU, inserted
// into the GPU cache, and the load metrics are recorded.
//
// Returns Status::OK() on success, or an error Status when there is no index to
// copy or the copy throws.
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
    // Look the entry up into a local first: assigning the lookup result straight
    // to index_ would null it on a cache miss and then dereference that null.
    auto cached_index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
    const bool already_in_cache = (cached_index != nullptr);
    auto start_time = METRICS_NOW_TIME;

    if (already_in_cache) {
        index_ = cached_index;
        return Status::OK();
    }

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
        return Status::Error("index is null, failed to copy to gpu");
    }

    try {
        index_ = index_->CopyToGpu(device_id);
        ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
    } catch (knowhere::KnowhereException &e) {
        ENGINE_LOG_ERROR << e.what();
        return Status::Error(e.what());
    } catch (std::exception &e) {
        return Status::Error(e.what());
    }

    // Freshly copied: publish it to the GPU cache and record load metrics.
    GpuCache(device_id);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    double physical_size = PhysicalSize();

    server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
    server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);

    return Status::OK();
}
// Makes index_ refer to a CPU copy of the index.
//
// If the CPU cache already holds an entry for location_, that entry is adopted.
// Otherwise the current index_ is copied to the CPU, inserted into the CPU
// cache via Cache(), and the load metrics are recorded.
//
// Returns Status::OK() on success, or an error Status when there is no index to
// copy or the copy throws.
Status ExecutionEngineImpl::CopyToCpu() {
    // Look the entry up into a local first: assigning the lookup result straight
    // to index_ would null it on a cache miss and then dereference that null.
    auto cached_index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
    const bool already_in_cache = (cached_index != nullptr);
    auto start_time = METRICS_NOW_TIME;

    if (already_in_cache) {
        index_ = cached_index;
        return Status::OK();
    }

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu";
        return Status::Error("index is null, failed to copy to cpu");
    }

    try {
        index_ = index_->CopyToCpu();
        ENGINE_LOG_DEBUG << "GPU to CPU";
    } catch (knowhere::KnowhereException &e) {
        ENGINE_LOG_ERROR << e.what();
        return Status::Error(e.what());
    } catch (std::exception &e) {
        return Status::Error(e.what());
    }

    // Freshly copied: publish it to the CPU cache and record load metrics.
    Cache();
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    double physical_size = PhysicalSize();

    server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
    server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);

    return Status::OK();
}
......@@ -246,6 +279,10 @@ Status ExecutionEngineImpl::Cache() {
return Status::OK();
}
// Inserts the current index_ into the GPU cache of `gpu_id`, keyed by location_.
// Always returns Status::OK(); the function previously had no return statement
// even though it is declared to return Status.
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
    zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, index_);
    return Status::OK();
}
// TODO(linxj): remove.
Status ExecutionEngineImpl::Init() {
using namespace zilliz::milvus::server;
......
......@@ -59,6 +59,8 @@ public:
Status Cache() override;
Status GpuCache(uint64_t gpu_id) override;
Status Init() override;
private:
......
......@@ -31,7 +31,8 @@ class MetricsBase{
virtual void IndexFileSizeHistogramObserve(double value) {};
virtual void BuildIndexDurationSecondsHistogramObserve(double value) {};
virtual void CacheUsageGaugeSet(double value) {};
virtual void CpuCacheUsageGaugeSet(double value) {};
virtual void GpuCacheUsageGaugeSet(double value) {};
virtual void MetaAccessTotalIncrement(double value = 1) {};
virtual void MetaAccessDurationSecondsHistogramObserve(double value) {};
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <cache/GpuCacheMgr.h>
#include "PrometheusMetrics.h"
#include "utils/Log.h"
#include "SystemInfo.h"
......@@ -166,6 +167,18 @@ void PrometheusMetrics::CPUTemperature() {
}
}
// Placeholder for the per-device GPU cache usage gauge.
// Does nothing before startup. After startup it iterates over the processor
// count reported by SystemInfo, but the gauge update itself is still commented
// out, so no metric is written and `value` is unused.
void PrometheusMetrics::GpuCacheUsageGaugeSet(double value) {
    if (startup_) {
        const int64_t processor_count = server::SystemInfo::GetInstance().num_processor();

        for (auto idx = 0; idx < processor_count; ++idx) {
//            int gpu_cache_usage = cache::GpuCacheMgr::GetInstance(idx)->CacheUsage();
//            int gpu_cache_total = cache::GpuCacheMgr::GetInstance(idx)->CacheCapacity();
//            prometheus::Gauge &gpu_cache = gpu_cache_usage_.Add({{"GPU_Cache", std::to_string(idx)}});
//            gpu_cache.Set(gpu_cache_usage * 100 / gpu_cache_total);
        }
    }
}
}
}
}
......@@ -54,7 +54,8 @@ class PrometheusMetrics: public MetricsBase {
void RawFileSizeHistogramObserve(double value) override { if(startup_) raw_files_size_histogram_.Observe(value);};
void IndexFileSizeHistogramObserve(double value) override { if(startup_) index_files_size_histogram_.Observe(value);};
void BuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) build_index_duration_seconds_histogram_.Observe(value);};
void CacheUsageGaugeSet(double value) override { if(startup_) cache_usage_gauge_.Set(value);};
void CpuCacheUsageGaugeSet(double value) override { if(startup_) cpu_cache_usage_gauge_.Set(value);};
void GpuCacheUsageGaugeSet(double value) override;
void MetaAccessTotalIncrement(double value = 1) override { if(startup_) meta_access_total_.Increment(value);};
void MetaAccessDurationSecondsHistogramObserve(double value) override { if(startup_) meta_access_duration_seconds_histogram_.Observe(value);};
......@@ -336,12 +337,18 @@ class PrometheusMetrics: public MetricsBase {
.Register(*registry_);
prometheus::Counter &cache_access_total_ = cache_access_.Add({});
// record cache usage and %
prometheus::Family<prometheus::Gauge> &cache_usage_ = prometheus::BuildGauge()
// record CPU cache usage and %
prometheus::Family<prometheus::Gauge> &cpu_cache_usage_ = prometheus::BuildGauge()
.Name("cache_usage_bytes")
.Help("current cache usage by bytes")
.Register(*registry_);
prometheus::Gauge &cache_usage_gauge_ = cache_usage_.Add({});
prometheus::Gauge &cpu_cache_usage_gauge_ = cpu_cache_usage_.Add({});
//record GPU cache usage and %
prometheus::Family<prometheus::Gauge> &gpu_cache_usage_ = prometheus::BuildGauge()
.Name("gpu_cache_usage_bytes")
.Help("current gpu cache usage by bytes")
.Register(*registry_);
// record query response
using Quantiles = std::vector<prometheus::detail::CKMSQuantiles::Quantile>;
......@@ -360,8 +367,7 @@ class PrometheusMetrics: public MetricsBase {
prometheus::Family<prometheus::Gauge> &query_vector_response_per_second_ = prometheus::BuildGauge()
.Name("query_vector_response_per_microsecond")
.Help("the number of vectors can be queried every second ")
.Register(*registry_);
prometheus::Gauge &query_vector_response_per_second_gauge_ = query_vector_response_per_second_.Add({});
.Register(*registry_); prometheus::Gauge &query_vector_response_per_second_gauge_ = query_vector_response_per_second_.Add({});
prometheus::Family<prometheus::Gauge> &query_response_per_second_ = prometheus::BuildGauge()
.Name("query_response_per_microsecond")
......
......@@ -209,9 +209,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "All tables: " << std::endl;
for(auto& table : tables) {
int64_t row_count = 0;
conn->DropTable(table);
// stat = conn->CountTable(table, row_count);
// std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
// conn->DropTable(table);
stat = conn->CountTable(table, row_count);
std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
}
}
......@@ -263,6 +263,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
search_record_array.push_back(
std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET]));
}
int64_t row_count;
conn->CountTable(TABLE_NAME, row_count);
std::cout << "\t" << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
}
}
......
......@@ -36,8 +36,10 @@ static const char* CONFIG_LOG = "log_config";
static const char* CONFIG_CACHE = "cache_config";
static const char* CONFIG_CPU_CACHE_CAPACITY = "cpu_cache_capacity";
static const char* CONFIG_GPU_CACHE_CAPACITY = "gpu_cache_capacity";
static const char* CACHE_FREE_PERCENT = "cache_free_percent";
static const char* CACHE_FREE_PERCENT = "cpu_cache_free_percent";
static const char* CONFIG_INSERT_CACHE_IMMEDIATELY = "insert_cache_immediately";
static const char* CONFIG_GPU_IDS = "gpu_ids";
static const char *GPU_CACHE_FREE_PERCENT = "gpu_cache_free_percent";
static const char* CONFIG_LICENSE = "license_config";
static const char* CONFIG_LICENSE_PATH = "license_path";
......
......@@ -146,7 +146,7 @@ TEST(CacheTest, CPU_CACHE_TEST) {
}
TEST(CacheTest, GPU_CACHE_TEST) {
cache::CacheMgr* gpu_mgr = cache::GpuCacheMgr::GetInstance();
cache::CacheMgr* gpu_mgr = cache::GpuCacheMgr::GetInstance(0);
const int dim = 256;
......@@ -164,6 +164,25 @@ TEST(CacheTest, GPU_CACHE_TEST) {
gpu_mgr->ClearCache();
ASSERT_EQ(gpu_mgr->ItemCount(), 0);
gpu_mgr->SetCapacity(4096000000);
for (auto i = 0; i < 3; i++) {
MockVecIndex *mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000000; //2G
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(index);
std::cout << data_obj->size() <<std::endl;
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
}
// ASSERT_EQ(gpu_mgr->ItemCount(), 2);
// auto obj0 = gpu_mgr->GetItem("index_0");
// ASSERT_EQ(obj0, nullptr);
// auto obj1 = gpu_mgr->GetItem("index_1");
// auto obj2 = gpu_mgr->GetItem("index_2");
gpu_mgr->ClearCache();
ASSERT_EQ(gpu_mgr->ItemCount(), 0);
}
TEST(CacheTest, INVALID_TEST) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册