提交 af541659 编写于 作者: G groot

Merge remote-tracking branch 'source/branch-0.4.0' into branch-0.4.0


Former-commit-id: 057de5247ec6b5a9b99b7c3753aa415b02273eb6
......@@ -34,6 +34,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-474 - Create index hang if use branch-0.3.1 server config
- MS-510 - unittest out of memory and crashed
- MS-507 - Dataset 10m-512, index type sq8,performance in-normal when set CPU_CACHE to 16 or 64
- MS-543 - SearchTask fail without exception
## Improvement
- MS-327 - Clean code for milvus
......
......@@ -23,6 +23,9 @@ using ResPtr = std::shared_ptr<Resource>;
using ResWPtr = std::weak_ptr<Resource>;
class FaissGpuResourceMgr {
public:
friend class ResScope;
public:
using ResBQ = zilliz::milvus::server::BlockingQueue<ResPtr>;
......@@ -71,24 +74,30 @@ class FaissGpuResourceMgr {
protected:
bool is_init = false;
std::map<int64_t ,std::unique_ptr<std::mutex>> mutex_cache_;
std::map<int64_t, DeviceParams> devices_params_;
std::map<int64_t, ResBQ> idle_map;
std::map<int64_t, ResBQ> idle_map_;
};
class ResScope {
public:
ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id), move(true) {
ResScope(ResPtr &res, const int64_t& device_id, const bool& isown)
: resource(res), device_id(device_id), move(true), own(isown) {
if (isown) FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->lock();
res->mutex.lock();
}
ResScope(ResPtr &res) : resource(res), device_id(-1), move(false) {
// specif for search
// get the ownership of gpuresource and gpu
ResScope(ResPtr &res, const int64_t &device_id)
: resource(res), device_id(device_id), move(false), own(true) {
FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->lock();
res->mutex.lock();
}
~ResScope() {
if (move) {
FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
}
if (own) FaissGpuResourceMgr::GetInstance().mutex_cache_[device_id]->unlock();
if (move) FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
resource->mutex.unlock();
}
......@@ -96,6 +105,7 @@ class ResScope {
ResPtr resource;
int64_t device_id;
bool move = true;
bool own = false;
};
class GPUIndex {
......@@ -120,6 +130,7 @@ class GPUIVF : public IVF, public GPUIndex {
explicit GPUIVF(std::shared_ptr<faiss::Index> index, const int64_t &device_id, ResPtr &resource)
: IVF(std::move(index)), GPUIndex(device_id, resource) {};
IndexModelPtr Train(const DatasetPtr &dataset, const Config &config) override;
void Add(const DatasetPtr &dataset, const Config &config) override;
void set_index_model(IndexModelPtr model) override;
//DatasetPtr Search(const DatasetPtr &dataset, const Config &config) override;
VectorIndexPtr CopyGpuToCpu(const Config &config) override;
......
......@@ -34,7 +34,7 @@ IndexModelPtr GPUIVF::Train(const DatasetPtr &dataset, const Config &config) {
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_);
if (temp_resource != nullptr) {
ResScope rs(gpu_id_, temp_resource);
ResScope rs(temp_resource, gpu_id_, true);
faiss::gpu::GpuIndexIVFFlatConfig idx_config;
idx_config.device = gpu_id_;
faiss::gpu::GpuIndexIVFFlat device_index(temp_resource->faiss_res.get(), dim, nlist, metric_type, idx_config);
......@@ -54,7 +54,7 @@ void GPUIVF::set_index_model(IndexModelPtr model) {
auto host_index = std::static_pointer_cast<IVFIndexModel>(model);
if (auto gpures = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(gpu_id_, gpures);
ResScope rs(gpures, gpu_id_, false);
res_ = gpures;
auto device_index = faiss::gpu::index_cpu_to_gpu(res_->faiss_res.get(), gpu_id_, host_index->index_.get());
index_.reset(device_index);
......@@ -101,7 +101,7 @@ void GPUIVF::LoadImpl(const BinarySet &index_binary) {
faiss::Index *index = faiss::read_index(&reader);
if (auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) {
ResScope rs(gpu_id_, temp_res);
ResScope rs(temp_res, gpu_id_, false);
res_ = temp_res;
auto device_index = faiss::gpu::index_cpu_to_gpu(res_->faiss_res.get(), gpu_id_, index);
index_.reset(device_index);
......@@ -138,7 +138,7 @@ void GPUIVF::search_impl(int64_t n,
{
// TODO(linxj): allocate mem
ResScope rs(res_);
ResScope rs(res_, gpu_id_);
device_index->search(n, (float *) data, k, distances, labels);
}
}
......@@ -164,6 +164,15 @@ VectorIndexPtr GPUIVF::CopyGpuToGpu(const int64_t &device_id, const Config &conf
auto host_index = CopyGpuToCpu(config);
return std::static_pointer_cast<IVF>(host_index)->CopyCpuToGpu(device_id, config);
}
void GPUIVF::Add(const DatasetPtr &dataset, const Config &config) {
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_);
if (temp_resource != nullptr) {
ResScope rs(temp_resource, gpu_id_, true);
IVF::Add(dataset, config);
} else {
KNOWHERE_THROW_MSG("Add IVF can't get gpu resource");
}
}
IndexModelPtr GPUIVFPQ::Train(const DatasetPtr &dataset, const Config &config) {
auto nlist = config["nlist"].as<size_t>();
......@@ -216,7 +225,7 @@ IndexModelPtr GPUIVFSQ::Train(const DatasetPtr &dataset, const Config &config) {
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_);
if (temp_resource != nullptr) {
ResScope rs(gpu_id_, temp_resource );
ResScope rs(temp_resource, gpu_id_, true);
auto device_index = faiss::gpu::index_cpu_to_gpu(temp_resource->faiss_res.get(), gpu_id_, build_index);
device_index->train(rows, (float *) p_data);
......@@ -283,9 +292,12 @@ void FaissGpuResourceMgr::InitResource() {
//std::cout << "InitResource" << std::endl;
for(auto& device : devices_params_) {
auto& device_id = device.first;
mutex_cache_.emplace(device_id, std::make_unique<std::mutex>());
//std::cout << "Device Id: " << device_id << std::endl;
auto& device_param = device.second;
auto& bq = idle_map[device_id];
auto& bq = idle_map_[device_id];
for (int64_t i = 0; i < device_param.resource_num; ++i) {
//std::cout << "Resource Id: " << i << std::endl;
......@@ -305,8 +317,8 @@ ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
const int64_t &alloc_size) {
InitResource();
auto finder = idle_map.find(device_id);
if (finder != idle_map.end()) {
auto finder = idle_map_.find(device_id);
if (finder != idle_map_.end()) {
auto& bq = finder->second;
auto&& resource = bq.Take();
AllocateTempMem(resource, device_id, alloc_size);
......@@ -316,15 +328,15 @@ ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
}
void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) {
auto finder = idle_map.find(device_id);
if (finder != idle_map.end()) {
auto finder = idle_map_.find(device_id);
if (finder != idle_map_.end()) {
auto& bq = finder->second;
bq.Put(res);
}
}
void FaissGpuResourceMgr::Free() {
for (auto &item : idle_map) {
for (auto &item : idle_map_) {
auto& bq = item.second;
while (!bq.Empty()) {
bq.Take();
......@@ -335,7 +347,7 @@ void FaissGpuResourceMgr::Free() {
void
FaissGpuResourceMgr::Dump() {
for (auto &item : idle_map) {
for (auto &item : idle_map_) {
auto& bq = item.second;
std::cout << "device_id: " << item.first
<< ", resource count:" << bq.Size();
......
......@@ -142,7 +142,7 @@ VectorIndexPtr IDMAP::Clone() {
VectorIndexPtr IDMAP::CopyCpuToGpu(const int64_t &device_id, const Config &config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)){
ResScope rs(device_id, res);
ResScope rs(res, device_id, false);
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get());
std::shared_ptr<faiss::Index> device_index;
......@@ -211,7 +211,7 @@ void GPUIDMAP::LoadImpl(const BinarySet &index_binary) {
faiss::Index *index = faiss::read_index(&reader);
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_) ){
ResScope rs(gpu_id_, res);
ResScope rs(res, gpu_id_, false);
res_ = res;
auto device_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index);
index_.reset(device_index);
......@@ -242,7 +242,7 @@ void GPUIDMAP::search_impl(int64_t n,
float *distances,
int64_t *labels,
const Config &cfg) {
ResScope rs(res_);
ResScope rs(res_, gpu_id_);
index_->search(n, (float *) data, k, distances, labels);
}
......
......@@ -196,7 +196,7 @@ void IVF::search_impl(int64_t n,
VectorIndexPtr IVF::CopyCpuToGpu(const int64_t& device_id, const Config &config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)){
ResScope rs(device_id, res);
ResScope rs(res, device_id, false);
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, index_.get());
std::shared_ptr<faiss::Index> device_index;
......@@ -271,7 +271,7 @@ VectorIndexPtr IVFSQ::Clone_impl(const std::shared_ptr<faiss::Index> &index) {
VectorIndexPtr IVFSQ::CopyCpuToGpu(const int64_t &device_id, const Config &config) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)){
ResScope rs(device_id, res);
ResScope rs(res, device_id, false);
faiss::gpu::GpuClonerOptions option;
option.allInGpu = true;
......
......@@ -77,21 +77,21 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Config::object{{"nlist", 100}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
std::make_tuple("IVFPQ",
Config(),
Config::object{{"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
//std::make_tuple("IVFPQ",
// Config(),
// Config::object{{"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
// Config(),
// Config::object{{"k", 10}}),
std::make_tuple("GPUIVF",
Config(),
Config::object{{"nlist", 100}, {"gpu_id", device_id}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
std::make_tuple("GPUIVFPQ",
Config(),
Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
//std::make_tuple("GPUIVFPQ",
// Config(),
// Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
// Config(),
// Config::object{{"k", 10}}),
std::make_tuple("IVFSQ",
Config(),
Config::object{{"nlist", 100}, {"nbits", 8}, {"metric_type", "L2"}},
......@@ -562,6 +562,71 @@ TEST_F(GPURESTEST, copyandsearch) {
tc.RecordSection("Copy&search total");
}
TEST_F(GPURESTEST, TrainAndSearch) {
index_type = "GPUIVFSQ";
//index_type = "GPUIVF";
const int train_count = 1;
const int search_count = 5000;
index_ = IndexFactory(index_type);
auto preprocessor = index_->BuildPreprocessor(base_dataset, preprocess_cfg);
index_->set_preprocessor(preprocessor);
train_cfg = Config::object{{"gpu_id", device_id}, {"nlist", 1638}, {"nbits", 8}, {"metric_type", "L2"}};
auto model = index_->Train(base_dataset, train_cfg);
auto new_index = IndexFactory(index_type);
new_index->set_index_model(model);
new_index->Add(base_dataset, add_cfg);
auto cpu_idx = CopyGpuToCpu(new_index, Config());
cpu_idx->Seal();
auto search_idx = CopyCpuToGpu(cpu_idx, device_id, Config());
auto train_stage = [&] {
train_cfg = Config::object{{"gpu_id", device_id}, {"nlist", 1638}, {"nbits", 8}, {"metric_type", "L2"}};
for (int i = 0; i < train_count; ++i) {
auto model = index_->Train(base_dataset, train_cfg);
auto test_idx = IndexFactory(index_type);
test_idx->set_index_model(model);
test_idx->Add(base_dataset, add_cfg);
}
};
auto search_stage = [&](VectorIndexPtr& search_idx) {
search_cfg = Config::object{{"k", k}};
for (int i = 0; i < search_count; ++i) {
auto result = search_idx->Search(query_dataset, search_cfg);
AssertAnns(result, nq, k);
}
};
//TimeRecorder tc("record");
//train_stage();
//tc.RecordSection("train cost");
//search_stage(search_idx);
//tc.RecordSection("search cost");
{
// search and build parallel
std::thread search_thread(search_stage, std::ref(search_idx));
std::thread train_thread(train_stage);
train_thread.join();
search_thread.join();
}
{
// build parallel
std::thread train_1(train_stage);
std::thread train_2(train_stage);
train_1.join();
train_2.join();
}
{
// search parallel
auto search_idx_2 = CopyCpuToGpu(cpu_idx, device_id, Config());
std::thread search_1(search_stage, std::ref(search_idx));
std::thread search_2(search_stage, std::ref(search_idx_2));
search_1.join();
search_2.join();
}
}
// TODO(linxj): Add exception test
......@@ -424,6 +424,9 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
scheduler.Schedule(context);
context->WaitResult();
if (!context->GetStatus().ok()) {
return context->GetStatus();
}
//step 3: print time cost information
double load_cost = context->LoadCost();
......
......@@ -69,7 +69,7 @@ std::string Status::ToString() const {
type = "InvalidPath: ";
break;
default:
snprintf(tmp, sizeof(tmp), "Unkown code(%d): ",
snprintf(tmp, sizeof(tmp), "Error code(0x%x): ",
static_cast<int>(code()));
type = tmp;
break;
......
......@@ -38,7 +38,9 @@ public:
const ResultSet& GetResult() const { return result_; }
ResultSet& GetResult() { return result_; }
std::string Identity() const { return identity_; }
const std::string& Identity() const { return identity_; }
const Status& GetStatus() const { return status_; }
Status& GetStatus() { return status_; }
void IndexSearchDone(size_t index_id);
void WaitResult();
......@@ -64,6 +66,7 @@ private:
std::condition_variable done_cond_;
std::string identity_; //for debug
Status status_;
double time_cost_load_ = 0.0; //time cost for load all index files, unit: us
double time_cost_search_ = 0.0; //time cost for entire search, unit: us
......
......@@ -61,7 +61,7 @@ static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] =
const char descriptor_table_protodef_status_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) =
"\n\014status.proto\022\013milvus.grpc\"D\n\006Status\022*\n"
"\nerror_code\030\001 \001(\0162\026.milvus.grpc.ErrorCod"
"e\022\016\n\006reason\030\002 \001(\t*\230\004\n\tErrorCode\022\013\n\007SUCCE"
"e\022\016\n\006reason\030\002 \001(\t*\253\004\n\tErrorCode\022\013\n\007SUCCE"
"SS\020\000\022\024\n\020UNEXPECTED_ERROR\020\001\022\022\n\016CONNECT_FA"
"ILED\020\002\022\025\n\021PERMISSION_DENIED\020\003\022\024\n\020TABLE_N"
"OT_EXISTS\020\004\022\024\n\020ILLEGAL_ARGUMENT\020\005\022\021\n\rILL"
......@@ -74,8 +74,8 @@ const char descriptor_table_protodef_status_2eproto[] PROTOBUF_SECTION_VARIABLE(
"CREATE_FOLDER\020\021\022\026\n\022CANNOT_CREATE_FILE\020\022\022"
"\030\n\024CANNOT_DELETE_FOLDER\020\023\022\026\n\022CANNOT_DELE"
"TE_FILE\020\024\022\025\n\021BUILD_INDEX_ERROR\020\025\022\021\n\rILLE"
"GAL_NLIST\020\026\022\027\n\023ILLEGAL_METRIC_TYPE\020\027b\006pr"
"oto3"
"GAL_NLIST\020\026\022\027\n\023ILLEGAL_METRIC_TYPE\020\027\022\021\n\r"
"OUT_OF_MEMORY\020\030b\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_status_2eproto_deps[1] = {
};
......@@ -85,7 +85,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_sta
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_status_2eproto_once;
static bool descriptor_table_status_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_status_2eproto = {
&descriptor_table_status_2eproto_initialized, descriptor_table_protodef_status_2eproto, "status.proto", 644,
&descriptor_table_status_2eproto_initialized, descriptor_table_protodef_status_2eproto, "status.proto", 663,
&descriptor_table_status_2eproto_once, descriptor_table_status_2eproto_sccs, descriptor_table_status_2eproto_deps, 1, 0,
schemas, file_default_instances, TableStruct_status_2eproto::offsets,
file_level_metadata_status_2eproto, 1, file_level_enum_descriptors_status_2eproto, file_level_service_descriptors_status_2eproto,
......@@ -125,6 +125,7 @@ bool ErrorCode_IsValid(int value) {
case 21:
case 22:
case 23:
case 24:
return true;
default:
return false;
......
......@@ -93,12 +93,13 @@ enum ErrorCode : int {
BUILD_INDEX_ERROR = 21,
ILLEGAL_NLIST = 22,
ILLEGAL_METRIC_TYPE = 23,
OUT_OF_MEMORY = 24,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
ErrorCode_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool ErrorCode_IsValid(int value);
constexpr ErrorCode ErrorCode_MIN = SUCCESS;
constexpr ErrorCode ErrorCode_MAX = ILLEGAL_METRIC_TYPE;
constexpr ErrorCode ErrorCode_MAX = OUT_OF_MEMORY;
constexpr int ErrorCode_ARRAYSIZE = ErrorCode_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ErrorCode_descriptor();
......
......@@ -27,6 +27,7 @@ enum ErrorCode {
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
......
......@@ -98,14 +98,18 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
server::TimeRecorder rc("");
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
try {
if (type == LoadType::DISK2CPU) {
stat = index_engine_->Load();
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
stat = index_engine_->CopyToGpu(device_id);
type_str = "CPU2GPU";
} else if (type == LoadType::GPU2CPU) {
stat = index_engine_->CopyToCpu();
type_str = "GPU2CPU";
} else {
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
......@@ -117,13 +121,18 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
}
if (!stat.ok()) {
if (error_msg.empty())
error_msg = std::string("Failed to load index file: file not available");
//typical error: file not available
ENGINE_LOG_ERROR << error_msg;
Status s;
if (stat.ToString().find("out of memory") != std::string::npos) {
error_msg = "out of memory: " + type_str;
s = Status(SERVER_OUT_OF_MEMORY, error_msg);
} else {
error_msg = "Failed to load index file: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
for (auto &context : search_contexts_) {
context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed
context->GetStatus() = s;
}
return;
......
......@@ -48,6 +48,7 @@ namespace {
{SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED},
{DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED},
{SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR},
{SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY},
};
if(code_map.find(code) != code_map.end()) {
......
......@@ -672,7 +672,7 @@ SearchTask::OnExecute() {
rc.RecordSection("search vectors from engine");
if (!stat.ok()) {
return SetError(DB_META_TRANSACTION_FAILED, stat.ToString());
return SetError(stat.code(), stat.ToString());
}
if (results.empty()) {
......
......@@ -66,6 +66,7 @@ constexpr ErrorCode SERVER_INVALID_NPROBE = ToServerErrorCode(113);
constexpr ErrorCode SERVER_INVALID_INDEX_NLIST = ToServerErrorCode(114);
constexpr ErrorCode SERVER_INVALID_INDEX_METRIC_TYPE = ToServerErrorCode(115);
constexpr ErrorCode SERVER_INVALID_INDEX_FILE_SIZE = ToServerErrorCode(116);
constexpr ErrorCode SERVER_OUT_OF_MEMORY = ToServerErrorCode(117);
//db error code
constexpr ErrorCode DB_META_TRANSACTION_FAILED = ToDbErrorCode(1);
......
......@@ -173,6 +173,7 @@ VecIndexPtr read_index(const std::string &location) {
auto binptr = std::make_shared<uint8_t>();
binptr.reset(bin);
load_data_list.Append(std::string(meta, meta_length), binptr, bin_length);
delete[] meta;
}
return LoadVecIndex(current_type, load_data_list);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册