Unverified commit d34bd505, authored by Cai Yudong, committed by GitHub

#1735 fix search failure with GPU out of memory when using ivf_flat (#1748)

* #1735 fix search failure with GPU out of memory when using ivf_flat
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* #1735 add lock for each GpuCacheMgr instance
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* #1735 optimize cache debug info
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* #1735 set smaller cache threshold
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* #1735 update cache debug log
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix unittest
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* #1735 fix cache issue
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
Parent cda57cf7
@@ -17,6 +17,7 @@ Please mark all change in change log and use the issue from GitHub
 - \#1708 NSG search crashed
 - \#1724 Remove unused unittests
 - \#1734 Opentracing for combined search request
+- \#1735 Fix search out of memory with ivf_flat

 ## Feature
 - \#1603 BinaryFlat add 2 Metric: Substructure and Superstructure
......
@@ -26,7 +26,7 @@ template <typename ItemObj>
 class Cache {
  public:
     // mem_capacity, units:GB
-    Cache(int64_t capacity_gb, uint64_t cache_max_count);
+    Cache(int64_t capacity_gb, int64_t cache_max_count, const std::string& header = "");
     ~Cache() = default;

     int64_t
@@ -80,6 +80,7 @@ class Cache {
     free_memory();

  private:
+    std::string header_;
     int64_t usage_;
     int64_t capacity_;
     double freemem_percent_;
......
@@ -12,12 +12,16 @@
 namespace milvus {
 namespace cache {

-constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.85;
+constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.7;
+constexpr double WARNING_THRESHHOLD_PERCENT = 0.9;

 template <typename ItemObj>
-Cache<ItemObj>::Cache(int64_t capacity, uint64_t cache_max_count)
-    : usage_(0), capacity_(capacity), freemem_percent_(DEFAULT_THRESHHOLD_PERCENT), lru_(cache_max_count) {
-    // AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity)
+Cache<ItemObj>::Cache(int64_t capacity, int64_t cache_max_count, const std::string& header)
+    : usage_(0),
+      capacity_(capacity),
+      header_(header),
+      freemem_percent_(DEFAULT_THRESHHOLD_PERCENT),
+      lru_(cache_max_count) {
 }

 template <typename ItemObj>
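Splitting the old single constant into two turns eviction into a trigger plus a target: with a 4 GB cache, for example, free_memory() is now triggered once usage exceeds 0.9 * 4 GB = 3.6 GB and evicts down to the freemem_percent_ target of 0.7 * 4 GB = 2.8 GB, leaving roughly 1.2 GB of headroom for the next incoming index. (The 4 GB capacity is only an illustrative figure; the real value comes from the cache configuration.)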
@@ -61,12 +65,7 @@ Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
         return;
     }

-    // if(item->size() > capacity_) {
-    //     SERVER_LOG_ERROR << "Item size " << item->size()
-    //                      << " is too large to insert into cache, capacity " << capacity_;
-    //     return;
-    // }
-
+    size_t item_size = item->Size();
     // calculate usage
     {
         std::lock_guard<std::mutex> lock(mutex_);
@@ -78,13 +77,13 @@ Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
         }

         // plus new item size
-        usage_ += item->Size();
+        usage_ += item_size;
     }

     // if usage exceed capacity, free some items
-    if (usage_ > capacity_) {
-        SERVER_LOG_DEBUG << "Current usage " << usage_ << " exceeds cache capacity " << capacity_
-                         << ", start free memory";
+    if (usage_ > (int64_t)(capacity_ * WARNING_THRESHHOLD_PERCENT)) {
+        SERVER_LOG_DEBUG << header_ << " Current usage " << (usage_ >> 20) << "MB is too high for capacity "
+                         << (capacity_ >> 20) << "MB, start free memory";
         free_memory();
     }
@@ -93,8 +92,9 @@ Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
         std::lock_guard<std::mutex> lock(mutex_);
         lru_.put(key, item);
-        SERVER_LOG_DEBUG << "Insert " << key << " size: " << item->Size() << " bytes into cache, usage: " << usage_
-                         << " bytes," << " capacity: " << capacity_ << " bytes";
+        SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache";
+        SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: "
+                         << (capacity_ >> 20) << "MB";
     }
 }
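Condensed into a standalone sketch, the threshold logic on this insert path looks as follows; NeedsEviction and EvictionTarget are hypothetical helper names, not part of the Cache API:

#include <cstdint>
#include <iostream>

constexpr double kWarnPercent = 0.9;  // WARNING_THRESHHOLD_PERCENT: start freeing above this
constexpr double kFreePercent = 0.7;  // DEFAULT_THRESHHOLD_PERCENT: free down to this

// True once usage crosses the warning threshold, i.e. when insert() calls free_memory().
bool NeedsEviction(int64_t usage, int64_t capacity) {
    return usage > static_cast<int64_t>(capacity * kWarnPercent);
}

// The usage level that free_memory() evicts down to.
int64_t EvictionTarget(int64_t capacity) {
    return static_cast<int64_t>(capacity * kFreePercent);
}

int main() {
    const int64_t capacity = 4LL << 30;  // 4 GB cache, as in the example above
    const int64_t usage = 3700LL << 20;  // 3700 MB currently cached

    if (NeedsEviction(usage, capacity)) {
        std::cout << "evict down to " << (EvictionTarget(capacity) >> 20) << "MB" << std::endl;
    }
    return 0;
}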
@@ -106,13 +106,15 @@ Cache<ItemObj>::erase(const std::string& key) {
         return;
     }

-    const ItemObj& old_item = lru_.get(key);
-    usage_ -= old_item->Size();
-
-    SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size() << " bytes from cache, usage: " << usage_
-                     << " bytes," << " capacity: " << capacity_ << " bytes";
+    const ItemObj& item = lru_.get(key);
+    size_t item_size = item->Size();
     lru_.erase(key);
+    usage_ -= item_size;
+
+    SERVER_LOG_DEBUG << header_ << " Erase " << key << " size: " << (item_size >> 20) << "MB from cache";
+    SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: "
+                     << (capacity_ >> 20) << "MB";
 }
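The reordering above appears to be the "fix cache issue" commit on this path: erase() now reads the item size exactly once, before the entry leaves the LRU, and everything after lru_.erase() works with that saved item_size rather than with a reference into the container, so the usage accounting and the debug log stay consistent with what was actually removed.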
template <typename ItemObj>
@@ -121,15 +123,15 @@ Cache<ItemObj>::clear() {
     std::lock_guard<std::mutex> lock(mutex_);
     lru_.clear();
     usage_ = 0;
-    SERVER_LOG_DEBUG << "Clear cache !";
+    SERVER_LOG_DEBUG << header_ << " Clear cache !";
 }

 /* free memory space when CACHE occupation exceed its capacity */
 template <typename ItemObj>
 void
 Cache<ItemObj>::free_memory() {
-    if (usage_ <= capacity_)
-        return;
+    //    if (usage_ <= capacity_)
+    //        return;

     int64_t threshhold = capacity_ * freemem_percent_;
     int64_t delta_size = usage_ - threshhold;
@@ -154,13 +156,11 @@ Cache<ItemObj>::free_memory() {
         }
     }

-    SERVER_LOG_DEBUG << "to be released memory size: " << released_size;
+    SERVER_LOG_DEBUG << header_ << " To be released memory size: " << (released_size >> 20) << "MB";

     for (auto& key : key_array) {
         erase(key);
     }
-
-    print();
 }
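The elided middle of free_memory() gathers victim keys from the cold end of the LRU until the accumulated size covers delta_size; a condensed sketch of that kind of selection, where LruList and PickVictims are hypothetical stand-ins for the real structures:

#include <cstdint>
#include <list>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the LRU: entries ordered coldest-first,
// each carrying its size in bytes.
using LruList = std::list<std::pair<std::string, int64_t>>;

// Collect keys from the cold end until at least delta_size bytes are covered.
std::vector<std::string> PickVictims(const LruList& lru, int64_t delta_size) {
    std::vector<std::string> keys;
    int64_t released_size = 0;
    for (const auto& entry : lru) {
        keys.push_back(entry.first);
        released_size += entry.second;
        if (released_size >= delta_size) {
            break;  // enough bytes will be released to reach the target
        }
    }
    return keys;
}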
template <typename ItemObj>
@@ -177,7 +177,8 @@ Cache<ItemObj>::print() {
 #endif
     }

-    SERVER_LOG_DEBUG << "[Cache] [item count]: " << cache_count << " [capacity] " << capacity_ << "(bytes) [usage] " << usage_ << "(bytes)";
+    SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20)
+                     << "MB, [capacity] " << (capacity_ >> 20) << "MB";
 }
} // namespace cache
......
@@ -32,7 +32,7 @@ CpuCacheMgr::CpuCacheMgr() {
     int64_t cpu_cache_cap;
     config.GetCacheConfigCpuCacheCapacity(cpu_cache_cap);
     int64_t cap = cpu_cache_cap * unit;
-    cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL << 32);
+    cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL << 32, "[CACHE CPU]");

     float cpu_cache_threshold;
     config.GetCacheConfigCpuCacheThreshold(cpu_cache_threshold);
......
@@ -21,21 +21,22 @@ namespace milvus {
 namespace cache {

 #ifdef MILVUS_GPU_VERSION
-std::mutex GpuCacheMgr::mutex_;
-std::unordered_map<uint64_t, GpuCacheMgrPtr> GpuCacheMgr::instance_;
+std::mutex GpuCacheMgr::global_mutex_;
+std::unordered_map<int64_t, std::pair<GpuCacheMgrPtr, MutexPtr>> GpuCacheMgr::instance_;

 namespace {
 constexpr int64_t G_BYTE = 1024 * 1024 * 1024;
 }

-GpuCacheMgr::GpuCacheMgr() {
+GpuCacheMgr::GpuCacheMgr(int64_t gpu_id) : gpu_id_(gpu_id) {
     // All config values have been checked in Config::ValidateConfig()
     server::Config& config = server::Config::GetInstance();

     int64_t gpu_cache_cap;
     config.GetGpuResourceConfigCacheCapacity(gpu_cache_cap);
     int64_t cap = gpu_cache_cap * G_BYTE;
-    cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL << 32);
+    std::string header = "[CACHE GPU" + std::to_string(gpu_id) + "]";
+    cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL << 32, header);

     float gpu_mem_threshold;
     config.GetGpuResourceConfigCacheThreshold(gpu_mem_threshold);
@@ -51,20 +52,6 @@ GpuCacheMgr::~GpuCacheMgr() {
     config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_ENABLE, identity_);
 }

-GpuCacheMgr*
-GpuCacheMgr::GetInstance(uint64_t gpu_id) {
-    if (instance_.find(gpu_id) == instance_.end()) {
-        std::lock_guard<std::mutex> lock(mutex_);
-        if (instance_.find(gpu_id) == instance_.end()) {
-            instance_.insert(std::pair<uint64_t, GpuCacheMgrPtr>(gpu_id, std::make_shared<GpuCacheMgr>()));
-        }
-        return instance_[gpu_id].get();
-    } else {
-        std::lock_guard<std::mutex> lock(mutex_);
-        return instance_[gpu_id].get();
-    }
-}
-
 DataObjPtr
 GpuCacheMgr::GetIndex(const std::string& key) {
     DataObjPtr obj = GetItem(key);
@@ -78,10 +65,33 @@ GpuCacheMgr::InsertItem(const std::string& key, const milvus::cache::DataObjPtr&
     }
 }

+GpuCacheMgrPtr
+GpuCacheMgr::GetInstance(int64_t gpu_id) {
+    if (instance_.find(gpu_id) == instance_.end()) {
+        std::lock_guard<std::mutex> lock(global_mutex_);
+        if (instance_.find(gpu_id) == instance_.end()) {
+            instance_[gpu_id] = std::make_pair(std::make_shared<GpuCacheMgr>(gpu_id), std::make_shared<std::mutex>());
+        }
+    }
+    return instance_[gpu_id].first;
+}
+
+MutexPtr
+GpuCacheMgr::GetInstanceMutex(int64_t gpu_id) {
+    if (instance_.find(gpu_id) == instance_.end()) {
+        std::lock_guard<std::mutex> lock(global_mutex_);
+        if (instance_.find(gpu_id) == instance_.end()) {
+            instance_[gpu_id] = std::make_pair(std::make_shared<GpuCacheMgr>(gpu_id), std::make_shared<std::mutex>());
+        }
+    }
+    return instance_[gpu_id].second;
+}
+
 void
 GpuCacheMgr::OnGpuCacheCapacityChanged(int64_t capacity) {
     for (auto& iter : instance_) {
-        iter.second->SetCapacity(capacity * G_BYTE);
+        std::lock_guard<std::mutex> lock(*(iter.second.second));
+        iter.second.first->SetCapacity(capacity * G_BYTE);
     }
 }
......
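GetInstance() and GetInstanceMutex() share the same double-checked lazy initialization of the static instance_ map; a simplified, self-contained sketch of the pattern, with all names below being hypothetical stand-ins:

#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

struct Mgr {
    explicit Mgr(int64_t id) : gpu_id(id) {}
    int64_t gpu_id;
};
using MgrPtr = std::shared_ptr<Mgr>;
using MtxPtr = std::shared_ptr<std::mutex>;

std::mutex g_global_mutex;
std::unordered_map<int64_t, std::pair<MgrPtr, MtxPtr>> g_instances;

// Double-checked lookup: the first find() skips locking once the entry
// exists; the second find(), under the global lock, ensures the pair is
// created exactly once per GPU id.
MgrPtr GetInstance(int64_t gpu_id) {
    if (g_instances.find(gpu_id) == g_instances.end()) {
        std::lock_guard<std::mutex> lock(g_global_mutex);
        if (g_instances.find(gpu_id) == g_instances.end()) {
            g_instances[gpu_id] =
                std::make_pair(std::make_shared<Mgr>(gpu_id), std::make_shared<std::mutex>());
        }
    }
    return g_instances[gpu_id].first;
}

Pairing each manager with its own mutex is what lets callers serialize work per GPU card without funneling every operation through the single global lock.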
@@ -10,8 +10,10 @@
 // or implied. See the License for the specific language governing permissions and limitations under the License.

 #include <memory>
 #include <mutex>
+#include <string>
 #include <unordered_map>
+#include <utility>

 #include "cache/CacheMgr.h"
 #include "cache/DataObj.h"
@@ -23,31 +25,36 @@ namespace cache {

 #ifdef MILVUS_GPU_VERSION
 class GpuCacheMgr;
 using GpuCacheMgrPtr = std::shared_ptr<GpuCacheMgr>;
+using MutexPtr = std::shared_ptr<std::mutex>;

 class GpuCacheMgr : public CacheMgr<DataObjPtr>, public server::GpuResourceConfigHandler {
  public:
-    GpuCacheMgr();
+    explicit GpuCacheMgr(int64_t gpu_id);

     ~GpuCacheMgr();

-    static GpuCacheMgr*
-    GetInstance(uint64_t gpu_id);
-
     DataObjPtr
     GetIndex(const std::string& key);

     void
     InsertItem(const std::string& key, const DataObjPtr& data);

+    static GpuCacheMgrPtr
+    GetInstance(int64_t gpu_id);
+
+    static MutexPtr
+    GetInstanceMutex(int64_t gpu_id);
+
  protected:
     void
     OnGpuCacheCapacityChanged(int64_t capacity) override;

  private:
     bool gpu_enable_ = true;
+    int64_t gpu_id_;
+    static std::mutex global_mutex_;
+    static std::unordered_map<int64_t, std::pair<GpuCacheMgrPtr, MutexPtr>> instance_;
     std::string identity_;
-    static std::mutex mutex_;
-    static std::unordered_map<uint64_t, GpuCacheMgrPtr> instance_;
 };
 #endif
......
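For illustration, a usage sketch of the two new static accessors; the WarmUpGpu wrapper is hypothetical, while the GpuCacheMgr calls follow the declarations above:

#include <cstdint>
#include <mutex>
#include <string>

#include "cache/GpuCacheMgr.h"

// Hold the per-GPU mutex across any operation that allocates device memory,
// then publish the object through that GPU's cache.
void WarmUpGpu(int64_t gpu_id, const std::string& key, const milvus::cache::DataObjPtr& obj) {
    std::lock_guard<std::mutex> lock(*milvus::cache::GpuCacheMgr::GetInstanceMutex(gpu_id));
    milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(key, obj);
}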
@@ -86,7 +86,7 @@ static const char* CONFIG_CACHE = "cache_config";
 static const char* CONFIG_CACHE_CPU_CACHE_CAPACITY = "cpu_cache_capacity";
 static const char* CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT = "4";
 static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD = "cpu_cache_threshold";
-static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT = "0.85";
+static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT = "0.7";
 static const char* CONFIG_CACHE_INSERT_BUFFER_SIZE = "insert_buffer_size";
 static const char* CONFIG_CACHE_INSERT_BUFFER_SIZE_DEFAULT = "1";
 static const char* CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data";
@@ -123,7 +123,7 @@ static const char* CONFIG_GPU_RESOURCE_ENABLE_DEFAULT = "false";
 static const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY = "cache_capacity";
 static const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY_DEFAULT = "1";
 static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD = "cache_threshold";
-static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD_DEFAULT = "0.85";
+static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD_DEFAULT = "0.7";
 static const char* CONFIG_GPU_RESOURCE_DELIMITER = ",";
 static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES = "search_resources";
 static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES_DEFAULT = "gpu0";
......
@@ -544,9 +544,15 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
     }

     try {
+        /* Index data is copied to GPU first, then added into GPU cache.
+         * We MUST add a lock here to avoid more than one INDEX being copied to one GPU card at the same time,
+         * which could cause GPU out of memory.
+         */
+        std::lock_guard<std::mutex> lock(*(cache::GpuCacheMgr::GetInstanceMutex(device_id)));
+        ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " start";
         index_ = knowhere::cloner::CopyCpuToGpu(index_, device_id, knowhere::Config());
-        ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
         GpuCache(device_id);
+        ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " finished";
     } catch (std::exception& e) {
         ENGINE_LOG_ERROR << e.what();
         return Status(DB_ERROR, e.what());
......
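This hunk is the heart of the fix: the copy and the cache insert for one card now run under that card's mutex, so at most one index transfer can be allocating GPU memory at any moment. A minimal sketch of the shape of that critical section, where everything below is a hypothetical stand-in and a single mutex stands in for the per-GPU one:

#include <cstdint>
#include <iostream>
#include <mutex>

std::mutex g_device_mutex;  // real code: *GpuCacheMgr::GetInstanceMutex(device_id)

void CopyIndexHostToDevice(int64_t device_id) {
    std::cout << "CPU to GPU" << device_id << " start\n";  // allocate device memory and copy
}

void InsertIntoGpuCache(int64_t device_id) {
    std::cout << "CPU to GPU" << device_id << " finished\n";  // insert may trigger eviction
}

void CopyToGpuSerialized(int64_t device_id) {
    // Tasks racing for the same card block here; without this lock several
    // indexes could occupy device memory at once before the cache had any
    // chance to evict, which is the out-of-memory scenario from #1735.
    std::lock_guard<std::mutex> lock(g_device_mutex);
    CopyIndexHostToDevice(device_id);
    InsertIntoGpuCache(device_id);
}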
@@ -148,7 +148,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
             hybrid = true;
         }
         stat = index_engine_->CopyToGpu(device_id, hybrid);
-        type_str = "CPU2GPU:" + std::to_string(device_id);
+        type_str = "CPU2GPU" + std::to_string(device_id);
     } else if (type == LoadType::GPU2CPU) {
         stat = index_engine_->CopyToCpu();
         type_str = "GPU2CPU";
......