提交 19ef5547 · 编写者：J jinhai

Merge branch 'ms549' into 'branch-0.4.0'

MS-549 server crash when search and train on same gpu

See merge request megasearch/milvus!561

Former-commit-id: 15dca05839512d35a70daf31c55ea0178afd4b70
......@@ -23,6 +23,9 @@ using ResPtr = std::shared_ptr<Resource>;
using ResWPtr = std::weak_ptr<Resource>;
class FaissGpuResourceMgr {
public:
friend class ResScope;
public:
using ResBQ = zilliz::milvus::server::BlockingQueue<ResPtr>;
......@@ -71,24 +74,30 @@ class FaissGpuResourceMgr {
protected:
bool is_init = false;
std::map<int64_t ,std::unique_ptr<std::mutex>> mutex_cache_;
std::map<int64_t, DeviceParams> devices_params_;
std::map<int64_t, ResBQ> idle_map;
std::map<int64_t, ResBQ> idle_map_;
};
class ResScope {
public:
ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id), move(true) {
ResScope(ResPtr &res, const int64_t& device_id, const bool& isown)
: resource(res), device_id(device_id), move(true), own(isown) {
if (isown) FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->lock();
res->mutex.lock();
}
ResScope(ResPtr &res) : resource(res), device_id(-1), move(false) {
// specif for search
// get the ownership of gpuresource and gpu
ResScope(ResPtr &res, const int64_t &device_id)
: resource(res), device_id(device_id), move(false), own(true) {
FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->lock();
res->mutex.lock();
}
~ResScope() {
if (move) {
FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
}
if (own) FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->unlock();
if (move) FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
resource->mutex.unlock();
}
......@@ -96,6 +105,7 @@ class ResScope {
ResPtr resource;
int64_t device_id;
bool move = true;
bool own = false;
};
class GPUIndex {
......@@ -120,6 +130,7 @@ class GPUIVF : public IVF, public GPUIndex {
explicit GPUIVF(std::shared_ptr<faiss::Index> index, const int64_t &device_id, ResPtr &resource)
: IVF(std::move(index)), GPUIndex(device_id, resource) {};
IndexModelPtr Train(const DatasetPtr &dataset, const Config &config) override;
void Add(const DatasetPtr &dataset, const Config &config) override;
void set_index_model(IndexModelPtr model) override;
//DatasetPtr Search(const DatasetPtr &dataset, const Config &config) override;
VectorIndexPtr CopyGpuToCpu(const Config &config) override;
......
......@@ -34,7 +34,7 @@ IndexModelPtr GPUIVF::Train(const DatasetPtr &dataset, const Config &config) {
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_);
if (temp_resource != nullptr) {
ResScope rs(gpu_id_, temp_resource);
ResScope rs(temp_resource, gpu_id_, true);
faiss::gpu::GpuIndexIVFFlatConfig idx_config;
idx_config.device = gpu_id_;
faiss::gpu::GpuIndexIVFFlat device_index(temp_resource->faiss_res.get(), dim, nlist, metric_type, idx_config);
......@@ -54,7 +54,7 @@ void GPUIVF::set_index_model(IndexModelPtr model) {
auto host_index = std::static_pointer_cast<IVFIndexModel>(model);
if (auto gpures = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(gpu_id_, gpures);
ResScope rs(gpures, gpu_id_, false);
res_ = gpures;
auto device_index = faiss::gpu::index_cpu_to_gpu(res_->faiss_res.get(), gpu_id_, host_index->index_.get());
index_.reset(device_index);
......@@ -101,7 +101,7 @@ void GPUIVF::LoadImpl(const BinarySet &index_binary) {
faiss::Index *index = faiss::read_index(&reader);
if (auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(gpu_id_, temp_res);
ResScope rs(temp_res, gpu_id_, false);
res_ = temp_res;
auto device_index = faiss::gpu::index_cpu_to_gpu(res_->faiss_res.get(), gpu_id_, index);
index_.reset(device_index);
......@@ -138,7 +138,7 @@ void GPUIVF::search_impl(int64_t n,
{
// TODO(linxj): allocate mem
ResScope rs(res_);
ResScope rs(res_, gpu_id_);
device_index->search(n, (float *) data, k, distances, labels);
}
}
......@@ -164,6 +164,15 @@ VectorIndexPtr GPUIVF::CopyGpuToGpu(const int64_t &device_id, const Config &conf
auto host_index = CopyGpuToCpu(config);
return std::static_pointer_cast<IVF>(host_index)->CopyCpuToGpu(device_id, config);
}
// Add vectors from `dataset` to the index while holding exclusive ownership
// of the GPU (ResScope with isown = true), so an add cannot race with a
// concurrent search or train on the same device.
void GPUIVF::Add(const DatasetPtr &dataset, const Config &config) {
    auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_);
    if (res == nullptr) {
        KNOWHERE_THROW_MSG("Add IVF can't get gpu resource");
    }
    ResScope rs(res, gpu_id_, true);
    IVF::Add(dataset, config);
}
IndexModelPtr GPUIVFPQ::Train(const DatasetPtr &dataset, const Config &config) {
auto nlist = config["nlist"].as<size_t>();
......@@ -216,7 +225,7 @@ IndexModelPtr GPUIVFSQ::Train(const DatasetPtr &dataset, const Config &config) {
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_);
if (temp_resource != nullptr) {
ResScope rs(gpu_id_, temp_resource );
ResScope rs(temp_resource, gpu_id_, true);
auto device_index = faiss::gpu::index_cpu_to_gpu(temp_resource->faiss_res.get(), gpu_id_, build_index);
device_index->train(rows, (float *) p_data);
......@@ -283,9 +292,12 @@ void FaissGpuResourceMgr::InitResource() {
//std::cout << "InitResource" << std::endl;
for(auto& device : devices_params_) {
auto& device_id = device.first;
mutex_cache_.emplace(device_id, std::make_unique<std::mutex>());
//std::cout << "Device Id: " << device_id << std::endl;
auto& device_param = device.second;
auto& bq = idle_map[device_id];
auto& bq = idle_map_[device_id];
for (int64_t i = 0; i < device_param.resource_num; ++i) {
//std::cout << "Resource Id: " << i << std::endl;
......@@ -305,8 +317,8 @@ ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
const int64_t &alloc_size) {
InitResource();
auto finder = idle_map.find(device_id);
if (finder != idle_map.end()) {
auto finder = idle_map_.find(device_id);
if (finder != idle_map_.end()) {
auto& bq = finder->second;
auto&& resource = bq.Take();
AllocateTempMem(resource, device_id, alloc_size);
......@@ -316,15 +328,15 @@ ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
}
// Return `res` to the idle queue of `device_id` so other operations can reuse
// it. Silently ignores unknown device ids.
// Fix: the diff collapse left the stale pre-rename `idle_map` lookup lines
// interleaved with the renamed `idle_map_` ones, redeclaring `finder` and
// unbalancing the braces; only the post-merge lines are kept.
void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) {
    auto finder = idle_map_.find(device_id);
    if (finder != idle_map_.end()) {
        auto &bq = finder->second;
        bq.Put(res);
    }
}
void FaissGpuResourceMgr::Free() {
for (auto &item : idle_map) {
for (auto &item : idle_map_) {
auto& bq = item.second;
while (!bq.Empty()) {
bq.Take();
......@@ -335,7 +347,7 @@ void FaissGpuResourceMgr::Free() {
void
FaissGpuResourceMgr::Dump() {
for (auto &item : idle_map) {
for (auto &item : idle_map_) {
auto& bq = item.second;
std::cout << "device_id: " << item.first
<< ", resource count:" << bq.Size();
......
......@@ -142,7 +142,7 @@ VectorIndexPtr IDMAP::Clone() {
VectorIndexPtr IDMAP::CopyCpuToGpu(const int64_t &device_id, const Config &config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)){
ResScope rs(device_id, res);
ResScope rs(res, device_id, false);
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get());
std::shared_ptr<faiss::Index> device_index;
......@@ -211,7 +211,7 @@ void GPUIDMAP::LoadImpl(const BinarySet &index_binary) {
faiss::Index *index = faiss::read_index(&reader);
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_) ){
ResScope rs(gpu_id_, res);
ResScope rs(res, gpu_id_, false);
res_ = res;
auto device_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index);
index_.reset(device_index);
......@@ -242,7 +242,7 @@ void GPUIDMAP::search_impl(int64_t n,
float *distances,
int64_t *labels,
const Config &cfg) {
ResScope rs(res_);
ResScope rs(res_, gpu_id_);
index_->search(n, (float *) data, k, distances, labels);
}
......
......@@ -196,7 +196,7 @@ void IVF::search_impl(int64_t n,
VectorIndexPtr IVF::CopyCpuToGpu(const int64_t& device_id, const Config &config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)){
ResScope rs(device_id, res);
ResScope rs(res, device_id, false);
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get());
std::shared_ptr<faiss::Index> device_index;
......@@ -271,7 +271,7 @@ VectorIndexPtr IVFSQ::Clone_impl(const std::shared_ptr<faiss::Index> &index) {
VectorIndexPtr IVFSQ::CopyCpuToGpu(const int64_t &device_id, const Config &config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)){
ResScope rs(device_id, res);
ResScope rs(res, device_id, false);
faiss::gpu::GpuClonerOptions option;
option.allInGpu = true;
......
......@@ -77,21 +77,21 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Config::object{{"nlist", 100}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
std::make_tuple("IVFPQ",
Config(),
Config::object{{"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
//std::make_tuple("IVFPQ",
// Config(),
// Config::object{{"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
// Config(),
// Config::object{{"k", 10}}),
std::make_tuple("GPUIVF",
Config(),
Config::object{{"nlist", 100}, {"gpu_id", device_id}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
std::make_tuple("GPUIVFPQ",
Config(),
Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
//std::make_tuple("GPUIVFPQ",
// Config(),
// Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
// Config(),
// Config::object{{"k", 10}}),
std::make_tuple("IVFSQ",
Config(),
Config::object{{"nlist", 100}, {"nbits", 8}, {"metric_type", "L2"}},
......@@ -562,6 +562,71 @@ TEST_F(GPURESTEST, copyandsearch) {
tc.RecordSection("Copy&search total");
}
// Regression test for MS-549 (server crash when search and train run on the
// same GPU): runs train/add and search stages concurrently in several thread
// combinations to exercise the per-device locking in ResScope.
TEST_F(GPURESTEST, TrainAndSearch) {
index_type = "GPUIVFSQ";
//index_type = "GPUIVF";
// Few build passes vs. many searches so the two stages overlap in time.
const int train_count = 1;
const int search_count = 5000;
// Build a model once, then materialize an independent searchable GPU index
// from it via a CPU round trip.
index_ = IndexFactory(index_type);
auto preprocessor = index_->BuildPreprocessor(base_dataset, preprocess_cfg);
index_->set_preprocessor(preprocessor);
train_cfg = Config::object{{"gpu_id", device_id}, {"nlist", 1638}, {"nbits", 8}, {"metric_type", "L2"}};
auto model = index_->Train(base_dataset, train_cfg);
auto new_index = IndexFactory(index_type);
new_index->set_index_model(model);
new_index->Add(base_dataset, add_cfg);
auto cpu_idx = CopyGpuToCpu(new_index, Config());
cpu_idx->Seal();
auto search_idx = CopyCpuToGpu(cpu_idx, device_id, Config());
// Stage that repeatedly trains + adds on the GPU (exclusive-ownership path).
auto train_stage = [&] {
train_cfg = Config::object{{"gpu_id", device_id}, {"nlist", 1638}, {"nbits", 8}, {"metric_type", "L2"}};
for (int i = 0; i < train_count; ++i) {
auto model = index_->Train(base_dataset, train_cfg);
auto test_idx = IndexFactory(index_type);
test_idx->set_index_model(model);
test_idx->Add(base_dataset, add_cfg);
}
};
// Stage that repeatedly searches a given index and checks the results.
auto search_stage = [&](VectorIndexPtr& search_idx) {
search_cfg = Config::object{{"k", k}};
for (int i = 0; i < search_count; ++i) {
auto result = search_idx->Search(query_dataset, search_cfg);
AssertAnns(result, nq, k);
}
};
//TimeRecorder tc("record");
//train_stage();
//tc.RecordSection("train cost");
//search_stage(search_idx);
//tc.RecordSection("search cost");
{
// search and build parallel
std::thread search_thread(search_stage, std::ref(search_idx));
std::thread train_thread(train_stage);
train_thread.join();
search_thread.join();
}
{
// build parallel
std::thread train_1(train_stage);
std::thread train_2(train_stage);
train_1.join();
train_2.join();
}
{
// search parallel — second index copy so both threads search the same GPU.
auto search_idx_2 = CopyCpuToGpu(cpu_idx, device_id, Config());
std::thread search_1(search_stage, std::ref(search_idx));
std::thread search_2(search_stage, std::ref(search_idx_2));
search_1.join();
search_2.join();
}
}
// TODO(linxj): Add exception test
......@@ -173,6 +173,7 @@ VecIndexPtr read_index(const std::string &location) {
auto binptr = std::make_shared<uint8_t>();
binptr.reset(bin);
load_data_list.Append(std::string(meta, meta_length), binptr, bin_length);
delete[] meta;
}
return LoadVecIndex(current_type, load_data_list);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册