提交 882c7bdf 编写于 作者: J jinhai

Merge branch 'branch-0.5.0-yk' into 'branch-0.5.0'

MS-603 Add BuildIndex to scheduler

See merge request megasearch/milvus!676

Former-commit-id: 5ce5e03956aa7279530e112f0595de4653a047a1
......@@ -28,6 +28,7 @@
#include "scheduler/SchedInst.h"
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
#include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
......@@ -50,7 +51,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
} // namespace
DBImpl::DBImpl(const DBOptions& options)
DBImpl::DBImpl(const DBOptions &options)
: options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
......@@ -110,7 +111,7 @@ DBImpl::DropAll() {
}
Status
DBImpl::CreateTable(meta::TableSchema& table_schema) {
DBImpl::CreateTable(meta::TableSchema &table_schema) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -121,7 +122,7 @@ DBImpl::CreateTable(meta::TableSchema& table_schema) {
}
Status
DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -146,7 +147,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
}
Status
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
DBImpl::DescribeTable(meta::TableSchema &table_schema) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -157,7 +158,7 @@ DBImpl::DescribeTable(meta::TableSchema& table_schema) {
}
Status
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
DBImpl::HasTable(const std::string &table_id, bool &has_or_not) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -166,7 +167,7 @@ DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
}
Status
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -175,7 +176,7 @@ DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
}
Status
DBImpl::PreloadTable(const std::string& table_id) {
DBImpl::PreloadTable(const std::string &table_id) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -194,11 +195,11 @@ DBImpl::PreloadTable(const std::string& table_id) {
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t available_size = cache_total - cache_usage;
for (auto& day_files : files) {
for (auto& file : day_files.second) {
for (auto &day_files : files) {
for (auto &file : day_files.second) {
ExecutionEnginePtr engine =
EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
(MetricType)file.metric_type_, file.nlist_);
EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_,
(MetricType) file.metric_type_, file.nlist_);
if (engine == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status(DB_ERROR, "Invalid engine type");
......@@ -211,7 +212,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
try {
// step 1: load index
engine->Load(true);
} catch (std::exception& ex) {
} catch (std::exception &ex) {
std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg);
......@@ -223,7 +224,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
}
Status
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -232,7 +233,7 @@ DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
}
Status
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -260,7 +261,7 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect
}
Status
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
{
std::unique_lock<std::mutex> lock(build_index_mutex_);
......@@ -315,7 +316,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
while (!file_ids.empty()) {
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
if (index.engine_type_ != (int) EngineType::FAISS_IDMAP) {
status = meta_ptr_->UpdateTableFilesToIndex(table_id);
}
......@@ -328,19 +329,19 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
}
Status
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
DBImpl::DescribeIndex(const std::string &table_id, TableIndex &index) {
return meta_ptr_->DescribeTableIndex(table_id, index);
}
Status
DBImpl::DropIndex(const std::string& table_id) {
DBImpl::DropIndex(const std::string &table_id) {
ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
return meta_ptr_->DropTableIndex(table_id);
}
Status
DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
QueryResults& results) {
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
QueryResults &results) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -352,8 +353,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
}
Status
DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
const meta::DatesT &dates, QueryResults &results) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -369,8 +370,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
}
meta::TableFilesSchema file_id_array;
for (auto& day_files : files) {
for (auto& file : day_files.second) {
for (auto &day_files : files) {
for (auto &file : day_files.second) {
file_id_array.push_back(file);
}
}
......@@ -382,8 +383,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
}
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ids, uint64_t k, uint64_t nq,
uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -392,7 +393,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
// get specified files
std::vector<size_t> ids;
for (auto& id : file_ids) {
for (auto &id : file_ids) {
meta::TableFileSchema table_file;
table_file.table_id_ = table_id;
std::string::size_type sz;
......@@ -406,8 +407,8 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
}
meta::TableFilesSchema file_id_array;
for (auto& day_files : files_array) {
for (auto& file : day_files.second) {
for (auto &day_files : files_array) {
for (auto &file : day_files.second) {
file_id_array.push_back(file);
}
}
......@@ -423,7 +424,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
}
Status
DBImpl::Size(uint64_t& result) {
DBImpl::Size(uint64_t &result) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!");
}
......@@ -435,8 +436,8 @@ DBImpl::Size(uint64_t& result) {
// internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files, uint64_t k, uint64_t nq,
uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) {
server::CollectQueryMetrics metrics(nq);
TimeRecorder rc("");
......@@ -445,7 +446,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size()
<< " date range count: " << dates.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
for (auto& file : files) {
for (auto &file : files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddIndexFile(file_ptr);
}
......@@ -513,7 +514,7 @@ DBImpl::BackgroundTimerTask() {
void
DBImpl::WaitMergeFileFinish() {
std::lock_guard<std::mutex> lck(compact_result_mutex_);
for (auto& iter : compact_thread_results_) {
for (auto &iter : compact_thread_results_) {
iter.wait();
}
}
......@@ -521,7 +522,7 @@ DBImpl::WaitMergeFileFinish() {
void
DBImpl::WaitBuildIndexFinish() {
std::lock_guard<std::mutex> lck(index_result_mutex_);
for (auto& iter : index_thread_results_) {
for (auto &iter : index_thread_results_) {
iter.wait();
}
}
......@@ -562,7 +563,7 @@ DBImpl::MemSerialize() {
std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
std::set<std::string> temp_table_ids;
mem_mgr_->Serialize(temp_table_ids);
for (auto& id : temp_table_ids) {
for (auto &id : temp_table_ids) {
compact_table_ids_.insert(id);
}
......@@ -607,7 +608,7 @@ DBImpl::StartCompactionTask() {
}
Status
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const meta::TableFilesSchema &files) {
ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;
// step 1: create table file
......@@ -624,13 +625,13 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
// step 2: merge files
ExecutionEnginePtr index =
EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
(MetricType)table_file.metric_type_, table_file.nlist_);
EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType) table_file.engine_type_,
(MetricType) table_file.metric_type_, table_file.nlist_);
meta::TableFilesSchema updated;
int64_t index_size = 0;
for (auto& file : files) {
for (auto &file : files) {
server::CollectMergeFilesMetrics metrics;
index->Merge(file.location_);
......@@ -648,7 +649,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
// step 3: serialize to disk
try {
index->Serialize();
} catch (std::exception& ex) {
} catch (std::exception &ex) {
// typical error: out of disk space or permition denied
std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
......@@ -666,7 +667,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
// step 4: update table files state
// if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size
// else set file type to RAW, no need to build index
if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
if (table_file.engine_type_ != (int) EngineType::FAISS_IDMAP) {
table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
: meta::TableFileSchema::RAW;
} else {
......@@ -686,7 +687,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
}
Status
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
DBImpl::BackgroundMergeFiles(const std::string &table_id) {
meta::DatePartionedTableFilesSchema raw_files;
auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
if (!status.ok()) {
......@@ -695,7 +696,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
bool has_merge = false;
for (auto& kv : raw_files) {
for (auto &kv : raw_files) {
auto files = kv.second;
if (files.size() < options_.merge_trigger_number_) {
ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
......@@ -718,7 +719,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_TRACE << " Background compaction thread start";
Status status;
for (auto& table_id : table_ids) {
for (auto &table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
......@@ -770,9 +771,9 @@ DBImpl::StartBuildIndexTask(bool force) {
}
Status
DBImpl::BuildIndex(const meta::TableFileSchema& file) {
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
(MetricType)file.metric_type_, file.nlist_);
DBImpl::BuildIndex(const meta::TableFileSchema &file) {
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_,
(MetricType) file.metric_type_, file.nlist_);
if (to_index == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status(DB_ERROR, "Invalid engine type");
......@@ -803,7 +804,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
try {
server::CollectBuildIndexMetrics metrics;
index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
index = to_index->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_);
if (index == nullptr) {
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file);
......@@ -812,7 +813,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
return status;
}
} catch (std::exception& ex) {
} catch (std::exception &ex) {
// typical error: out of gpu memory
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
......@@ -838,7 +839,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
// step 5: save index file
try {
index->Serialize();
} catch (std::exception& ex) {
} catch (std::exception &ex) {
// typical error: out of disk space or permition denied
std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
......@@ -881,7 +882,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
}
} catch (std::exception& ex) {
} catch (std::exception &ex) {
std::string msg = "Build index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg);
......@@ -898,18 +899,34 @@ DBImpl::BackgroundBuildIndex() {
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
status = BuildIndex(file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
}
if (shutting_down_.load(std::memory_order_acquire)) {
ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
break;
}
scheduler::BuildIndexJobPtr
job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
// step 2: put build index task to scheduler
for (auto &file : to_index_files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddToIndexFiles(file_ptr);
}
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitBuildIndexFinish();
if (!job->GetStatus().ok()) {
Status status = job->GetStatus();
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
}
// for (auto &file : to_index_files) {
// status = BuildIndex(file);
// if (!status.ok()) {
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
// }
//
// if (shutting_down_.load(std::memory_order_acquire)) {
// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
// break;
// }
// }
ENGINE_LOG_TRACE << "Background build index thread exit";
}
......
......@@ -66,6 +66,9 @@ class ExecutionEngine {
virtual Status
CopyToGpu(uint64_t device_id) = 0;
virtual Status
CopyToIndexFileToGpu(uint64_t device_id) = 0;
virtual Status
CopyToCpu() = 0;
......
......@@ -187,6 +187,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
return Status::OK();
}
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (!already_in_cache) {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(nullptr, PhysicalSize());
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj);
}
return Status::OK();
}
Status
ExecutionEngineImpl::CopyToCpu() {
auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
......
......@@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine {
Status
CopyToGpu(uint64_t device_id) override;
Status
CopyToIndexFileToGpu(uint64_t device_id) override;
Status
CopyToCpu() override;
......
......@@ -15,15 +15,18 @@
// specific language governing permissions and limitations
// under the License.
#include <src/scheduler/tasklabel/SpecResLabel.h>
#include "scheduler/TaskCreator.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
#include "SchedInst.h"
namespace milvus {
namespace scheduler {
std::vector<TaskPtr>
TaskCreator::Create(const JobPtr& job) {
TaskCreator::Create(const JobPtr &job) {
switch (job->type()) {
case JobType::SEARCH: {
return Create(std::static_pointer_cast<SearchJob>(job));
......@@ -31,6 +34,9 @@ TaskCreator::Create(const JobPtr& job) {
case JobType::DELETE: {
return Create(std::static_pointer_cast<DeleteJob>(job));
}
case JobType::BUILD: {
return Create(std::static_pointer_cast<BuildIndexJob>(job));
}
default: {
// TODO(wxyu): error
return std::vector<TaskPtr>();
......@@ -39,7 +45,7 @@ TaskCreator::Create(const JobPtr& job) {
}
std::vector<TaskPtr>
TaskCreator::Create(const SearchJobPtr& job) {
TaskCreator::Create(const SearchJobPtr &job) {
std::vector<TaskPtr> tasks;
for (auto& index_file : job->index_files()) {
auto label = std::make_shared<DefaultLabel>();
......@@ -52,7 +58,7 @@ TaskCreator::Create(const SearchJobPtr& job) {
}
std::vector<TaskPtr>
TaskCreator::Create(const DeleteJobPtr& job) {
TaskCreator::Create(const DeleteJobPtr &job) {
std::vector<TaskPtr> tasks;
auto label = std::make_shared<BroadcastLabel>();
auto task = std::make_shared<XDeleteTask>(job, label);
......@@ -62,5 +68,20 @@ TaskCreator::Create(const DeleteJobPtr& job) {
return tasks;
}
std::vector<TaskPtr>
TaskCreator::Create(const BuildIndexJobPtr &job) {
std::vector<TaskPtr> tasks;
//TODO(yukun): remove "disk" hardcode here
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
for (auto &to_index_file : job->to_index_files()) {
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
task->job_ = job;
tasks.emplace_back(task);
}
return tasks;
}
} // namespace scheduler
} // namespace milvus
......@@ -32,6 +32,7 @@
#include "job/SearchJob.h"
#include "task/DeleteTask.h"
#include "task/SearchTask.h"
#include "task/BuildIndexTask.h"
#include "task/Task.h"
namespace milvus {
......@@ -48,6 +49,9 @@ class TaskCreator {
static std::vector<TaskPtr>
Create(const DeleteJobPtr& job);
static std::vector<TaskPtr>
Create(const BuildIndexJobPtr& job);
};
} // namespace scheduler
......
......@@ -20,14 +20,16 @@
#include "../Algorithm.h"
#include "Action.h"
#include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h"
namespace milvus {
namespace scheduler {
std::vector<ResourcePtr>
get_neighbours(const ResourcePtr& self) {
get_neighbours(const ResourcePtr &self) {
std::vector<ResourcePtr> neighbours;
for (auto& neighbour_node : self->GetNeighbours()) {
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node)
continue;
......@@ -41,9 +43,9 @@ get_neighbours(const ResourcePtr& self) {
}
std::vector<std::pair<ResourcePtr, Connection>>
get_neighbours_with_connetion(const ResourcePtr& self) {
get_neighbours_with_connetion(const ResourcePtr &self) {
std::vector<std::pair<ResourcePtr, Connection>> neighbours;
for (auto& neighbour_node : self->GetNeighbours()) {
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node)
continue;
......@@ -57,12 +59,12 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
}
void
Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) {
auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) {
std::vector<uint64_t> speeds;
uint64_t total_speed = 0;
for (auto& neighbour : neighbours) {
for (auto &neighbour : neighbours) {
uint64_t speed = neighbour.second.speed();
speeds.emplace_back(speed);
total_speed += speed;
......@@ -87,15 +89,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
}
void
Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) {
auto neighbours = get_neighbours(self);
for (auto& neighbour : neighbours) {
for (auto &neighbour : neighbours) {
neighbour->task_table().Put(task);
}
}
void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
Action::PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest) {
dest->task_table().Put(task);
}
......@@ -137,32 +139,55 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
auto compute_resources = res_mgr.lock()->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto& res : compute_resources) {
for (auto &res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
if (task->job_.lock()->type() == JobType::SEARCH) {
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
}
uint64_t cost =
compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
}
}
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
} else if (task->job_.lock()->type() == JobType::BUILD) {
//step2: Read device id in config
//get build index gpu resource
server::Config &config = server::Config::GetInstance();
int32_t build_index_gpu;
Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);
bool find_gpu_res = false;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
if (compute_resources[i]->name()
== res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
}
}
}
uint64_t cost =
compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
if (not find_gpu_res) {
task->path() = Path(paths[0], paths[0].size() - 1);
}
}
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
}
if (resource->name() == task->path().Last()) {
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "BuildIndexJob.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
}
bool
BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file) {
std::unique_lock<std::mutex> lock(mutex_);
if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) {
return false;
}
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_;
to_index_files_[to_index_file->id_] = to_index_file;
}
Status&
BuildIndexJob::WaitBuildIndexFinish() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return to_index_files_.empty(); });
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " all done";
}
void
BuildIndexJob::BuildIndexDone(size_t to_index_id) {
std::unique_lock<std::mutex> lock(mutex_);
to_index_files_.erase(to_index_id);
cv_.notify_all();
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id;
}
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <string>
#include <vector>
#include <list>
#include <queue>
#include <deque>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include "Job.h"
#include "db/meta/Meta.h"
#include "scheduler/Definition.h"
namespace milvus {
namespace scheduler {
using engine::meta::TableFileSchemaPtr;
using Id2ToIndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
class BuildIndexJob : public Job {
public:
explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
public:
bool
AddToIndexFiles(const TableFileSchemaPtr &to_index_file);
Status &
WaitBuildIndexFinish();
void
BuildIndexDone(size_t to_index_id);
public:
Status &
GetStatus() {
return status_;
}
Id2ToIndexMap &
to_index_files() {
return to_index_files_;
}
engine::meta::MetaPtr
meta() const {
return meta_ptr_;
}
engine::DBOptions
options() const {
return options_;
}
private:
Id2ToIndexMap to_index_files_;
engine::meta::MetaPtr meta_ptr_;
engine::DBOptions options_;
Status status_;
std::mutex mutex_;
std::condition_variable cv_;
};
using BuildIndexJobPtr = std::shared_ptr<BuildIndexJob>;
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "BuildIndexTask.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <string>
#include <thread>
#include <utility>
namespace milvus {
namespace scheduler {
XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label)
: Task(TaskType::BuildIndexTask, std::move(label)), file_(file) {
if (file_) {
to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType) file_->engine_type_,
(MetricType) file_->metric_type_, file_->nlist_);
}
}
void
XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
TimeRecorder rc("");
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
auto options = build_index_job->options();
try {
if (type == LoadType::DISK2CPU) {
stat = to_index_engine_->Load(options.insert_cache_immediately_);
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
stat = to_index_engine_->CopyToIndexFileToGpu(device_id);
type_str = "CPU2GPU";
} else if (type == LoadType::GPU2CPU) {
stat = to_index_engine_->CopyToCpu();
type_str = "GPU2CPU";
} else {
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
} catch (std::exception &ex) {
// typical error: out of disk space or permition denied
error_msg = "Failed to load to_index file: " + std::string(ex.what());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (!stat.ok()) {
Status s;
if (stat.ToString().find("out of memory") != std::string::npos) {
error_msg = "out of memory: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
} else {
error_msg = "Failed to load to_index file: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
build_index_job->BuildIndexDone(file_->id_);
}
return;
}
size_t file_size = to_index_engine_->PhysicalSize();
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" +
std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
" bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
to_index_id_ = file_->id_;
to_index_type_ = file_->file_type_;
}
}
void
XBuildIndexTask::Execute() {
if (to_index_engine_ == nullptr) {
return;
}
TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_));
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
std::string location = file_->location_;
EngineType engine_type = (EngineType) file_->engine_type_;
std::shared_ptr<engine::ExecutionEngine> index;
// step 2: create table file
engine::meta::TableFileSchema table_file;
table_file.table_id_ = file_->table_id_;
table_file.date_ = file_->date_;
table_file.file_type_ =
engine::meta::TableFileSchema::NEW_INDEX;
engine::meta::MetaPtr meta_ptr = build_index_job->meta();
Status status = build_index_job->meta()->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
build_index_job->BuildIndexDone(to_index_id_);
build_index_job->GetStatus() = status;
return;
}
// step 3: build index
try {
index = to_index_engine_->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_);
if (index == nullptr) {
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
<< " to to_delete";
return;
}
} catch (std::exception &ex) {
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
<< std::endl;
build_index_job->GetStatus() = Status(DB_ERROR, msg);
return;
}
// step 4: if table has been deleted, dont save index file
bool has_table = false;
meta_ptr->HasTable(file_->table_id_, has_table);
if (!has_table) {
meta_ptr->DeleteTableFiles(file_->table_id_);
return;
}
// step 5: save index file
try {
index->Serialize();
} catch (std::exception &ex) {
// typical error: out of disk space or permition denied
std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to persist index file: " << table_file.location_
<< ", possible out of disk space" << std::endl;
build_index_job->GetStatus() = Status(DB_ERROR, msg);
return;
}
// step 6: update meta
table_file.file_type_ = engine::meta::TableFileSchema::INDEX;
table_file.file_size_ = index->PhysicalSize();
table_file.row_count_ = index->Count();
auto origin_file = *file_;
origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP;
engine::meta::TableFilesSchema update_files = {table_file, origin_file};
status = meta_ptr->UpdateTableFiles(update_files);
if (status.ok()) {
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
<< " bytes"
<< " from file " << origin_file.file_id_;
// index->Cache();
} else {
// failed to update meta, mark the new file as to_delete, don't delete old file
origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX;
status = meta_ptr->UpdateTableFile(origin_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_
<< " to to_delete";
}
build_index_job->BuildIndexDone(to_index_id_);
}
rc.ElapseFromBegin("totally cost");
to_index_engine_ = nullptr;
}
} // namespace scheduler
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "Task.h"
#include "scheduler/Definition.h"
#include "scheduler/job/BuildIndexJob.h"
namespace milvus {
namespace scheduler {
class XBuildIndexTask : public Task {
public:
explicit XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
public:
TableFileSchemaPtr file_;
TableFileSchema table_file_;
size_t to_index_id_ = 0;
int to_index_type_ = 0;
ExecutionEnginePtr to_index_engine_ = nullptr;
};
} // namespace scheduler
} // namespace milvus
......@@ -39,6 +39,7 @@ enum class LoadType {
enum class TaskType {
SearchTask,
DeleteTask,
BuildIndexTask,
TestTask,
};
......
......@@ -18,13 +18,14 @@
#pragma once
#include "TaskLabel.h"
#include "scheduler/ResourceMgr.h"
#include <memory>
#include <string>
class Resource;
using ResourceWPtr = std::weak_ptr<Resource>;
//class Resource;
//
//using ResourceWPtr = std::weak_ptr<Resource>;
namespace milvus {
namespace scheduler {
......
......@@ -66,6 +66,7 @@ ms::engine::DBOptions MetricTest::GetOptions() {
}
void MetricTest::SetUp() {
boost::filesystem::remove_all("/tmp/milvus_test");
InitLog();
auto options = GetOptions();
db_ = ms::engine::DBFactory::Build(options);
......
......@@ -351,11 +351,11 @@ TEST_F(RpcHandlerTest, TABLES_TEST) {
handler->Insert(&context, &request, &vector_ids);
//Show table
// ::milvus::grpc::Command cmd;
// ::grpc::ServerWriter<::milvus::grpc::TableName> *writer;
// status = handler->ShowTables(&context, &cmd, writer);
// ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
//show tables
::milvus::grpc::Command cmd;
::milvus::grpc::TableNameList table_name_list;
status = handler->ShowTables(&context, &cmd, &table_name_list);
ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
//Count Table
::milvus::grpc::TableRowCount count;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册