diff --git a/cpp/src/core/include/knowhere/index/vector_index/gpu_ivf.h b/cpp/src/core/include/knowhere/index/vector_index/gpu_ivf.h index e1395d49f8be6253b7069df8ad9e029662d756d2..f789a06b1a466f770172bb26ba19b464aae33299 100644 --- a/cpp/src/core/include/knowhere/index/vector_index/gpu_ivf.h +++ b/cpp/src/core/include/knowhere/index/vector_index/gpu_ivf.h @@ -3,6 +3,7 @@ #include #include "ivf.h" +#include "src/utils/BlockingQueue.h" namespace zilliz { @@ -16,12 +17,15 @@ struct Resource { std::shared_ptr faiss_res; int64_t id; + std::mutex mutex; }; using ResPtr = std::shared_ptr; using ResWPtr = std::weak_ptr; class FaissGpuResourceMgr { public: + using ResBQ = zilliz::milvus::server::BlockingQueue; + struct DeviceParams { int64_t temp_mem_size = 0; int64_t pinned_mem_size = 0; @@ -55,11 +59,8 @@ class FaissGpuResourceMgr { // allocate gpu memory before search // this func will return True if the device is idle and exists an idle resource. - bool - GetRes(const int64_t& device_id, ResPtr &res, const int64_t& alloc_size = 0); - - void - MoveToInuse(const int64_t &device_id, const ResPtr& res); + //bool + //GetRes(const int64_t& device_id, ResPtr &res, const int64_t& alloc_size = 0); void MoveToIdle(const int64_t &device_id, const ResPtr& res); @@ -67,33 +68,34 @@ class FaissGpuResourceMgr { void Dump(); - protected: - void - RemoveResource(const int64_t& device_id, const ResPtr& res, std::map>& resource_pool); - protected: bool is_init = false; - std::mutex mutex_; std::map devices_params_; - std::map> in_use_; - std::map> idle_; + std::map idle_map; }; class ResScope { public: - ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id) { - FaissGpuResourceMgr::GetInstance().MoveToInuse(device_id, resource); + ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id), move(true) { + res->mutex.lock(); + } + + ResScope(ResPtr &res) : resource(res), device_id(-1), move(false) { + res->mutex.lock(); } ~ResScope() { - //resource->faiss_res->noTempMemory(); - FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource); + if (move) { + FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource); + } + resource->mutex.unlock(); } private: ResPtr resource; int64_t device_id; + bool move = true; }; class GPUIndex { diff --git a/cpp/src/core/src/knowhere/index/vector_index/gpu_ivf.cpp b/cpp/src/core/src/knowhere/index/vector_index/gpu_ivf.cpp index 6c54afb1b8377e20f9164dcbae63425fc602ac1b..bb89efbcd4845991f80ec5ba9375f8818f2328a0 100644 --- a/cpp/src/core/src/knowhere/index/vector_index/gpu_ivf.cpp +++ b/cpp/src/core/src/knowhere/index/vector_index/gpu_ivf.cpp @@ -130,19 +130,17 @@ void GPUIVF::search_impl(int64_t n, float *distances, int64_t *labels, const Config &cfg) { - // TODO(linxj): allocate mem - auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_); - if (temp_res) { - ResScope rs(gpu_id_, temp_res); - if (auto device_index = std::static_pointer_cast(index_)) { - auto nprobe = cfg.get_with_default("nprobe", size_t(1)); - - std::lock_guard lk(mutex_); - device_index->setNumProbes(nprobe); + std::lock_guard lk(mutex_); + + if (auto device_index = std::static_pointer_cast(index_)) { + auto nprobe = cfg.get_with_default("nprobe", size_t(1)); + device_index->setNumProbes(nprobe); + + { + // TODO(linxj): allocate mem + ResScope rs(res_); device_index->search(n, (float *) data, k, distances, labels); } - } else { - KNOWHERE_THROW_MSG("search can't get gpu resource"); } } @@ -283,119 +281,70 @@ void FaissGpuResourceMgr::InitResource() { is_init = true; for(auto& device : devices_params_) { - auto& resource_vec = idle_[device.first]; + auto& device_id = device.first; + auto& device_param = device.second; + auto& bq = idle_map[device_id]; - for (int64_t i = 0; i < device.second.resource_num; ++i) { - auto res = std::make_shared(); + for (int64_t i = 0; i < device_param.resource_num; ++i) { + auto raw_resource = std::make_shared(); // TODO(linxj): enable set pinned memory - //res->noTempMemory(); - auto res_wrapper = std::make_shared(res); - AllocateTempMem(res_wrapper, device.first, 0); + auto res_wrapper = std::make_shared(raw_resource); + AllocateTempMem(res_wrapper, device_id, 0); - resource_vec.emplace_back(res_wrapper); + bq.Put(res_wrapper); } } } ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id, const int64_t &alloc_size) { - std::lock_guard lk(mutex_); - - if (!is_init) { - InitResource(); - is_init = true; - } - - auto search = idle_.find(device_id); - if (search != idle_.end()) { - auto res = search->second.back(); - //AllocateTempMem(res, device_id, alloc_size); - - search->second.pop_back(); - return res; + InitResource(); + + auto finder = idle_map.find(device_id); + if (finder != idle_map.end()) { + auto& bq = finder->second; + auto&& resource = bq.Take(); + AllocateTempMem(resource, device_id, alloc_size); + return resource; } return nullptr; } -bool FaissGpuResourceMgr::GetRes(const int64_t &device_id, - ResPtr &res, - const int64_t &alloc_size) { - std::lock_guard lk(mutex_); - - if (!is_init) { - InitResource(); - is_init = true; - } - - auto search = idle_.find(device_id); - if (search != idle_.end()) { - auto &res_vec = search->second; - for (auto it = res_vec.cbegin(); it != res_vec.cend(); ++it) { - if ((*it)->id == res->id) { - //AllocateTempMem(res, device_id, alloc_size); - res_vec.erase(it); - return true; - } - } - } - // else - return false; -} - -void FaissGpuResourceMgr::MoveToInuse(const int64_t &device_id, const ResPtr &res) { - std::lock_guard lk(mutex_); - RemoveResource(device_id, res, idle_); - in_use_[device_id].push_back(res); -} +//bool FaissGpuResourceMgr::GetRes(const int64_t &device_id, +// ResPtr &res, +// const int64_t &alloc_size) { +// InitResource(); +// +// std::lock_guard lk(res->mutex); +// AllocateTempMem(res, device_id, alloc_size); +// return true; +//} void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) { - std::lock_guard lk(mutex_); - RemoveResource(device_id, res, in_use_); - auto it = idle_[device_id].begin(); - idle_[device_id].insert(it, res); -} - -void -FaissGpuResourceMgr::RemoveResource(const int64_t &device_id, - const ResPtr &res, - std::map> &resource_pool) { - if (resource_pool.find(device_id) != resource_pool.end()) { - std::vector &res_array = resource_pool[device_id]; - res_array.erase(std::remove_if(res_array.begin(), res_array.end(), - [&](ResPtr &ptr) { return ptr->id == res->id; }), - res_array.end()); + auto finder = idle_map.find(device_id); + if (finder != idle_map.end()) { + auto& bq = finder->second; + bq.Put(res); } } void FaissGpuResourceMgr::Free() { - for (auto &item : in_use_) { - auto& res_vec = item.second; - res_vec.clear(); - } - for (auto &item : idle_) { - auto& res_vec = item.second; - res_vec.clear(); + for (auto &item : idle_map) { + auto& bq = item.second; + while (!bq.Empty()) { + bq.Take(); + } } is_init = false; } void FaissGpuResourceMgr::Dump() { - std::cout << "In used resource" << std::endl; - for(auto& item: in_use_) { - std::cout << "device_id: " << item.first << std::endl; - for(auto& elem : item.second) { - std::cout << "resource_id: " << elem->id << std::endl; - } - } - - std::cout << "Idle resource" << std::endl; - for(auto& item: idle_) { - std::cout << "device_id: " << item.first << std::endl; - for(auto& elem : item.second) { - std::cout << "resource_id: " << elem->id << std::endl; - } + for (auto &item : idle_map) { + auto& bq = item.second; + std::cout << "device_id: " << item.first + << ", resource count:" << bq.Size(); } } diff --git a/cpp/src/core/test/test_ivf.cpp b/cpp/src/core/test/test_ivf.cpp index 1e91618e83624be580c4708d23ebb8b17d2ebf48..625a9ca0fd1f70b9386b752703d23c82490de394 100644 --- a/cpp/src/core/test/test_ivf.cpp +++ b/cpp/src/core/test/test_ivf.cpp @@ -386,7 +386,7 @@ class GPURESTEST int64_t elems = 0; }; -const int search_count = 10; +const int search_count = 18; const int load_count = 3; TEST_F(GPURESTEST, gpu_ivf_resource_test) { diff --git a/cpp/src/utils/BlockingQueue.inl b/cpp/src/utils/BlockingQueue.inl index e703f01576d5e98d27ad9d683c08b9a394137174..21146cdfbbed143f3e7936fa0ca464acf79cccf8 100644 --- a/cpp/src/utils/BlockingQueue.inl +++ b/cpp/src/utils/BlockingQueue.inl @@ -1,6 +1,6 @@ #pragma once -#include "Log.h" +//#include "Log.h" #include "Error.h" namespace zilliz { @@ -17,7 +17,7 @@ BlockingQueue::Put(const T &task) { std::string error_msg = "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " + std::to_string(queue_.size()); - SERVER_LOG_ERROR << error_msg; + //SERVER_LOG_ERROR << error_msg; throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } @@ -33,7 +33,7 @@ BlockingQueue::Take() { if (queue_.empty()) { std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; + //SERVER_LOG_ERROR << error_msg; throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } @@ -57,7 +57,7 @@ BlockingQueue::Front() { empty_.wait(lock, [this] { return !queue_.empty(); }); if (queue_.empty()) { std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; + //SERVER_LOG_ERROR << error_msg; throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } T front(queue_.front()); @@ -72,7 +72,7 @@ BlockingQueue::Back() { if (queue_.empty()) { std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; + //SERVER_LOG_ERROR << error_msg; throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); }