提交 59f45763 编写于 作者: Y Yu Kun

MS-636 Add optimizer in scheduler for FAISS_IVFSQ8H


Former-commit-id: 4dff6236c98a86965e686b4a94e5492e0c1d5722
上级 c884b9fd
......@@ -37,6 +37,7 @@ Please mark all changes in the change log and use the ticket from JIRA.
## New Feature
- MS-627 - Integrate new index: IVFSQHybrid
- MS-631 - IVFSQ8H Index support
- MS-636 - Add optimizer in scheduler for FAISS_IVFSQ8H
## Task
- MS-554 - Change license to Apache 2.0
......
......@@ -900,20 +900,21 @@ DBImpl::BackgroundBuildIndex() {
meta_ptr_->FilesToIndex(to_index_files);
Status status;
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
if (!to_index_files.empty()) {
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
// step 2: put build index task to scheduler
for (auto& file : to_index_files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddToIndexFiles(file_ptr);
}
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitBuildIndexFinish();
if (!job->GetStatus().ok()) {
Status status = job->GetStatus();
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
// step 2: put build index task to scheduler
for (auto& file : to_index_files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddToIndexFiles(file_ptr);
}
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitBuildIndexFinish();
if (!job->GetStatus().ok()) {
Status status = job->GetStatus();
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
}
}
// for (auto &file : to_index_files) {
// status = BuildIndex(file);
// if (!status.ok()) {
......
......@@ -29,8 +29,6 @@ constexpr uint64_t MAXINT = std::numeric_limits<uint32_t>::max();
uint64_t
ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrPtr& res_mgr,
std::vector<std::string>& path) {
std::vector<std::vector<std::string>> paths;
uint64_t num_of_resources = res_mgr->GetAllResources().size();
std::unordered_map<uint64_t, std::string> id_name_map;
std::unordered_map<std::string, uint64_t> name_id_map;
......
......@@ -16,7 +16,9 @@
// under the License.
#include "scheduler/JobMgr.h"
#include "SchedInst.h"
#include "TaskCreator.h"
#include "optimizer/Optimizer.h"
#include "task/Task.h"
#include <src/scheduler/optimizer/Optimizer.h>
......@@ -67,8 +69,9 @@ JobMgr::worker_function() {
}
auto tasks = build_task(job);
// TODO: optimize all tasks
for (auto& task : tasks) {
OptimizerInst::GetInstance()->Run(task);
}
// disk resources are NEVER empty.
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
......
......@@ -38,6 +38,9 @@ std::mutex SchedInst::mutex_;
scheduler::JobMgrPtr JobMgrInst::instance = nullptr;
std::mutex JobMgrInst::mutex_;
OptimizerPtr OptimizerInst::instance = nullptr;
std::mutex OptimizerInst::mutex_;
void
load_simple_config() {
server::Config& config = server::Config::GetInstance();
......
......@@ -20,9 +20,12 @@
#include "JobMgr.h"
#include "ResourceMgr.h"
#include "Scheduler.h"
#include "optimizer/HybridPass.h"
#include "optimizer/Optimizer.h"
#include <memory>
#include <mutex>
#include <vector>
namespace milvus {
namespace scheduler {
......@@ -81,6 +84,27 @@ class JobMgrInst {
static std::mutex mutex_;
};
// Accessor for the single Optimizer object shared through the static
// `instance` member.
class OptimizerInst {
 public:
    // Returns the shared Optimizer, creating it on the first call with a pass
    // list that holds exactly one HybridPass.
    //
    // The mutex is acquired before `instance` is touched at all. Testing
    // `instance` ahead of the lock would be an unsynchronized read of a
    // std::shared_ptr that another thread may be assigning while holding the
    // lock, which is a data race (undefined behaviour).
    static OptimizerPtr
    GetInstance() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (instance == nullptr) {
            std::vector<PassPtr> pass_list;
            pass_list.push_back(std::make_shared<HybridPass>());
            instance = std::make_shared<Optimizer>(pass_list);
        }
        // Copied while the lock is still held, so the read is ordered after
        // any initialization done by another thread.
        return instance;
    }

 private:
    // Both statics are defined out of class in the scheduler instance source file.
    static scheduler::OptimizerPtr instance;
    static std::mutex mutex_;
};
void
StartSchedulerService();
......
......@@ -16,10 +16,10 @@
// under the License.
#include "scheduler/TaskCreator.h"
#include <src/scheduler/tasklabel/SpecResLabel.h>
#include "SchedInst.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
#include "tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
......
......@@ -19,6 +19,7 @@
#include <random>
#include "../Algorithm.h"
#include "Action.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h"
......@@ -145,25 +146,35 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
paths.emplace_back(path);
}
if (task->job_.lock()->type() == JobType::SEARCH) {
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
}
uint64_t cost =
compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
auto label = task->label();
auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
if (spec_label->resource().lock()->type() == ResourceType::CPU) {
std::vector<std::string> spec_path;
spec_path.push_back(spec_label->resource().lock()->name());
spec_path.push_back(resource->name());
task->path() = Path(spec_path, spec_path.size() - 1);
} else {
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
}
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() +
transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
}
}
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
}
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
} else if (task->job_.lock()->type() == JobType::BUILD) {
// step2: Read device id in config
// get build index gpu resource
......
......@@ -16,7 +16,9 @@
// under the License.
#include "scheduler/optimizer/HybridPass.h"
#include "scheduler/SchedInst.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
......@@ -28,7 +30,10 @@ HybridPass::Run(const TaskPtr& task) {
return false;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
if (search_task->file_->engine_type_ == (int)engine::EngineType::FAISS_IVFSQ8H) {
// TODO: make specified label
// TODO: remove "cpu" hardcode
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
task->label() = label;
return true;
}
return false;
......
......@@ -25,6 +25,7 @@
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "Pass.h"
......@@ -34,7 +35,8 @@ namespace scheduler {
class Optimizer {
public:
Optimizer() = default;
explicit Optimizer(std::vector<PassPtr> pass_list) : pass_list_(std::move(pass_list)) {
}
void
Init();
......@@ -46,5 +48,7 @@ class Optimizer {
std::vector<PassPtr> pass_list_;
};
using OptimizerPtr = std::shared_ptr<Optimizer>;
} // namespace scheduler
} // namespace milvus
......@@ -124,6 +124,7 @@ XBuildIndexTask::Execute() {
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
build_index_job->BuildIndexDone(to_index_id_);
build_index_job->GetStatus() = status;
to_index_engine_ = nullptr;
return;
}
......@@ -136,6 +137,7 @@ XBuildIndexTask::Execute() {
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
<< " to to_delete";
to_index_engine_ = nullptr;
return;
}
} catch (std::exception& ex) {
......@@ -150,6 +152,7 @@ XBuildIndexTask::Execute() {
<< std::endl;
build_index_job->GetStatus() = Status(DB_ERROR, msg);
to_index_engine_ = nullptr;
return;
}
......@@ -158,6 +161,7 @@ XBuildIndexTask::Execute() {
meta_ptr->HasTable(file_->table_id_, has_table);
if (!has_table) {
meta_ptr->DeleteTableFiles(file_->table_id_);
to_index_engine_ = nullptr;
return;
}
......@@ -177,6 +181,7 @@ XBuildIndexTask::Execute() {
<< ", possible out of disk space" << std::endl;
build_index_job->GetStatus() = Status(DB_ERROR, msg);
to_index_engine_ = nullptr;
return;
}
......
......@@ -50,6 +50,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/job scheduler_job_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/optimizer scheduler_optimizer_files)
set(scheduler_files
${scheduler_main_files}
${scheduler_action_files}
......@@ -57,6 +58,7 @@ set(scheduler_files
${scheduler_job_files}
${scheduler_resource_files}
${scheduler_task_files}
${scheduler_optimizer_files}
)
aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files)
......
......@@ -31,3 +31,12 @@ target_link_libraries(test_db
install(TARGETS test_db DESTINATION unittest)
configure_file(appendix/server_config.yaml
"${CMAKE_CURRENT_BINARY_DIR}/milvus/conf/server_config.yaml"
COPYONLY)
configure_file(appendix/log_config.conf
"${CMAKE_CURRENT_BINARY_DIR}/milvus/conf/log_config.conf"
COPYONLY)
* GLOBAL:
FORMAT = "%datetime | %level | %logger | %msg"
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-global.log"
ENABLED = true
TO_FILE = true
TO_STANDARD_OUTPUT = false
SUBSECOND_PRECISION = 3
PERFORMANCE_TRACKING = false
MAX_LOG_FILE_SIZE = 209715200 ## Throw log files away after 200MB
* DEBUG:
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-debug.log"
ENABLED = true
* WARNING:
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-warning.log"
* TRACE:
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-trace.log"
* VERBOSE:
FORMAT = "%datetime{%d/%M/%y} | %level-%vlevel | %msg"
TO_FILE = false
TO_STANDARD_OUTPUT = false
## Error logs
* ERROR:
ENABLED = true
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-error.log"
* FATAL:
ENABLED = true
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-fatal.log"
# All the following configurations are default values.
server_config:
address: 0.0.0.0 # milvus server ip address (IPv4)
port: 19530 # port range: 1025 ~ 65534
deploy_mode: single # deployment type: single, cluster_readonly, cluster_writable
time_zone: UTC+8
db_config:
primary_path: /tmp/milvus # path used to store data and meta
secondary_path: # path used to store data only, split by semicolon
backend_url: sqlite://:@:/ # URI format: dialect://username:password@host:port/database
# Keep 'dialect://:@:/', and replace other texts with real values.
# Replace 'dialect' with 'mysql' or 'sqlite'
insert_buffer_size: 4 # GB, maximum insert buffer size allowed
build_index_gpu: 0 # gpu id used for building index
metric_config:
enable_monitor: false # enable monitoring or not
collector: prometheus # prometheus
prometheus_config:
port: 8080 # port prometheus used to fetch metrics
cache_config:
cpu_mem_capacity: 16 # GB, CPU memory used for cache
cpu_mem_threshold: 0.85 # percentage of data kept when cache cleanup triggered
cache_insert_data: false # whether load inserted data into cache
engine_config:
blas_threshold: 20
resource_config:
resource_pool:
- cpu
- gpu0
......@@ -23,14 +23,18 @@
#include "db/DBFactory.h"
#include "cache/CpuCacheMgr.h"
#include "utils/CommonUtil.h"
#include "server/Config.h"
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>
#include <thread>
#include <random>
namespace {
static const char *CONFIG_FILE_PATH = "./milvus/conf/server_config.yaml";
static const char *TABLE_NAME = "test_group";
static constexpr int64_t TABLE_DIM = 256;
static constexpr int64_t VECTOR_COUNT = 25000;
......@@ -228,6 +232,9 @@ TEST_F(DBTest, DB_TEST) {
}
TEST_F(DBTest, SEARCH_TEST) {
milvus::server::Config &config = milvus::server::Config::GetInstance();
milvus::Status s = config.LoadConfigFile(CONFIG_FILE_PATH);
milvus::engine::meta::TableSchema table_info = BuildTableSchema();
auto stat = db_->CreateTable(table_info);
......@@ -290,6 +297,25 @@ TEST_F(DBTest, SEARCH_TEST) {
ASSERT_TRUE(stat.ok());
}
//test FAISS_IVFSQ8H optimizer
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H;
db_->CreateIndex(TABLE_NAME, index); // wait until build index finish
{
milvus::engine::QueryResults results;
stat = db_->Query(TABLE_NAME, k, nq, 10, xq.data(), results);
ASSERT_TRUE(stat.ok());
}
{//search by specify index file
milvus::engine::meta::DatesT dates;
std::vector<std::string> file_ids = {"1", "2", "3", "4", "5", "6"};
milvus::engine::QueryResults results;
stat = db_->Query(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, results);
ASSERT_TRUE(stat.ok());
}
// TODO(lxj): add groundTruth assert
}
......
......@@ -97,7 +97,7 @@ DBTest::SetUp() {
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
res_mgr->Clear();
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, false));
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, true));
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("gtx1660", "GPU", 0, true, true));
auto default_conn = milvus::scheduler::Connection("IO", 500.0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册