diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 1a62a791b0bb4db048db137a7e28e5f815b29bb3..d0763d5c069072be4bc46d4df3696c5b5790cbbe 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -37,6 +37,7 @@ Please mark all change in change log and use the ticket from JIRA. ## New Feature - MS-627 - Integrate new index: IVFSQHybrid - MS-631 - IVFSQ8H Index support +- MS-636 - Add optimizer in scheduler for FAISS_IVFSQ8H ## Task - MS-554 - Change license to Apache 2.0 diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index f81fb32e7fde14f7614dc7f263f721ba6d8f21b0..cf8f1824d2ce3dcd720bb87281d2160e69c292b9 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -900,20 +900,21 @@ DBImpl::BackgroundBuildIndex() { meta_ptr_->FilesToIndex(to_index_files); Status status; - scheduler::BuildIndexJobPtr job = std::make_shared(0, meta_ptr_, options_); + if (!to_index_files.empty()) { + scheduler::BuildIndexJobPtr job = std::make_shared(0, meta_ptr_, options_); - // step 2: put build index task to scheduler - for (auto& file : to_index_files) { - scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); - job->AddToIndexFiles(file_ptr); - } - scheduler::JobMgrInst::GetInstance()->Put(job); - job->WaitBuildIndexFinish(); - if (!job->GetStatus().ok()) { - Status status = job->GetStatus(); - ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); + // step 2: put build index task to scheduler + for (auto& file : to_index_files) { + scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); + job->AddToIndexFiles(file_ptr); + } + scheduler::JobMgrInst::GetInstance()->Put(job); + job->WaitBuildIndexFinish(); + if (!job->GetStatus().ok()) { + Status status = job->GetStatus(); + ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); + } } - // for (auto &file : to_index_files) { // status = BuildIndex(file); // if (!status.ok()) { diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp index 44f83742c25633902c5f00a249a4ec8c359d7568..b2156b3f97fcace46b09e0ca9d46523853cecf6b 100644 --- a/cpp/src/scheduler/Algorithm.cpp +++ b/cpp/src/scheduler/Algorithm.cpp @@ -29,8 +29,6 @@ constexpr uint64_t MAXINT = std::numeric_limits::max(); uint64_t ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrPtr& res_mgr, std::vector& path) { - std::vector> paths; - uint64_t num_of_resources = res_mgr->GetAllResources().size(); std::unordered_map id_name_map; std::unordered_map name_id_map; diff --git a/cpp/src/scheduler/JobMgr.cpp b/cpp/src/scheduler/JobMgr.cpp index a4ef83ad75dd7922a65fb9eae95bf732c3f79b90..170dee4b80a02f8524d3fb8bfb0e4a46ad7dd1c8 100644 --- a/cpp/src/scheduler/JobMgr.cpp +++ b/cpp/src/scheduler/JobMgr.cpp @@ -16,7 +16,9 @@ // under the License. #include "scheduler/JobMgr.h" +#include "SchedInst.h" #include "TaskCreator.h" +#include "optimizer/Optimizer.h" #include "task/Task.h" #include @@ -67,8 +69,9 @@ JobMgr::worker_function() { } auto tasks = build_task(job); - - // TODO: optimizer all task + for (auto& task : tasks) { + OptimizerInst::GetInstance()->Run(task); + } // disk resources NEVER be empty. if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index b9edbca001d6fdebae98f4ac4280bf2842e67c72..cc2b4e280ad00f70bb61774a3c6ca8a28bae0bce 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -38,6 +38,9 @@ std::mutex SchedInst::mutex_; scheduler::JobMgrPtr JobMgrInst::instance = nullptr; std::mutex JobMgrInst::mutex_; +OptimizerPtr OptimizerInst::instance = nullptr; +std::mutex OptimizerInst::mutex_; + void load_simple_config() { server::Config& config = server::Config::GetInstance(); diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index dc8e4ed478fb84ac964c5e4b7315c2df1d01a6da..0d2a04b02c09960a8a13b5a478f73508ec9acd24 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -20,9 +20,12 @@ #include "JobMgr.h" #include "ResourceMgr.h" #include "Scheduler.h" +#include "optimizer/HybridPass.h" +#include "optimizer/Optimizer.h" #include #include +#include namespace milvus { namespace scheduler { @@ -81,6 +84,27 @@ class JobMgrInst { static std::mutex mutex_; }; +class OptimizerInst { + public: + static OptimizerPtr + GetInstance() { + if (instance == nullptr) { + std::lock_guard lock(mutex_); + if (instance == nullptr) { + HybridPassPtr pass_ptr = std::make_shared(); + std::vector pass_list; + pass_list.push_back(pass_ptr); + instance = std::make_shared(pass_list); + } + } + return instance; + } + + private: + static scheduler::OptimizerPtr instance; + static std::mutex mutex_; +}; + void StartSchedulerService(); diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index ee63c2c6b7763ea102ec4650aa5590965ca0ed74..40cfa9aac694649d07a25ab59d134d2981fd1ea8 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -16,10 +16,10 @@ // under the License. #include "scheduler/TaskCreator.h" -#include #include "SchedInst.h" -#include "scheduler/tasklabel/BroadcastLabel.h" +#include "tasklabel/BroadcastLabel.h" #include "tasklabel/DefaultLabel.h" +#include "tasklabel/SpecResLabel.h" namespace milvus { namespace scheduler { diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 53dd45faca0f5c57fc6f1973128b4b1b0a978c1f..95f8212297dd56e042a5fae4631a0177912e3ae6 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -19,6 +19,7 @@ #include #include "../Algorithm.h" #include "Action.h" +#include "scheduler/tasklabel/SpecResLabel.h" #include "src/cache/GpuCacheMgr.h" #include "src/server/Config.h" @@ -145,25 +146,35 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr paths.emplace_back(path); } if (task->job_.lock()->type() == JobType::SEARCH) { - // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost - uint64_t min_cost = std::numeric_limits::max(); - uint64_t min_cost_idx = 0; - for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->TotalTasks() == 0) { - min_cost_idx = i; - break; - } - uint64_t cost = - compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i]; - if (min_cost > cost) { - min_cost = cost; - min_cost_idx = i; + auto label = task->label(); + auto spec_label = std::static_pointer_cast(label); + if (spec_label->resource().lock()->type() == ResourceType::CPU) { + std::vector spec_path; + spec_path.push_back(spec_label->resource().lock()->name()); + spec_path.push_back(resource->name()); + task->path() = Path(spec_path, spec_path.size() - 1); + } else { + // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + uint64_t min_cost = std::numeric_limits::max(); + uint64_t min_cost_idx = 0; + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->TotalTasks() == 0) { + min_cost_idx = i; + break; + } + uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + + transport_costs[i]; + if (min_cost > cost) { + min_cost = cost; + min_cost_idx = i; + } } + + // step 3: set path in task + Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); + task->path() = task_path; } - // step 3: set path in task - Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); - task->path() = task_path; } else if (task->job_.lock()->type() == JobType::BUILD) { // step2: Read device id in config // get build index gpu resource diff --git a/cpp/src/scheduler/optimizer/HybridPass.cpp b/cpp/src/scheduler/optimizer/HybridPass.cpp index 343d6ec81e80cac95a7edcd19bca84d7891d7a66..d63fc2e819de149b5d41def74a6df22e606de318 100644 --- a/cpp/src/scheduler/optimizer/HybridPass.cpp +++ b/cpp/src/scheduler/optimizer/HybridPass.cpp @@ -16,7 +16,9 @@ // under the License. #include "scheduler/optimizer/HybridPass.h" +#include "scheduler/SchedInst.h" #include "scheduler/task/SearchTask.h" +#include "scheduler/tasklabel/SpecResLabel.h" namespace milvus { namespace scheduler { @@ -28,7 +30,10 @@ HybridPass::Run(const TaskPtr& task) { return false; auto search_task = std::static_pointer_cast(task); if (search_task->file_->engine_type_ == (int)engine::EngineType::FAISS_IVFSQ8H) { - // TODO: make specified label + // TODO: remove "cpu" hardcode + ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu"); + auto label = std::make_shared(std::weak_ptr(res_ptr)); + task->label() = label; return true; } return false; diff --git a/cpp/src/scheduler/optimizer/Optimizer.h b/cpp/src/scheduler/optimizer/Optimizer.h index 99282e66a679d5480661c1652cf85eaee1151870..68b519e115cbf8769ff1339f5e4b84030c4a807c 100644 --- a/cpp/src/scheduler/optimizer/Optimizer.h +++ b/cpp/src/scheduler/optimizer/Optimizer.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "Pass.h" @@ -34,7 +35,8 @@ namespace scheduler { class Optimizer { public: - Optimizer() = default; + explicit Optimizer(std::vector pass_list) : pass_list_(std::move(pass_list)) { + } void Init(); @@ -46,5 +48,7 @@ class Optimizer { std::vector pass_list_; }; +using OptimizerPtr = std::shared_ptr; + } // namespace scheduler } // namespace milvus diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index f2cebcac9e24b9b36d6003ad19ca20242cc0715c..25d3d73a7bb5554b7dd613703d406c0d88d04908 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -124,6 +124,7 @@ XBuildIndexTask::Execute() { ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); build_index_job->BuildIndexDone(to_index_id_); build_index_job->GetStatus() = status; + to_index_engine_ = nullptr; return; } @@ -136,6 +137,7 @@ XBuildIndexTask::Execute() { ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; + to_index_engine_ = nullptr; return; } } catch (std::exception& ex) { @@ -150,6 +152,7 @@ XBuildIndexTask::Execute() { << std::endl; build_index_job->GetStatus() = Status(DB_ERROR, msg); + to_index_engine_ = nullptr; return; } @@ -158,6 +161,7 @@ XBuildIndexTask::Execute() { meta_ptr->HasTable(file_->table_id_, has_table); if (!has_table) { meta_ptr->DeleteTableFiles(file_->table_id_); + to_index_engine_ = nullptr; return; } @@ -177,6 +181,7 @@ XBuildIndexTask::Execute() { << ", possible out of disk space" << std::endl; build_index_job->GetStatus() = Status(DB_ERROR, msg); + to_index_engine_ = nullptr; return; } diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index 6c9aeadcd123683abb6c846c6d815e6a4b1e647a..ac4fae85bb8af8a037fe24a8b2db9c208527367a 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -50,6 +50,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_files) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/job scheduler_job_files) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_files) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/optimizer scheduler_optimizer_files) set(scheduler_files ${scheduler_main_files} ${scheduler_action_files} @@ -57,6 +58,7 @@ set(scheduler_files ${scheduler_job_files} ${scheduler_resource_files} ${scheduler_task_files} + ${scheduler_optimizer_files} ) aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files) diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 2cbf55a208d79b8a59be9c48a34f5908b5bd35b4..4bce9f35b3ecf4b649dd7672bb26267ab605ed07 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -31,3 +31,12 @@ target_link_libraries(test_db install(TARGETS test_db DESTINATION unittest) +configure_file(appendix/server_config.yaml + "${CMAKE_CURRENT_BINARY_DIR}/milvus/conf/server_config.yaml" + COPYONLY) + +configure_file(appendix/log_config.conf + "${CMAKE_CURRENT_BINARY_DIR}/milvus/conf/log_config.conf" + COPYONLY) + + diff --git a/cpp/unittest/db/appendix/log_config.conf b/cpp/unittest/db/appendix/log_config.conf new file mode 100644 index 0000000000000000000000000000000000000000..0a3e0d21afbd87070cb4d5f4738bf328ebb44273 --- /dev/null +++ b/cpp/unittest/db/appendix/log_config.conf @@ -0,0 +1,27 @@ +* GLOBAL: + FORMAT = "%datetime | %level | %logger | %msg" + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-global.log" + ENABLED = true + TO_FILE = true + TO_STANDARD_OUTPUT = false + SUBSECOND_PRECISION = 3 + PERFORMANCE_TRACKING = false + MAX_LOG_FILE_SIZE = 209715200 ## Throw log files away after 200MB +* DEBUG: + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-debug.log" + ENABLED = true +* WARNING: + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-warning.log" +* TRACE: + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-trace.log" +* VERBOSE: + FORMAT = "%datetime{%d/%M/%y} | %level-%vlevel | %msg" + TO_FILE = false + TO_STANDARD_OUTPUT = false +## Error logs +* ERROR: + ENABLED = true + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-error.log" +* FATAL: + ENABLED = true + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-fatal.log" diff --git a/cpp/unittest/db/appendix/server_config.yaml b/cpp/unittest/db/appendix/server_config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..f92b2f1a1891d4f3959fecd7ca811935776b08fb --- /dev/null +++ b/cpp/unittest/db/appendix/server_config.yaml @@ -0,0 +1,37 @@ +# All the following configurations are default values. + +server_config: + address: 0.0.0.0 # milvus server ip address (IPv4) + port: 19530 # port range: 1025 ~ 65534 + deploy_mode: single # deployment type: single, cluster_readonly, cluster_writable + time_zone: UTC+8 + +db_config: + primary_path: /tmp/milvus # path used to store data and meta + secondary_path: # path used to store data only, split by semicolon + + backend_url: sqlite://:@:/ # URI format: dialect://username:password@host:port/database + # Keep 'dialect://:@:/', and replace other texts with real values. + # Replace 'dialect' with 'mysql' or 'sqlite' + + insert_buffer_size: 4 # GB, maximum insert buffer size allowed + build_index_gpu: 0 # gpu id used for building index + +metric_config: + enable_monitor: false # enable monitoring or not + collector: prometheus # prometheus + prometheus_config: + port: 8080 # port prometheus used to fetch metrics + +cache_config: + cpu_mem_capacity: 16 # GB, CPU memory used for cache + cpu_mem_threshold: 0.85 # percentage of data kept when cache cleanup triggered + cache_insert_data: false # whether load inserted data into cache + +engine_config: + blas_threshold: 20 + +resource_config: + resource_pool: + - cpu + - gpu0 diff --git a/cpp/unittest/db/test_db.cpp b/cpp/unittest/db/test_db.cpp index e502d0f742ac57884e9ae36b76f1f40edbd3fcfa..9e80afbc099014eff6d777cdadfae92217230ce2 100644 --- a/cpp/unittest/db/test_db.cpp +++ b/cpp/unittest/db/test_db.cpp @@ -23,14 +23,18 @@ #include "db/DBFactory.h" #include "cache/CpuCacheMgr.h" #include "utils/CommonUtil.h" +#include "server/Config.h" #include #include #include #include + namespace { +static const char *CONFIG_FILE_PATH = "./milvus/conf/server_config.yaml"; + static const char *TABLE_NAME = "test_group"; static constexpr int64_t TABLE_DIM = 256; static constexpr int64_t VECTOR_COUNT = 25000; @@ -228,6 +232,9 @@ TEST_F(DBTest, DB_TEST) { } TEST_F(DBTest, SEARCH_TEST) { + milvus::server::Config &config = milvus::server::Config::GetInstance(); + milvus::Status s = config.LoadConfigFile(CONFIG_FILE_PATH); + milvus::engine::meta::TableSchema table_info = BuildTableSchema(); auto stat = db_->CreateTable(table_info); @@ -290,6 +297,25 @@ TEST_F(DBTest, SEARCH_TEST) { ASSERT_TRUE(stat.ok()); } + //test FAISS_IVFSQ8H optimizer + index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H; + db_->CreateIndex(TABLE_NAME, index); // wait until build index finish + + { + milvus::engine::QueryResults results; + stat = db_->Query(TABLE_NAME, k, nq, 10, xq.data(), results); + ASSERT_TRUE(stat.ok()); + } + + {//search by specify index file + milvus::engine::meta::DatesT dates; + std::vector file_ids = {"1", "2", "3", "4", "5", "6"}; + milvus::engine::QueryResults results; + stat = db_->Query(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, results); + ASSERT_TRUE(stat.ok()); + } + + // TODO(lxj): add groundTruth assert } diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index dfbffc66bc1194fa58637be4dacdb04fd1800c2d..67beeba36fffefc887335f7a32d4d7ffd3e1efde 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -97,7 +97,7 @@ DBTest::SetUp() { auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance(); res_mgr->Clear(); res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, true, false)); - res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, false)); + res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, true)); res_mgr->Add(milvus::scheduler::ResourceFactory::Create("gtx1660", "GPU", 0, true, true)); auto default_conn = milvus::scheduler::Connection("IO", 500.0);