提交 2729a2a4 编写于 作者: Y Yu Kun

add GpuCacheMgr


Former-commit-id: 23bb00ad1f5ac95ec0d0c587ab868dd6032766a4
上级 4f05a3e2
......@@ -36,8 +36,11 @@ license_config: # license configure
cache_config: # cache configure
cpu_cache_capacity: 16 # how much memory is used as cache, unit: GB, range: 0 ~ less than total memory
cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
cpu_cache_free_percent: 0.85 # old data is erased from the cache when it is full; this value specifies how much memory should be kept, range: greater than zero ~ 1.0
insert_cache_immediately: false # insert data will be load into cache immediately for hot query
gpu_cache_capacity: 5 # how much memory is used as cache on the GPU, unit: GB, range: 0 ~ less than total memory
gpu_cache_free_percent: 0.85 # old data is erased from the cache when it is full; this value specifies how much memory should be kept, range: greater than zero ~ 1.0
gpu_ids: 0,1 # comma-separated list of GPU ids
engine_config:
nprobe: 10
......
......@@ -46,6 +46,8 @@ public:
// Returns the value last stored through set_freemem_percent().
double freemem_percent() const { return freemem_percent_; }
// Stores `percent` as the free-memory percentage; no range check is done here.
void set_freemem_percent(double percent) { freemem_percent_ = percent; }
// Replaces the stored GPU id list. `gpu_ids` is already a private copy of the
// caller's vector, so its buffer is swapped in instead of being copied again.
void set_gpu_ids(std::vector<uint64_t> gpu_ids) { gpu_ids_.swap(gpu_ids); }
// Returns a copy of the stored GPU id list.
std::vector<uint64_t> gpu_ids() const { return gpu_ids_; }
size_t size() const;
bool exists(const std::string& key);
......@@ -60,6 +62,7 @@ private:
int64_t usage_;
int64_t capacity_;
double freemem_percent_;
std::vector<uint64_t> gpu_ids_;
LRU<std::string, CacheObjPtr> lru_;
mutable std::mutex mutex_;
......
......@@ -4,6 +4,7 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "utils/Log.h"
#include "GpuCacheMgr.h"
#include "server/ServerConfig.h"
......@@ -11,19 +12,53 @@ namespace zilliz {
namespace milvus {
namespace cache {
// Mutex that GetInstance() locks before adding an entry to instance_.
std::mutex GpuCacheMgr::mutex_;
// Cache managers created so far, keyed by the gpu id passed to GetInstance().
std::unordered_map<uint64_t, GpuCacheMgr*> GpuCacheMgr::instance_;
namespace {
// Multiplier applied to the configured cache capacity: 2^30 (1024 * 1024 * 1024).
constexpr int64_t unit = int64_t{1} << 30;
}
GpuCacheMgr::GpuCacheMgr() {
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
std::string gpu_ids_str = config.GetValue(server::CONFIG_GPU_IDS, "0,1");
std::vector<uint64_t> gpu_ids;
for (auto i = 0; i < gpu_ids_str.length(); ) {
if (gpu_ids_str[i] != ',') {
int id = 0;
while (gpu_ids_str[i] != ',') {
id = id * 10 + gpu_ids_str[i] - '0';
++i;
}
gpu_ids.push_back(id);
} else {
++i;
}
}
cache_->set_gpu_ids(gpu_ids);
int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 1);
cap *= unit;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
double free_percent = config.GetDoubleValue(server::GPU_CACHE_FREE_PERCENT, 0.85);
if (free_percent > 0.0 && free_percent <= 1.0) {
cache_->set_freemem_percent(free_percent);
} else {
SERVER_LOG_ERROR << "Invalid gpu_cache_free_percent: " << free_percent <<
", defaultly set to " << cache_->freemem_percent();
}
}
// Stores `data` in the underlying cache under `key`.
// If no cache has been created, logs an error and does nothing.
void GpuCacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
    //TODO: copy data to gpu
    if (cache_ != nullptr) {
        cache_->insert(key, data);
        return;
    }

    SERVER_LOG_ERROR << "Cache doesn't exist";
}
}
......
......@@ -5,6 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include "CacheMgr.h"
#include <unordered_map>
namespace zilliz {
namespace milvus {
......@@ -15,12 +16,23 @@ private:
GpuCacheMgr();
public:
static CacheMgr* GetInstance() {
static GpuCacheMgr s_mgr;
return &s_mgr;
// Returns the cache manager registered for `gpu_id`, creating it on first use.
// The returned pointer is never null and stays valid for the process lifetime
// (managers are never removed from instance_).
static CacheMgr* GetInstance(uint64_t gpu_id) {
    // The map is only touched while mutex_ is held. The lookup uses find()
    // rather than operator[], because operator[] would default-insert a null
    // entry that makes a later insert() a no-op and leaves nullptr in the map.
    std::lock_guard<std::mutex> lock(mutex_);
    auto entry = instance_.find(gpu_id);
    if (entry == instance_.end()) {
        entry = instance_.emplace(gpu_id, new GpuCacheMgr()).first;
    }
    return entry->second;
}
void InsertItem(const std::string& key, const DataObjPtr& data) override;
private:
static std::mutex mutex_;
static std::unordered_map<uint64_t, GpuCacheMgr* > instance_;
};
}
......
......@@ -363,7 +363,7 @@ void DBImpl::StartMetricTask() {
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
uint64_t size;
Size(size);
server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
......
......@@ -64,6 +64,8 @@ public:
virtual Status Cache() = 0;
virtual Status GpuCache(uint64_t gpu_id) = 0;
virtual Status Init() = 0;
};
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <stdexcept>
#include "src/cache/GpuCacheMgr.h"
#include "src/server/ServerConfig.h"
#include "src/metrics/Metrics.h"
......@@ -144,28 +145,60 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
}
// Makes index_ refer to a GPU copy of this engine's index on `device_id`.
// A copy already present in that device's GPU cache (keyed by location_) is
// reused. Otherwise the current index is copied to the GPU, inserted into the
// GPU cache, and the load metrics are recorded.
// Returns an error Status if no index is loaded or the copy throws.
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
    // Keep the lookup result in a local: overwriting index_ with a cache miss
    // would discard the very index that has to be copied.
    auto cached_index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
    if (cached_index != nullptr) {
        index_ = cached_index;
        return Status::OK();
    }

    if (!index_) {
        ENGINE_LOG_ERROR << "CopyToGpu: index is not loaded";
        return Status::Error("CopyToGpu: index is not loaded");
    }

    auto start_time = METRICS_NOW_TIME;
    try {
        index_ = index_->CopyToGpu(device_id);
        ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
    } catch (knowhere::KnowhereException &e) {
        ENGINE_LOG_ERROR << e.what();
        return Status::Error(e.what());
    } catch (std::exception &e) {
        return Status::Error(e.what());
    }

    // Freshly copied: publish it to the GPU cache and record how long it took.
    GpuCache(device_id);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    double physical_size = PhysicalSize();

    server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
    server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);

    return Status::OK();
}
// Makes index_ refer to a CPU copy of this engine's index.
// A copy already present in the CPU cache (keyed by location_) is reused.
// Otherwise the current index is copied to the CPU, inserted into the CPU
// cache, and the load metrics are recorded.
// Returns an error Status if no index is loaded or the copy throws.
Status ExecutionEngineImpl::CopyToCpu() {
    // Keep the lookup result in a local: overwriting index_ with a cache miss
    // would discard the very index that has to be copied.
    auto cached_index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
    if (cached_index != nullptr) {
        index_ = cached_index;
        return Status::OK();
    }

    if (!index_) {
        ENGINE_LOG_ERROR << "CopyToCpu: index is not loaded";
        return Status::Error("CopyToCpu: index is not loaded");
    }

    auto start_time = METRICS_NOW_TIME;
    try {
        index_ = index_->CopyToCpu();
        ENGINE_LOG_DEBUG << "GPU to CPU";
    } catch (knowhere::KnowhereException &e) {
        ENGINE_LOG_ERROR << e.what();
        return Status::Error(e.what());
    } catch (std::exception &e) {
        return Status::Error(e.what());
    }

    // Freshly copied: publish it to the CPU cache and record how long it took.
    Cache();
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    double physical_size = PhysicalSize();

    server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
    server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);

    return Status::OK();
}
......@@ -246,6 +279,10 @@ Status ExecutionEngineImpl::Cache() {
return Status::OK();
}
// Inserts the current index into the cache of GPU `gpu_id`, keyed by location_.
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
    zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, index_);
    // A non-void function must return a value; flowing off the end is undefined behavior.
    return Status::OK();
}
// TODO(linxj): remove.
Status ExecutionEngineImpl::Init() {
using namespace zilliz::milvus::server;
......
......@@ -59,6 +59,8 @@ public:
Status Cache() override;
Status GpuCache(uint64_t gpu_id) override;
Status Init() override;
private:
......
......@@ -31,7 +31,8 @@ class MetricsBase{
virtual void IndexFileSizeHistogramObserve(double value) {};
virtual void BuildIndexDurationSecondsHistogramObserve(double value) {};
virtual void CacheUsageGaugeSet(double value) {};
// CPU cache usage hook; does nothing in the base class.
virtual void CpuCacheUsageGaugeSet(double value) {};
// GPU cache usage hook; does nothing in the base class.
virtual void GpuCacheUsageGaugeSet(double value) {};
virtual void MetaAccessTotalIncrement(double value = 1) {};
virtual void MetaAccessDurationSecondsHistogramObserve(double value) {};
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <cache/GpuCacheMgr.h>
#include "PrometheusMetrics.h"
#include "utils/Log.h"
#include "SystemInfo.h"
......@@ -166,6 +167,18 @@ void PrometheusMetrics::CPUTemperature() {
}
}
// Placeholder for per-GPU cache usage reporting. When metrics are started it
// queries the processor count and walks that many slots, but the per-slot
// reporting is still disabled, so nothing is published and `value` is unused.
void PrometheusMetrics::GpuCacheUsageGaugeSet(double value) {
    if (startup_) {
        const int64_t processor_count = server::SystemInfo::GetInstance().num_processor();

        for (int slot = 0; slot < processor_count; ++slot) {
            // Disabled reporting code, kept for reference:
            // int gpu_cache_usage = cache::GpuCacheMgr::GetInstance(slot)->CacheUsage();
            // int gpu_cache_total = cache::GpuCacheMgr::GetInstance(slot)->CacheCapacity();
            // prometheus::Gauge &gpu_cache = gpu_cache_usage_.Add({{"GPU_Cache", std::to_string(slot)}});
            // gpu_cache.Set(gpu_cache_usage * 100 / gpu_cache_total);
        }
    }
}
}
}
}
......@@ -54,7 +54,8 @@ class PrometheusMetrics: public MetricsBase {
void RawFileSizeHistogramObserve(double value) override { if(startup_) raw_files_size_histogram_.Observe(value);};
void IndexFileSizeHistogramObserve(double value) override { if(startup_) index_files_size_histogram_.Observe(value);};
void BuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) build_index_duration_seconds_histogram_.Observe(value);};
void CacheUsageGaugeSet(double value) override { if(startup_) cache_usage_gauge_.Set(value);};
// Publishes `value` on the CPU cache usage gauge; ignored until metrics are started.
void CpuCacheUsageGaugeSet(double value) override {
    if (!startup_) {
        return;
    }
    cpu_cache_usage_gauge_.Set(value);
}
void GpuCacheUsageGaugeSet(double value) override;
void MetaAccessTotalIncrement(double value = 1) override { if(startup_) meta_access_total_.Increment(value);};
void MetaAccessDurationSecondsHistogramObserve(double value) override { if(startup_) meta_access_duration_seconds_histogram_.Observe(value);};
......@@ -336,12 +337,18 @@ class PrometheusMetrics: public MetricsBase {
.Register(*registry_);
prometheus::Counter &cache_access_total_ = cache_access_.Add({});
// record cache usage and %
prometheus::Family<prometheus::Gauge> &cache_usage_ = prometheus::BuildGauge()
// record CPU cache usage and %
prometheus::Family<prometheus::Gauge> &cpu_cache_usage_ = prometheus::BuildGauge()
.Name("cache_usage_bytes")
.Help("current cache usage by bytes")
.Register(*registry_);
prometheus::Gauge &cache_usage_gauge_ = cache_usage_.Add({});
prometheus::Gauge &cpu_cache_usage_gauge_ = cpu_cache_usage_.Add({});
// Gauge family for GPU cache usage, registered as "gpu_cache_usage_bytes".
// NOTE(review): no gauge is added to this family in the visible code; the
// disabled code in GpuCacheUsageGaugeSet would add one per GPU and set a
// percentage rather than bytes - confirm the intended unit.
prometheus::Family<prometheus::Gauge> &gpu_cache_usage_ = prometheus::BuildGauge()
.Name("gpu_cache_usage_bytes")
.Help("current gpu cache usage by bytes")
.Register(*registry_);
// record query response
using Quantiles = std::vector<prometheus::detail::CKMSQuantiles::Quantile>;
......@@ -360,8 +367,7 @@ class PrometheusMetrics: public MetricsBase {
prometheus::Family<prometheus::Gauge> &query_vector_response_per_second_ = prometheus::BuildGauge()
.Name("query_vector_response_per_microsecond")
.Help("the number of vectors can be queried every second ")
.Register(*registry_);
prometheus::Gauge &query_vector_response_per_second_gauge_ = query_vector_response_per_second_.Add({});
.Register(*registry_); prometheus::Gauge &query_vector_response_per_second_gauge_ = query_vector_response_per_second_.Add({});
prometheus::Family<prometheus::Gauge> &query_response_per_second_ = prometheus::BuildGauge()
.Name("query_response_per_microsecond")
......
......@@ -210,9 +210,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "All tables: " << std::endl;
for(auto& table : tables) {
int64_t row_count = 0;
conn->DropTable(table);
// stat = conn->CountTable(table, row_count);
// std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
// conn->DropTable(table);
stat = conn->CountTable(table, row_count);
std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
}
}
......@@ -264,6 +264,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
search_record_array.push_back(
std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET]));
}
int64_t row_count;
conn->CountTable(TABLE_NAME, row_count);
std::cout << "\t" << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
}
}
......
......@@ -38,6 +38,8 @@ static const char* CONFIG_CPU_CACHE_CAPACITY = "cpu_cache_capacity";
// Keys of the cache section of the server configuration file.
static const char* CONFIG_GPU_CACHE_CAPACITY = "gpu_cache_capacity";
// CPU cache free-percent key. The configuration template names this option
// "cpu_cache_free_percent"; the former "cache_free_percent" spelling no longer
// appears there, so looking it up would always fall back to the default.
static const char* CACHE_FREE_PERCENT = "cpu_cache_free_percent";
static const char* CONFIG_INSERT_CACHE_IMMEDIATELY = "insert_cache_immediately";
// Comma-separated list of GPU ids.
static const char* CONFIG_GPU_IDS = "gpu_ids";
static const char* GPU_CACHE_FREE_PERCENT = "gpu_cache_free_percent";
static const char* CONFIG_LICENSE = "license_config";
static const char* CONFIG_LICENSE_PATH = "license_path";
......
......@@ -9,6 +9,7 @@
#include "db/meta/MetaConsts.h"
#include "db/Factories.h"
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "utils/CommonUtil.h"
#include <gtest/gtest.h>
......@@ -437,4 +438,9 @@ TEST_F(DBTest2, DELETE_BY_RANGE_TEST) {
ConvertTimeRangeToDBDates(start_value, end_value, dates);
db_->DeleteTable(TABLE_NAME, dates);
}
// Smoke test around the cache managers. The original body was an unfinished
// statement that could not compile; this keeps the one call it contained and
// checks that the reported usage is not negative.
// TODO: extend to GpuCacheMgr::GetInstance(gpu_id) once its contract is settled.
TEST_F(DBTest, GPU_CACHE_MGR_TEST) {
    const int64_t cpu_cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    ASSERT_GE(cpu_cache_usage, 0);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册