提交 b87ddb9f 编写于 作者: Y Yu Kun

buildindex to scheduler run ok


Former-commit-id: 9c5798e8921b2eb930d2b71df593a17561ba3bc7
上级 2c039f4f
......@@ -242,11 +242,7 @@ DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) {
}
Status
<<<<<<< HEAD
DBImpl::InsertVectors(const std::string &table_id_, uint64_t n, const float *vectors, IDNumbers &vector_ids_) {
=======
DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vectors, IDNumbers& vector_ids) {
>>>>>>> upstream/branch-0.5.0
// ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
......@@ -299,17 +295,6 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
// for IDMAP type, only wait all NEW file converted to RAW file
// for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
std::vector<int> file_types;
<<<<<<< HEAD
if (index.engine_type_ == (int) EngineType::FAISS_IDMAP) {
file_types = {
(int) meta::TableFileSchema::NEW, (int) meta::TableFileSchema::NEW_MERGE,
};
} else {
file_types = {
(int) meta::TableFileSchema::RAW, (int) meta::TableFileSchema::NEW,
(int) meta::TableFileSchema::NEW_MERGE, (int) meta::TableFileSchema::NEW_INDEX,
(int) meta::TableFileSchema::TO_INDEX,
=======
if (index.engine_type_ == static_cast<int32_t>(EngineType::FAISS_IDMAP)) {
file_types = {
static_cast<int32_t>(meta::TableFileSchema::NEW), static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
......@@ -321,7 +306,6 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
static_cast<int32_t>(meta::TableFileSchema::NEW_MERGE),
static_cast<int32_t>(meta::TableFileSchema::NEW_INDEX),
static_cast<int32_t>(meta::TableFileSchema::TO_INDEX),
>>>>>>> upstream/branch-0.5.0
};
}
......@@ -915,38 +899,36 @@ DBImpl::BackgroundBuildIndex() {
Status status;
scheduler::BuildIndexJobPtr
job = std::make_shared<scheduler::BuildIndexJob>(0);
job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_);
// step 2: put build index task to scheduler
scheduler::JobMgrInst::GetInstance()->Put(job);
// for (auto &file : to_index_files) {
// std::cout << "get to index file" << std::endl;
//
// scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
// job->AddToIndexFiles(file_ptr);
//
// if (!job->GetStatus().ok()) {
// Status status = job->GetStatus();
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
// }
//
// }
// scheduler::JobMgrInst::GetInstance()->Put(job);
// job->WaitBuildIndexFinish();
for (auto &file : to_index_files) {
std::cout << "get to index file" << std::endl;
meta::TableFileSchema table_file;
table_file.table_id_ = file.table_id_;
table_file.date_ = file.date_;
table_file.file_type_ =
meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path
status = meta_ptr_->CreateTableFile(table_file);
status = BuildIndex(file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
}
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddToIndexFiles(file_ptr, table_file);
if (shutting_down_.load(std::memory_order_acquire)) {
ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
break;
}
}
job->WaitBuildIndexFinish();
// for (auto &file : to_index_files) {
// status = BuildIndex(file);
// if (!status.ok()) {
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
// }
//
// if (shutting_down_.load(std::memory_order_acquire)) {
// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
// break;
// }
// }
ENGINE_LOG_TRACE << "Background build index thread exit";
}
......
......@@ -133,6 +133,7 @@ ExecutionEngineImpl::Serialize() {
Status
ExecutionEngineImpl::Load(bool to_cache) {
std::cout << "load" << std::endl;
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) {
......@@ -161,6 +162,7 @@ ExecutionEngineImpl::Load(bool to_cache) {
Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
std::cout << "copy2gpu" << std::endl;
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
......
......@@ -69,7 +69,7 @@ TaskCreator::Create(const DeleteJobPtr &job) {
}
std::vector<TaskPtr>
TaskCreator::Create(const zilliz::milvus::scheduler::BuildIndexJobPtr &job) {
TaskCreator::Create(const BuildIndexJobPtr &job) {
std::vector<TaskPtr> tasks;
//TODO(yukun): remove "disk" hardcode here
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
......
......@@ -172,7 +172,8 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->device_id() == build_index_gpu) {
if (compute_resources[i]->name()
== res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
......
......@@ -19,27 +19,24 @@
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
BuildIndexJob::BuildIndexJob(zilliz::milvus::scheduler::JobId id)
: Job(id, JobType::BUILD){
// Constructs a build-index job: forwards `id` to the Job base together with
// JobType::BUILD, and moves the supplied meta handle into meta_ptr_ (exposed
// later through meta()).
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr)
: Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)) {
}
// Registers a file whose index should be built by this job.
//
// to_index_file: schema of the file to index; it is stored in to_index_files_
//                keyed by its id_.
// Returns false when to_index_file is null or a file with the same id_ has
// already been registered; returns true once the file has been added.
bool
BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file) {
    // Hold mutex_ for the whole check-and-insert so the lookup and the insert
    // are not interleaved with another caller of this method.
    std::unique_lock<std::mutex> lock(mutex_);
    if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) {
        return false;
    }

    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_;

    to_index_files_[to_index_file->id_] = to_index_file;

    // Previously control fell off the end of this bool-returning function,
    // which is undefined behavior; report success explicitly.
    return true;
}
Status&
......@@ -58,6 +55,5 @@ BuildIndexJob::BuildIndexDone(size_t to_index_id) {
}
}
}
}
\ No newline at end of file
......@@ -32,7 +32,6 @@
#include "scheduler/Definition.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -43,11 +42,11 @@ using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
class BuildIndexJob : public Job {
public:
explicit BuildIndexJob(JobId id);
explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr);
public:
bool
AddToIndexFiles(const TableFileSchemaPtr &to_index_file, const TableFileSchema table_file);
AddToIndexFiles(const TableFileSchemaPtr &to_index_file);
Status &
WaitBuildIndexFinish();
......@@ -66,15 +65,26 @@ class BuildIndexJob : public Job {
// return engine_type_;
// }
// Returns a mutable reference to this job's status_ member, so callers can
// read it or assign a new Status through the returned reference.
// No lock is taken here.
Status &
GetStatus() {
return status_;
}
// Returns a mutable reference to to_index_files_, the Id2ToIndexMap of files
// registered with this job. No lock is taken here.
Id2ToIndexMap &
to_index_files() {
return to_index_files_;
}
// Returns the stored meta_ptr_ by value (a copy of the MetaPtr handle that
// was passed to the constructor).
engine::meta::MetaPtr
meta() const {
return meta_ptr_;
}
private:
Id2ToIndexMap to_index_files_;
Id2ToTableFileMap table_files_;
engine::meta::MetaPtr meta_ptr_;
Status status_;
std::mutex mutex_;
std::condition_variable cv_;
};
......@@ -83,4 +93,3 @@ using BuildIndexJobPtr = std::shared_ptr<BuildIndexJob>;
}
}
}
\ No newline at end of file
......@@ -26,7 +26,6 @@
#include <thread>
#include <utility>
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -39,7 +38,7 @@ XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file)
}
void
XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_id) {
XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
TimeRecorder rc("");
Status stat = Status::OK();
std::string error_msg;
......@@ -50,7 +49,7 @@ XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_i
stat = to_index_engine_->Load();
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
stat = to_index_engine_->CopyToGpu(device_id);
// stat = to_index_engine_->CopyToGpu(device_id);
type_str = "CPU2GPU";
} else if (type == LoadType::GPU2CPU) {
stat = to_index_engine_->CopyToCpu();
......@@ -90,8 +89,8 @@ XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_i
" bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
// to_index_id_ = file_->id_;
// to_index_type_ = file_->file_type_;
to_index_id_ = file_->id_;
to_index_type_ = file_->file_type_;
}
void
......@@ -103,22 +102,106 @@ XBuildIndexTask::Execute() {
TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_));
if (auto job = job_.lock()) {
auto build_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
std::string location = file_->location_;
EngineType engine_type = (EngineType)file_->engine_type_;
std::shared_ptr<engine::ExecutionEngine> index;
// step 2: create table file
engine::meta::TableFileSchema table_file;
table_file.table_id_ = file_->table_id_;
table_file.date_ = file_->date_;
table_file.file_type_ =
engine::meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path
engine::meta::MetaPtr meta_ptr = build_index_job->meta();
Status status = build_index_job->meta()->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
build_index_job->BuildIndexDone(to_index_id_);
//TODO: return status
}
// step 3: build index
try {
index = to_index_engine_->BuildIndex(location, engine_type);
if (index == nullptr) {
table_file_.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
//TODO: updatetablefile
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
<< " to to_delete";
return;
}
} catch (std::exception &ex) {
ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
<< std::endl;
build_index_job->GetStatus() = Status(DB_ERROR, msg);
return;
}
// step 4: if table has been deleted, dont save index file
bool has_table = false;
meta_ptr->HasTable(file_->table_id_, has_table);
if (!has_table) {
meta_ptr->DeleteTableFiles(file_->table_id_);
// return Status::OK();
}
// step 5: save index file
try {
index->Serialize();
} catch (std::exception &ex) {
// typical error: out of disk space or permission denied
std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to persist index file: " << table_file.location_
<< ", possible out of disk space" << std::endl;
// return Status(DB_ERROR, msg);
}
// step 6: update meta
table_file.file_type_ = engine::meta::TableFileSchema::INDEX;
table_file.file_size_ = index->PhysicalSize();
table_file.row_count_ = index->Count();
auto origin_file = *file_;
origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP;
engine::meta::TableFilesSchema update_files = {table_file, origin_file};
status = meta_ptr->UpdateTableFiles(update_files);
if (status.ok()) {
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
<< " bytes"
<< " from file " << origin_file.file_id_;
// index->Cache();
} else {
// failed to update meta, mark the new file as to_delete, don't delete old file
origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX;
status = meta_ptr->UpdateTableFile(origin_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_ << " to to_delete";
}
build_job->BuildIndexDone(to_index_id_);
build_index_job->BuildIndexDone(to_index_id_);
}
rc.ElapseFromBegin("totally cost");
......@@ -128,4 +211,3 @@ XBuildIndexTask::Execute() {
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
......@@ -22,7 +22,6 @@
#include "scheduler/job/BuildIndexJob.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
......@@ -42,8 +41,8 @@ class XBuildIndexTask : public Task {
size_t to_index_id_ = 0;
int to_index_type_ = 0;
ExecutionEnginePtr to_index_engine_ = nullptr;
};
} // namespace scheduler
} // namespace milvus
} // namespace zilliz
} // namespace milvus
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册