diff --git a/ci/jenkinsfile/dev_test.groovy b/ci/jenkinsfile/dev_test.groovy index 6fb938854513ecd0eb15bb506489f2897e6fc282..f9df9b406520ffa6b777c8261b563b172e5e9d38 100644 --- a/ci/jenkinsfile/dev_test.groovy +++ b/ci/jenkinsfile/dev_test.groovy @@ -2,10 +2,9 @@ timeout(time: 30, unit: 'MINUTES') { try { dir ("${PROJECT_NAME}_test") { checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]]) - sh 'python3 -m pip install -r requirements.txt' + sh 'python3 -m pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com' sh "pytest . --alluredir=\"test_out/dev/single/sqlite\" --level=1 --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local" } - // mysql database backend test load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy" diff --git a/ci/jenkinsfile/dev_test_all.groovy b/ci/jenkinsfile/dev_test_all.groovy index a7b3192cd0ec0099a27611b183238f695b9e1221..b11ea755b9e5b482a99851d7732529a5fa400424 100644 --- a/ci/jenkinsfile/dev_test_all.groovy +++ b/ci/jenkinsfile/dev_test_all.groovy @@ -2,7 +2,7 @@ timeout(time: 60, unit: 'MINUTES') { try { dir ("${PROJECT_NAME}_test") { checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]]) - sh 'python3 -m pip install -r requirements.txt' + sh 'python3 -m pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com' sh "pytest . --alluredir=\"test_out/dev/single/sqlite\" --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local" } diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 903b00111e5a4475e209e99a551bdd2753610c4a..6c561c0e5a8e066ca9f87127bbf1b5706ee0cdea 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -16,6 +16,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-637 - out of memory when load too many tasks - MS-640 - Cache object size calculate incorrect - MS-641 - Segment fault(signal 11) in PickToLoad +- MS-639 - SQ8H index created failed and server hang ## Improvement - MS-552 - Add and change the easylogging library @@ -36,6 +37,7 @@ Please mark all change in change log and use the ticket from JIRA. 
- MS-619 - Add optimizer class in scheduler - MS-614 - Preload table at startup - MS-626 - Refactor DataObj to support cache any type data +- MS-648 - Improve unittest ## New Feature - MS-627 - Integrate new index: IVFSQHybrid diff --git a/core/cmake/ThirdPartyPackages.cmake b/core/cmake/ThirdPartyPackages.cmake index a178468b0f8c40118dbc486b3894f7de267e0d74..17477f259f3e1456b05f16ac467faf839bbda8c4 100644 --- a/core/cmake/ThirdPartyPackages.cmake +++ b/core/cmake/ThirdPartyPackages.cmake @@ -143,7 +143,7 @@ if(USE_JFROG_CACHE STREQUAL "ON") if(NOT DEFINED JFROG_ARTFACTORY_URL) message(FATAL_ERROR "JFROG_ARTFACTORY_URL is not set") endif() - set(JFROG_ARTFACTORY_CACHE_URL "${JFROG_ARTFACTORY_URL}/generic-local/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${MILVUS_BUILD_ARCH}/${BUILD_TYPE}") + set(JFROG_ARTFACTORY_CACHE_URL "${JFROG_ARTFACTORY_URL}/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${MILVUS_BUILD_ARCH}/${BUILD_TYPE}") if(DEFINED ENV{JFROG_USER_NAME}) set(JFROG_USER_NAME "$ENV{JFROG_USER_NAME}") endif() diff --git a/core/src/db/engine/ExecutionEngine.h b/core/src/db/engine/ExecutionEngine.h index d7de6f37261f0370ca6c47553ed0e6c18e8ec37b..2c4960e6ac12a1b6d46a9b34173b5b48e5b52503 100644 --- a/core/src/db/engine/ExecutionEngine.h +++ b/core/src/db/engine/ExecutionEngine.h @@ -65,7 +65,7 @@ class ExecutionEngine { Load(bool to_cache = true) = 0; virtual Status - CopyToGpu(uint64_t device_id) = 0; + CopyToGpu(uint64_t device_id, bool hybrid) = 0; virtual Status CopyToIndexFileToGpu(uint64_t device_id) = 0; @@ -80,7 +80,8 @@ class ExecutionEngine { Merge(const std::string& location) = 0; virtual Status - Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const = 0; + Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels, + bool hybrid) const = 0; virtual std::shared_ptr BuildIndex(const std::string& location, EngineType engine_type) = 0; diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index c5a36db07f1a124a12e189cf5884577e370fa74a..3fa68aae520dd81e79db016e031fb48bd5a6931a 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -31,6 +31,7 @@ #include "wrapper/ConfAdapter.h" #include "wrapper/ConfAdapterMgr.h" +#include #include #include #include @@ -245,7 +246,31 @@ ExecutionEngineImpl::Load(bool to_cache) { } Status -ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { +ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) { + if (hybrid) { + auto key = location_ + ".quantizer"; + auto quantizer = + std::static_pointer_cast(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(key)); + + auto conf = std::make_shared(); + conf->gpu_id = device_id; + + if (quantizer) { + // cache hit + conf->mode = 2; + auto new_index = index_->LoadData(quantizer->Data(), conf); + index_ = new_index; + } else { + auto pair = index_->CopyToGpuWithQuantizer(device_id); + index_ = pair.first; + + // cache + auto cached_quantizer = std::make_shared(pair.second); + cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer); + } + return Status::OK(); + } + auto index = std::static_pointer_cast(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_)); bool already_in_cache = (index != nullptr); if (already_in_cache) { @@ -389,8 +414,8 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t } Status -ExecutionEngineImpl::Search(int64_t n, const float* data, 
int64_t k, int64_t nprobe, float* distances, - int64_t* labels) const { +ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels, + bool hybrid) const { if (index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search"; return Status(DB_ERROR, "index is null"); @@ -406,11 +431,15 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType()); auto conf = adapter->MatchSearch(temp_conf, index_->GetType()); - HybridLoad(); + if (hybrid) { + HybridLoad(); + } auto status = index_->Search(n, data, distances, labels, conf); - HybridUnset(); + if (hybrid) { + HybridUnset(); + } if (!status.ok()) { ENGINE_LOG_ERROR << "Search error"; diff --git a/core/src/db/engine/ExecutionEngineImpl.h b/core/src/db/engine/ExecutionEngineImpl.h index 4594986bd9bbd011ff2698b0fee195fea3eac66d..9cbabb2bd53ee5f67f84cca651422b0defda89a4 100644 --- a/core/src/db/engine/ExecutionEngineImpl.h +++ b/core/src/db/engine/ExecutionEngineImpl.h @@ -56,7 +56,7 @@ class ExecutionEngineImpl : public ExecutionEngine { Load(bool to_cache) override; Status - CopyToGpu(uint64_t device_id) override; + CopyToGpu(uint64_t device_id, bool hybrid = false) override; Status CopyToIndexFileToGpu(uint64_t device_id) override; @@ -71,7 +71,8 @@ class ExecutionEngineImpl : public ExecutionEngine { Merge(const std::string& location) override; Status - Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const override; + Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels, + bool hybrid = false) const override; ExecutionEnginePtr BuildIndex(const std::string& location, EngineType engine_type) override; diff --git a/core/src/index/cmake/ThirdPartyPackagesCore.cmake b/core/src/index/cmake/ThirdPartyPackagesCore.cmake index f2823a78c62ec422f724a0a53e1e8688cf81ae52..12d5c2a71168429d87060e0d1d181f9f74a27018 100644 --- a/core/src/index/cmake/ThirdPartyPackagesCore.cmake +++ b/core/src/index/cmake/ThirdPartyPackagesCore.cmake @@ -123,7 +123,7 @@ if(NOT DEFINED USE_JFROG_CACHE) set(USE_JFROG_CACHE "OFF") endif() if(USE_JFROG_CACHE STREQUAL "ON") - set(JFROG_ARTFACTORY_CACHE_URL "${JFROG_ARTFACTORY_URL}/generic-local/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${KNOWHERE_BUILD_ARCH}/${BUILD_TYPE}") + set(JFROG_ARTFACTORY_CACHE_URL "${JFROG_ARTFACTORY_URL}/milvus/thirdparty/cache/${CMAKE_OS_NAME}/${KNOWHERE_BUILD_ARCH}/${BUILD_TYPE}") set(THIRDPARTY_PACKAGE_CACHE "${THIRDPARTY_DIR}/cache") endif() @@ -234,7 +234,7 @@ if(CUSTOMIZATION) # set(FAISS_MD5 "57da9c4f599cc8fa4260488b1c96e1cc") # commit-id 6dbdf75987c34a2c853bd172ea0d384feea8358c branch-0.2.0 # set(FAISS_MD5 "21deb1c708490ca40ecb899122c01403") # commit-id 643e48f479637fd947e7b93fa4ca72b38ecc9a39 branch-0.2.0 # set(FAISS_MD5 "072db398351cca6e88f52d743bbb9fa0") # commit-id 3a2344d04744166af41ef1a74449d68a315bfe17 branch-0.2.1 - set(FAISS_MD5 "94988b7bdac4eb82a9575c702a3f2df3") # commit-id 1407526b31cad26f98ceca8dddaface8f18c4c19 branch-0.2.1 + set(FAISS_MD5 "c89ea8e655f5cdf58f42486f13614714") # commit-id 9c28a1cbb88f41fa03b03d7204106201ad33276b branch-0.2.1 execute_process(COMMAND wget -q --method HEAD ${FAISS_SOURCE_URL} RESULT_VARIABLE return_code) message(STATUS "Check the remote cache file ${FAISS_SOURCE_URL}. 
return code = ${return_code}") diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.cpp b/core/src/index/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.cpp index 1b4f4e9edba8edf4e2c19b5c48ea1bebed3cf432..5e1f5226f25422145074186de79fe8db197c09e6 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.cpp @@ -71,4 +71,13 @@ GPUIVFSQ::CopyGpuToCpu(const Config& config) { return std::make_shared(new_index); } +void +GPUIVFSQ::search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& cfg) { +#ifdef CUSTOMIZATION + GPUIVF::search_impl(n, data, k, distances, labels, cfg); +#else + IVF::search_impl(n, data, k, distances, labels, cfg); +#endif +} + } // namespace knowhere diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.h b/core/src/index/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.h index ed8013d77f1f87535a8cabb7ca256c7ee28bc399..7332bce691bd583cd9383290e48987817ce08447 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.h +++ b/core/src/index/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.h @@ -38,6 +38,10 @@ class GPUIVFSQ : public GPUIVF { VectorIndexPtr CopyGpuToCpu(const Config& config) override; + + protected: + void + search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& cfg) override; }; } // namespace knowhere diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexIVF.cpp b/core/src/index/knowhere/knowhere/index/vector_index/IndexIVF.cpp index 510ab46bd6de3bd0683a0a78cf433835a5c8144b..0c4856f2b64b844f94345de1a7d9c72d76c53b2c 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexIVF.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/IndexIVF.cpp @@ -115,6 +115,19 @@ IVF::Search(const DatasetPtr& dataset, const Config& config) { search_impl(rows, (float*)p_data, search_cfg->k, res_dis, res_ids, config); + // std::stringstream ss_res_id, ss_res_dist; + // for (int i = 0; i < 10; ++i) { + // printf("%llu", res_ids[i]); + // printf("\n"); + // printf("%.6f", res_dis[i]); + // printf("\n"); + // ss_res_id << res_ids[i] << " "; + // ss_res_dist << res_dis[i] << " "; + // } + // std::cout << std::endl << "after search: " << std::endl; + // std::cout << ss_res_id.str() << std::endl; + // std::cout << ss_res_dist.str() << std::endl << std::endl; + auto id_buf = MakeMutableBufferSmart((uint8_t*)res_ids, sizeof(int64_t) * elems); auto dist_buf = MakeMutableBufferSmart((uint8_t*)res_dis, sizeof(float) * elems); diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp b/core/src/index/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp index 5f916f3370a9afe9f81318252cc94253fbb93eb5..268b7fb9e329c62c6096e930579599b6a004e799 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp @@ -17,6 +17,7 @@ // under the License. 
#include "knowhere/index/vector_index/IndexIVFSQHybrid.h" +#include #include "faiss/AutoTune.h" #include "faiss/gpu/GpuAutoTune.h" #include "faiss/gpu/GpuIndexIVF.h" @@ -79,20 +80,8 @@ IVFSQHybrid::CopyGpuToCpu(const Config& config) { VectorIndexPtr IVFSQHybrid::CopyCpuToGpu(const int64_t& device_id, const Config& config) { if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) { - ResScope rs(res, device_id, false); - faiss::gpu::GpuClonerOptions option; - option.allInGpu = true; - - faiss::IndexComposition index_composition; - index_composition.index = index_.get(); - index_composition.quantizer = nullptr; - index_composition.mode = 0; // copy all - - auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, &index_composition, &option); - - std::shared_ptr device_index; - device_index.reset(gpu_index); - return std::make_shared(device_index, device_id, res); + auto p = CopyCpuToGpuWithQuantizer(device_id, config); + return p.first; } else { KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource"); } @@ -180,7 +169,7 @@ IVFSQHybrid::UnsetQuantizer() { ivf_index->quantizer = nullptr; } -void +VectorIndexPtr IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { auto quantizer_conf = std::dynamic_pointer_cast(conf); if (quantizer_conf != nullptr) { @@ -188,9 +177,10 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { KNOWHERE_THROW_MSG("mode only support 2 in this func"); } } - if (quantizer_conf->gpu_id != gpu_id_) { - KNOWHERE_THROW_MSG("quantizer and data must on the same gpu card"); - } + // if (quantizer_conf->gpu_id != gpu_id_) { + // KNOWHERE_THROW_MSG("quantizer and data must on the same gpu card"); + // } + gpu_id_ = quantizer_conf->gpu_id; if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_)) { ResScope rs(res, gpu_id_, false); @@ -207,8 +197,37 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { index_composition->mode = quantizer_conf->mode; // only 2 auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index_composition, &option); - index_.reset(gpu_index); - gpu_mode = 2; // all in gpu + std::shared_ptr new_idx; + new_idx.reset(gpu_index); + auto sq_idx = std::make_shared(new_idx, gpu_id_, res); + return sq_idx; + } else { + KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource"); + } +} + +std::pair +IVFSQHybrid::CopyCpuToGpuWithQuantizer(const int64_t& device_id, const Config& config) { + if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(device_id)) { + ResScope rs(res, device_id, false); + faiss::gpu::GpuClonerOptions option; + option.allInGpu = true; + + faiss::IndexComposition index_composition; + index_composition.index = index_.get(); + index_composition.quantizer = nullptr; + index_composition.mode = 0; // copy all + + auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, &index_composition, &option); + + std::shared_ptr device_index; + device_index.reset(gpu_index); + auto new_idx = std::make_shared(device_index, device_id, res); + + auto q = std::make_shared(); + q->quantizer = index_composition.quantizer; + q->size = index_composition.quantizer->d * index_composition.quantizer->getNumVecs() * sizeof(float); + return std::make_pair(new_idx, q); } else { KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource"); } diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h 
b/core/src/index/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h index e2ca208d902b4314df5c0a6ef134efe383900b19..f54c61c20f69fcdc7758d8e22fc733aca7aedbd0 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h +++ b/core/src/index/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h @@ -19,6 +19,7 @@ #include #include +#include #include "IndexGPUIVFSQ.h" #include "Quantizer.h" @@ -60,9 +61,12 @@ class IVFSQHybrid : public GPUIVFSQ { void UnsetQuantizer(); - void + VectorIndexPtr LoadData(const knowhere::QuantizerPtr& q, const Config& conf); + std::pair + CopyCpuToGpuWithQuantizer(const int64_t& device_id, const Config& config); + IndexModelPtr Train(const DatasetPtr& dataset, const Config& config) override; diff --git a/core/src/index/unittest/test_ivf.cpp b/core/src/index/unittest/test_ivf.cpp index c6faea9182c866c25d2e409e5123c45d82e6fb3c..3b3e557c4d77637468f086b631b985ef3b8a8380 100644 --- a/core/src/index/unittest/test_ivf.cpp +++ b/core/src/index/unittest/test_ivf.cpp @@ -154,8 +154,8 @@ class IVFTest : public DataGen, public TestWithParam<::std::tupleSearch(query_dataset, conf); AssertAnns(result, nq, conf->k); PrintResult(result, nq, k); + hybrid_1_idx->UnsetQuantizer(); } { @@ -253,9 +254,9 @@ TEST_P(IVFTest, hybrid) { quantizer_conf->gpu_id = device_id; auto q = hybrid_2_idx->LoadQuantizer(quantizer_conf); quantizer_conf->mode = 2; - hybrid_2_idx->LoadData(q, quantizer_conf); + auto gpu_idx = hybrid_2_idx->LoadData(q, quantizer_conf); - auto result = hybrid_2_idx->Search(query_dataset, conf); + auto result = gpu_idx->Search(query_dataset, conf); AssertAnns(result, nq, conf->k); PrintResult(result, nq, k); } @@ -438,6 +439,7 @@ TEST_P(IVFTest, clone_test) { } } +#ifdef CUSTOMIZATION TEST_P(IVFTest, seal_test) { // FaissGpuResourceMgr::GetInstance().InitDevice(device_id); @@ -472,6 +474,7 @@ TEST_P(IVFTest, seal_test) { auto with_seal = tc.RecordSection("With seal"); ASSERT_GE(without_seal, with_seal); } +#endif class GPURESTEST : public DataGen, public ::testing::Test { protected: @@ -637,7 +640,7 @@ TEST_F(GPURESTEST, copyandsearch) { // search and copy at the same time printf("==================\n"); - index_type = "GPUIVFSQ"; + index_type = "GPUIVF"; index_ = IndexFactory(index_type); auto conf = std::make_shared(); @@ -699,7 +702,7 @@ TEST_F(GPURESTEST, copyandsearch) { } TEST_F(GPURESTEST, TrainAndSearch) { - index_type = "GPUIVFSQ"; + index_type = "GPUIVF"; index_ = IndexFactory(index_type); auto conf = std::make_shared(); diff --git a/core/src/index/unittest/test_kdt.cpp b/core/src/index/unittest/test_kdt.cpp index 875944be83772f1aaa6cc74d54cd05f60a193337..8758fee669460763b34165295584cf056a509f61 100644 --- a/core/src/index/unittest/test_kdt.cpp +++ b/core/src/index/unittest/test_kdt.cpp @@ -36,6 +36,7 @@ class KDTTest : public DataGen, public ::testing::Test { protected: void SetUp() override { + Generate(96, 1000, 10); index_ = std::make_shared(); auto tempconf = std::make_shared(); diff --git a/core/src/index/unittest/test_nsg/test_nsg.cpp b/core/src/index/unittest/test_nsg/test_nsg.cpp index 5aaa65abe293e80a174dc90d82a544eefa592112..657387f2193c3759c5f2a276ff28b682856b369e 100644 --- a/core/src/index/unittest/test_nsg/test_nsg.cpp +++ b/core/src/index/unittest/test_nsg/test_nsg.cpp @@ -38,17 +38,17 @@ class NSGInterfaceTest : public DataGen, public ::testing::Test { SetUp() override { // Init_with_default(); knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID, 1024 * 1024 * 200, 1024 * 1024 * 600, 2); - Generate(256, 
1000000, 1); + Generate(256, 1000000 / 100, 1); index_ = std::make_shared(); auto tmp_conf = std::make_shared(); tmp_conf->gpu_id = DEVICE_ID; - tmp_conf->knng = 100; - tmp_conf->nprobe = 32; - tmp_conf->nlist = 16384; - tmp_conf->search_length = 60; - tmp_conf->out_degree = 70; - tmp_conf->candidate_pool_size = 500; + tmp_conf->knng = 20; + tmp_conf->nprobe = 8; + tmp_conf->nlist = 163; + tmp_conf->search_length = 40; + tmp_conf->out_degree = 30; + tmp_conf->candidate_pool_size = 100; tmp_conf->metric_type = knowhere::METRICTYPE::L2; train_conf = tmp_conf; diff --git a/core/src/scheduler/JobMgr.cpp b/core/src/scheduler/JobMgr.cpp index 170dee4b80a02f8524d3fb8bfb0e4a46ad7dd1c8..70f1352a5cabc4392e21161274b0c4874a3e22b9 100644 --- a/core/src/scheduler/JobMgr.cpp +++ b/core/src/scheduler/JobMgr.cpp @@ -19,9 +19,11 @@ #include "SchedInst.h" #include "TaskCreator.h" #include "optimizer/Optimizer.h" +#include "scheduler/Algorithm.h" +#include "scheduler/optimizer/Optimizer.h" +#include "scheduler/tasklabel/SpecResLabel.h" #include "task/Task.h" -#include #include namespace milvus { @@ -73,6 +75,10 @@ JobMgr::worker_function() { OptimizerInst::GetInstance()->Run(task); } + for (auto& task : tasks) { + calculate_path(task); + } + // disk resources NEVER be empty. if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { for (auto& task : tasks) { @@ -87,5 +93,23 @@ JobMgr::build_task(const JobPtr& job) { return TaskCreator::Create(job); } +void +JobMgr::calculate_path(const TaskPtr& task) { + if (task->type_ != TaskType::SearchTask) { + return; + } + + if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) { + return; + } + + std::vector path; + auto spec_label = std::static_pointer_cast(task->label()); + auto src = res_mgr_->GetDiskResources()[0]; + auto dest = spec_label->resource(); + ShortestPath(src.lock(), dest.lock(), res_mgr_, path); + task->path() = Path(path, path.size() - 1); +} + } // namespace scheduler } // namespace milvus diff --git a/core/src/scheduler/JobMgr.h b/core/src/scheduler/JobMgr.h index 4340c9e616a6a353913cfcf8596f71c7ed969da0..b4c706d359bb50e3e53928df448435559e4e0816 100644 --- a/core/src/scheduler/JobMgr.h +++ b/core/src/scheduler/JobMgr.h @@ -52,9 +52,12 @@ class JobMgr { void worker_function(); - std::vector + static std::vector build_task(const JobPtr& job); + void + calculate_path(const TaskPtr& task); + private: bool running_ = false; std::queue queue_; diff --git a/core/src/scheduler/SchedInst.cpp b/core/src/scheduler/SchedInst.cpp index 194e0d0e007c77a6120f2884d5d7f5c91832b7d9..0053332746f84acaa5fbb04baab852855abeb0fd 100644 --- a/core/src/scheduler/SchedInst.cpp +++ b/core/src/scheduler/SchedInst.cpp @@ -41,6 +41,9 @@ std::mutex JobMgrInst::mutex_; OptimizerPtr OptimizerInst::instance = nullptr; std::mutex OptimizerInst::mutex_; +BuildMgrPtr BuildMgrInst::instance = nullptr; +std::mutex BuildMgrInst::mutex_; + void load_simple_config() { server::Config& config = server::Config::GetInstance(); diff --git a/core/src/scheduler/SchedInst.h b/core/src/scheduler/SchedInst.h index 0d2a04b02c09960a8a13b5a478f73508ec9acd24..b9153d3bc3f3805589d461568e8e757b26fd48cb 100644 --- a/core/src/scheduler/SchedInst.h +++ b/core/src/scheduler/SchedInst.h @@ -17,10 +17,12 @@ #pragma once +#include "BuildMgr.h" #include "JobMgr.h" #include "ResourceMgr.h" #include "Scheduler.h" #include "optimizer/HybridPass.h" +#include "optimizer/LargeSQ8HPass.h" #include "optimizer/Optimizer.h" #include @@ -91,9 +93,9 @@ class OptimizerInst { if (instance == nullptr) { std::lock_guard 
lock(mutex_); if (instance == nullptr) { - HybridPassPtr pass_ptr = std::make_shared(); std::vector pass_list; - pass_list.push_back(pass_ptr); + pass_list.push_back(std::make_shared()); + pass_list.push_back(std::make_shared()); instance = std::make_shared(pass_list); } } @@ -105,6 +107,24 @@ class OptimizerInst { static std::mutex mutex_; }; +class BuildMgrInst { + public: + static BuildMgrPtr + GetInstance() { + if (instance == nullptr) { + std::lock_guard lock(mutex_); + if (instance == nullptr) { + instance = std::make_shared(4); + } + } + return instance; + } + + private: + static BuildMgrPtr instance; + static std::mutex mutex_; +}; + void StartSchedulerService(); diff --git a/core/src/scheduler/TaskTable.cpp b/core/src/scheduler/TaskTable.cpp index 2f7576de340c48adbdb3d458e8e0d27c2058a126..c449728c8f30de5be519e29a9e6aa485faeb3cfb 100644 --- a/core/src/scheduler/TaskTable.cpp +++ b/core/src/scheduler/TaskTable.cpp @@ -18,6 +18,7 @@ #include "scheduler/TaskTable.h" #include "Utils.h" #include "event/TaskTableUpdatedEvent.h" +#include "scheduler/SchedInst.h" #include "utils/Log.h" #include @@ -164,6 +165,13 @@ TaskTable::PickToLoad(uint64_t limit) { if (not table_[j]) { SERVER_LOG_WARNING << "table[" << j << "] is nullptr"; } + + if (table_[j]->task->path().Current() == "cpu") { + if (table_[j]->task->Type() == TaskType::BuildIndexTask && BuildMgrInst::GetInstance()->numoftasks() < 1) { + return std::vector(); + } + } + if (table_[j]->state == TaskTableItemState::LOADED) { ++count; if (count > 2) @@ -177,9 +185,21 @@ TaskTable::PickToLoad(uint64_t limit) { if (not cross && table_[i]->IsFinish()) { last_finish_ = i; } else if (table_[i]->state == TaskTableItemState::START) { - cross = true; - indexes.push_back(i); - ++count; + auto task = table_[i]->task; + if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") { + if (BuildMgrInst::GetInstance()->numoftasks() == 0) { + break; + } else { + cross = true; + indexes.push_back(i); + ++count; + BuildMgrInst::GetInstance()->take(); + } + } else { + cross = true; + indexes.push_back(i); + ++count; + } } } return indexes; diff --git a/core/src/scheduler/action/PushTaskToNeighbour.cpp b/core/src/scheduler/action/PushTaskToNeighbour.cpp index 95f8212297dd56e042a5fae4631a0177912e3ae6..b42234d0f6f9f36f20c8eb29d0de39fa10f687b5 100644 --- a/core/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/core/src/scheduler/action/PushTaskToNeighbour.cpp @@ -145,37 +145,39 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr transport_costs.push_back(transport_cost); paths.emplace_back(path); } - if (task->job_.lock()->type() == JobType::SEARCH) { - auto label = task->label(); - auto spec_label = std::static_pointer_cast(label); - if (spec_label->resource().lock()->type() == ResourceType::CPU) { - std::vector spec_path; - spec_path.push_back(spec_label->resource().lock()->name()); - spec_path.push_back(resource->name()); - task->path() = Path(spec_path, spec_path.size() - 1); - } else { - // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost - uint64_t min_cost = std::numeric_limits::max(); - uint64_t min_cost_idx = 0; - for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->TotalTasks() == 0) { - min_cost_idx = i; - break; - } - uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + - transport_costs[i]; - if (min_cost > cost) { - min_cost = cost; - min_cost_idx = i; - } - } - - // step 3: set path in 
task - Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); - task->path() = task_path; - } - - } else if (task->job_.lock()->type() == JobType::BUILD) { + // if (task->job_.lock()->type() == JobType::SEARCH) { + // auto label = task->label(); + // auto spec_label = std::static_pointer_cast(label); + // if (spec_label->resource().lock()->type() == ResourceType::CPU) { + // std::vector spec_path; + // spec_path.push_back(spec_label->resource().lock()->name()); + // spec_path.push_back(resource->name()); + // task->path() = Path(spec_path, spec_path.size() - 1); + // } else { + // // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + // uint64_t min_cost = std::numeric_limits::max(); + // uint64_t min_cost_idx = 0; + // for (uint64_t i = 0; i < compute_resources.size(); ++i) { + // if (compute_resources[i]->TotalTasks() == 0) { + // min_cost_idx = i; + // break; + // } + // uint64_t cost = compute_resources[i]->TaskAvgCost() * + // compute_resources[i]->NumOfTaskToExec() + + // transport_costs[i]; + // if (min_cost > cost) { + // min_cost = cost; + // min_cost_idx = i; + // } + // } + // + // // step 3: set path in task + // Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); + // task->path() = task_path; + // } + // + // } else + if (task->job_.lock()->type() == JobType::BUILD) { // step2: Read device id in config // get build index gpu resource server::Config& config = server::Config::GetInstance(); @@ -201,12 +203,13 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr } if (resource->name() == task->path().Last()) { - resource->WakeupLoader(); + resource->WakeupExecutor(); } else { auto next_res_name = task->path().Next(); auto next_res = res_mgr.lock()->GetResource(next_res_name); - event->task_table_item_->Move(); - next_res->task_table().Put(task); + if (event->task_table_item_->Move()) { + next_res->task_table().Put(task); + } } } diff --git a/core/src/scheduler/resource/Resource.cpp b/core/src/scheduler/resource/Resource.cpp index 8fea475d70b834f92aa66e49e33f1f8848ed7940..e99ee252551ec388f77a09879b81a5939a260112 100644 --- a/core/src/scheduler/resource/Resource.cpp +++ b/core/src/scheduler/resource/Resource.cpp @@ -16,9 +16,11 @@ // under the License. 
#include "scheduler/resource/Resource.h" +#include "scheduler/SchedInst.h" #include "scheduler/Utils.h" #include +#include #include namespace milvus { @@ -111,11 +113,18 @@ Resource::pick_task_load() { TaskTableItemPtr Resource::pick_task_execute() { - auto indexes = task_table_.PickToExecute(3); + auto indexes = task_table_.PickToExecute(std::numeric_limits::max()); for (auto index : indexes) { // try to set one task executing, then return - if (task_table_.Execute(index)) + if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) { + if (task_table_[index]->task->path().Last() != name()) { + continue; + } + } + + if (task_table_.Execute(index)) { return task_table_.Get(index); + } // else try next } return nullptr; @@ -167,6 +176,12 @@ Resource::executor_function() { total_cost_ += finish - start; task_item->Executed(); + + if (task_item->task->Type() == TaskType::BuildIndexTask) { + BuildMgrInst::GetInstance()->Put(); + ResMgrInst::GetInstance()->GetResource("cpu")->WakeupLoader(); + } + if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/core/src/scheduler/task/Path.h b/core/src/scheduler/task/Path.h index c23db9bb09967382042efa7208cae772397f6d79..30e77a17b8ed0a20fee248857d7fa32f19999bd4 100644 --- a/core/src/scheduler/task/Path.h +++ b/core/src/scheduler/task/Path.h @@ -40,13 +40,22 @@ class Path { return path_; } + std::string + Current() { + if (!path_.empty() && path_.size() > index_) { + return path_[index_]; + } else { + return ""; + } + } + std::string Next() { if (index_ > 0 && !path_.empty()) { --index_; return path_[index_]; } else { - return nullptr; + return ""; } } diff --git a/core/src/scheduler/task/SearchTask.cpp b/core/src/scheduler/task/SearchTask.cpp index 9925a8bcf85511d83759ea2fd88aacca1ba7f44a..b5f1599eba269c0ed35b2330a3efc225293a9dc7 100644 --- a/core/src/scheduler/task/SearchTask.cpp +++ b/core/src/scheduler/task/SearchTask.cpp @@ -22,6 +22,7 @@ #include "utils/Log.h" #include "utils/TimeRecorder.h" +#include #include #include #include @@ -33,8 +34,6 @@ namespace scheduler { static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000; static constexpr size_t PARALLEL_REDUCE_BATCH = 1000; -std::mutex XSearchTask::merge_mutex_; - // TODO(wxyu): remove unused code // bool // NeedParallelReduce(uint64_t nq, uint64_t topk) { @@ -121,7 +120,11 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { stat = index_engine_->Load(); type_str = "DISK2CPU"; } else if (type == LoadType::CPU2GPU) { - stat = index_engine_->CopyToGpu(device_id); + bool hybrid = false; + if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H) { + hybrid = true; + } + stat = index_engine_->CopyToGpu(device_id, hybrid); type_str = "CPU2GPU"; } else if (type == LoadType::GPU2CPU) { stat = index_engine_->CopyToCpu(); @@ -204,14 +207,20 @@ XSearchTask::Execute() { try { // step 2: search - index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data()); + bool hybrid = false; + if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H && + ResMgrInst::GetInstance()->GetResource(path().Last())->type() == ResourceType::CPU) { + hybrid = true; + } + index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data(), hybrid); double span = rc.RecordSection(hdr + ", do search"); // search_job->AccumSearchCost(span); // step 3: pick up topk result auto spec_k = index_engine_->Count() < topk ? 
index_engine_->Count() : topk; - XSearchTask::TopkResult(output_ids, output_distance, spec_k, nq, topk, metric_l2, search_job->GetResult()); + XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, metric_l2, + search_job->GetResult()); span = rc.RecordSection(hdr + ", reduce topk"); // search_job->AccumReduceCost(span); @@ -220,7 +229,7 @@ XSearchTask::Execute() { // search_job->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed } - // step 5: notify to send result to client + // step 4: notify to send result to client search_job->SearchDone(index_id_); } @@ -230,36 +239,37 @@ XSearchTask::Execute() { index_engine_ = nullptr; } -Status -XSearchTask::TopkResult(const std::vector& input_ids, const std::vector& input_distance, - uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result) { - scheduler::ResultSet result_buf; - +void +XSearchTask::MergeTopkToResultSet(const std::vector& input_ids, const std::vector& input_distance, + uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, + scheduler::ResultSet& result) { if (result.empty()) { - result_buf.resize(nq, scheduler::Id2DistVec(input_k, scheduler::IdDistPair(-1, 0.0))); - for (auto i = 0; i < nq; ++i) { - auto& result_buf_i = result_buf[i]; + result.resize(nq); + } + + for (uint64_t i = 0; i < nq; i++) { + scheduler::Id2DistVec result_buf; + auto& result_i = result[i]; + + if (result[i].empty()) { + result_buf.resize(input_k, scheduler::IdDistPair(-1, 0.0)); uint64_t input_k_multi_i = input_k * i; for (auto k = 0; k < input_k; ++k) { uint64_t idx = input_k_multi_i + k; - auto& result_buf_item = result_buf_i[k]; + auto& result_buf_item = result_buf[k]; result_buf_item.first = input_ids[idx]; result_buf_item.second = input_distance[idx]; } - } - } else { - size_t tar_size = result[0].size(); - uint64_t output_k = std::min(topk, input_k + tar_size); - result_buf.resize(nq, scheduler::Id2DistVec(output_k, scheduler::IdDistPair(-1, 0.0))); - for (auto i = 0; i < nq; ++i) { + } else { + size_t tar_size = result_i.size(); + uint64_t output_k = std::min(topk, input_k + tar_size); + result_buf.resize(output_k, scheduler::IdDistPair(-1, 0.0)); size_t buf_k = 0, src_k = 0, tar_k = 0; uint64_t src_idx; - auto& result_i = result[i]; - auto& result_buf_i = result_buf[i]; uint64_t input_k_multi_i = input_k * i; while (buf_k < output_k && src_k < input_k && tar_k < tar_size) { src_idx = input_k_multi_i + src_k; - auto& result_buf_item = result_buf_i[buf_k]; + auto& result_buf_item = result_buf[buf_k]; auto& result_item = result_i[tar_k]; if ((ascending && input_distance[src_idx] < result_item.second) || (!ascending && input_distance[src_idx] > result_item.second)) { @@ -273,11 +283,11 @@ XSearchTask::TopkResult(const std::vector& input_ids, const std::vector buf_k++; } - if (buf_k < topk) { + if (buf_k < output_k) { if (src_k < input_k) { while (buf_k < output_k && src_k < input_k) { src_idx = input_k_multi_i + src_k; - auto& result_buf_item = result_buf_i[buf_k]; + auto& result_buf_item = result_buf[buf_k]; result_buf_item.first = input_ids[src_idx]; result_buf_item.second = input_distance[src_idx]; src_k++; @@ -285,18 +295,79 @@ XSearchTask::TopkResult(const std::vector& input_ids, const std::vector } } else { while (buf_k < output_k && tar_k < tar_size) { - result_buf_i[buf_k] = result_i[tar_k]; + result_buf[buf_k] = result_i[tar_k]; tar_k++; buf_k++; } } } } + + result_i.swap(result_buf); + } +} + +void +XSearchTask::MergeTopkArray(std::vector& tar_ids, 
std::vector& tar_distance, uint64_t& tar_input_k, + const std::vector& src_ids, const std::vector& src_distance, + uint64_t src_input_k, uint64_t nq, uint64_t topk, bool ascending) { + if (src_ids.empty() || src_distance.empty()) { + return; } - result.swap(result_buf); + std::vector id_buf(nq * topk, -1); + std::vector dist_buf(nq * topk, 0.0); + + uint64_t output_k = std::min(topk, tar_input_k + src_input_k); + uint64_t buf_k, src_k, tar_k; + uint64_t src_idx, tar_idx, buf_idx; + uint64_t src_input_k_multi_i, tar_input_k_multi_i, buf_k_multi_i; + + for (uint64_t i = 0; i < nq; i++) { + src_input_k_multi_i = src_input_k * i; + tar_input_k_multi_i = tar_input_k * i; + buf_k_multi_i = output_k * i; + buf_k = src_k = tar_k = 0; + while (buf_k < output_k && src_k < src_input_k && tar_k < tar_input_k) { + src_idx = src_input_k_multi_i + src_k; + tar_idx = tar_input_k_multi_i + tar_k; + buf_idx = buf_k_multi_i + buf_k; + if ((ascending && src_distance[src_idx] < tar_distance[tar_idx]) || + (!ascending && src_distance[src_idx] > tar_distance[tar_idx])) { + id_buf[buf_idx] = src_ids[src_idx]; + dist_buf[buf_idx] = src_distance[src_idx]; + src_k++; + } else { + id_buf[buf_idx] = tar_ids[tar_idx]; + dist_buf[buf_idx] = tar_distance[tar_idx]; + tar_k++; + } + buf_k++; + } + + if (buf_k < output_k) { + if (src_k < src_input_k) { + while (buf_k < output_k && src_k < src_input_k) { + src_idx = src_input_k_multi_i + src_k; + id_buf[buf_idx] = src_ids[src_idx]; + dist_buf[buf_idx] = src_distance[src_idx]; + src_k++; + buf_k++; + } + } else { + while (buf_k < output_k && tar_k < tar_input_k) { + id_buf[buf_idx] = tar_ids[tar_idx]; + dist_buf[buf_idx] = tar_distance[tar_idx]; + tar_k++; + buf_k++; + } + } + } + } - return Status::OK(); + tar_ids.swap(id_buf); + tar_distance.swap(dist_buf); + tar_input_k = output_k; } } // namespace scheduler diff --git a/core/src/scheduler/task/SearchTask.h b/core/src/scheduler/task/SearchTask.h index fd5c8a0d1d5dacd838429330f9e8d0ab44cb77de..6a7381e0e66d5fd138026db4b17e2583f998fadd 100644 --- a/core/src/scheduler/task/SearchTask.h +++ b/core/src/scheduler/task/SearchTask.h @@ -38,9 +38,14 @@ class XSearchTask : public Task { Execute() override; public: - static Status - TopkResult(const std::vector& input_ids, const std::vector& input_distance, uint64_t input_k, - uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result); + static void + MergeTopkToResultSet(const std::vector& input_ids, const std::vector& input_distance, + uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result); + + static void + MergeTopkArray(std::vector& tar_ids, std::vector& tar_distance, uint64_t& tar_input_k, + const std::vector& src_ids, const std::vector& src_distance, uint64_t src_input_k, + uint64_t nq, uint64_t topk, bool ascending); public: TableFileSchemaPtr file_; @@ -49,8 +54,6 @@ class XSearchTask : public Task { int index_type_ = 0; ExecutionEnginePtr index_engine_ = nullptr; bool metric_l2 = true; - - static std::mutex merge_mutex_; }; } // namespace scheduler diff --git a/core/src/wrapper/VecImpl.cpp b/core/src/wrapper/VecImpl.cpp index 1ed20c8029b76408078fe76b57c33d9f22530e80..c97900f839f23634669675acaa8d1ae229c1dc19 100644 --- a/core/src/wrapper/VecImpl.cpp +++ b/core/src/wrapper/VecImpl.cpp @@ -315,24 +315,40 @@ IVFHybridIndex::UnsetQuantizer() { return Status::OK(); } -Status +VecIndexPtr IVFHybridIndex::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { try { // TODO(linxj): Hardcode here if (auto new_idx = 
std::dynamic_pointer_cast(index_)) { - new_idx->LoadData(q, conf); + return std::make_shared(new_idx->LoadData(q, conf), type); } else { WRAPPER_LOG_ERROR << "Hybrid mode not support for index type: " << int(type); - return Status(KNOWHERE_ERROR, "not support"); } } catch (knowhere::KnowhereException& e) { WRAPPER_LOG_ERROR << e.what(); - return Status(KNOWHERE_UNEXPECTED_ERROR, e.what()); } catch (std::exception& e) { WRAPPER_LOG_ERROR << e.what(); - return Status(KNOWHERE_ERROR, e.what()); } - return Status::OK(); + return nullptr; +} + +std::pair +IVFHybridIndex::CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg) { + try { + // TODO(linxj): Hardcode here + if (auto hybrid_idx = std::dynamic_pointer_cast(index_)) { + auto pair = hybrid_idx->CopyCpuToGpuWithQuantizer(device_id, cfg); + auto new_idx = std::make_shared(pair.first, type); + return std::make_pair(new_idx, pair.second); + } else { + WRAPPER_LOG_ERROR << "Hybrid mode not support for index type: " << int(type); + } + } catch (knowhere::KnowhereException& e) { + WRAPPER_LOG_ERROR << e.what(); + } catch (std::exception& e) { + WRAPPER_LOG_ERROR << e.what(); + } + return std::make_pair(nullptr, nullptr); } } // namespace engine diff --git a/core/src/wrapper/VecImpl.h b/core/src/wrapper/VecImpl.h index fd9bb79c0a9340229881532b50bbdde1f265820d..22d734cf9244458b43e5bd9f7ac27bda2cd7b52a 100644 --- a/core/src/wrapper/VecImpl.h +++ b/core/src/wrapper/VecImpl.h @@ -105,8 +105,10 @@ class IVFHybridIndex : public IVFMixIndex { Status UnsetQuantizer() override; + std::pair + CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg) override; - Status + VecIndexPtr LoadData(const knowhere::QuantizerPtr& q, const Config& conf) override; }; diff --git a/core/src/wrapper/VecIndex.h b/core/src/wrapper/VecIndex.h index f5fdd49466bcd4ce5c8f2ff484dbe8103503f5bd..05da9ccc0374e8752a05fdecd318f9c3d8206702 100644 --- a/core/src/wrapper/VecIndex.h +++ b/core/src/wrapper/VecIndex.h @@ -19,6 +19,7 @@ #include #include +#include #include "cache/DataObj.h" #include "knowhere/common/BinarySet.h" @@ -103,9 +104,9 @@ class VecIndex : public cache::DataObj { return nullptr; } - virtual Status + virtual VecIndexPtr LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { - return Status::OK(); + return nullptr; } virtual Status @@ -117,6 +118,11 @@ class VecIndex : public cache::DataObj { UnsetQuantizer() { return Status::OK(); } + + virtual std::pair + CopyToGpuWithQuantizer(const int64_t& device_id, const Config& cfg = Config()) { + return std::make_pair(nullptr, nullptr); + } //////////////// private: int64_t size_ = 0; diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 9e80afbc099014eff6d777cdadfae92217230ce2..9e2730a8dddfd1f3bb112cc75541741671ab11c7 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -297,6 +297,7 @@ TEST_F(DBTest, SEARCH_TEST) { ASSERT_TRUE(stat.ok()); } +#ifdef CUSTOMIZATION //test FAISS_IVFSQ8H optimizer index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H; db_->CreateIndex(TABLE_NAME, index); // wait until build index finish @@ -314,9 +315,7 @@ TEST_F(DBTest, SEARCH_TEST) { stat = db_->Query(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, results); ASSERT_TRUE(stat.ok()); } - - - // TODO(lxj): add groundTruth assert +#endif } TEST_F(DBTest, PRELOADTABLE_TEST) { diff --git a/core/unittest/db/test_search.cpp b/core/unittest/db/test_search.cpp index e17e06ac16143a97c4ce610e31f68f5f8fdefc06..b0ce9a28b6e407a74af198837d5597584c902ddc 
100644 --- a/core/unittest/db/test_search.cpp +++ b/core/unittest/db/test_search.cpp @@ -21,26 +21,51 @@ #include "scheduler/task/SearchTask.h" #include "utils/TimeRecorder.h" +#include "utils/ThreadPool.h" namespace { namespace ms = milvus::scheduler; void -BuildResult(uint64_t nq, +BuildResult(std::vector& output_ids, + std::vector& output_distance, uint64_t topk, - bool ascending, - std::vector& output_ids, - std::vector& output_distence) { + uint64_t nq, + bool ascending) { output_ids.clear(); output_ids.resize(nq * topk); - output_distence.clear(); - output_distence.resize(nq * topk); + output_distance.clear(); + output_distance.resize(nq * topk); for (uint64_t i = 0; i < nq; i++) { for (uint64_t j = 0; j < topk; j++) { output_ids[i * topk + j] = (int64_t)(drand48() * 100000); - output_distence[i * topk + j] = ascending ? (j + drand48()) : ((topk - j) + drand48()); + output_distance[i * topk + j] = ascending ? (j + drand48()) : ((topk - j) + drand48()); + } + } +} + +void +CopyResult(std::vector& output_ids, + std::vector& output_distance, + uint64_t output_topk, + std::vector& input_ids, + std::vector& input_distance, + uint64_t input_topk, + uint64_t nq) { + ASSERT_TRUE(input_ids.size() >= nq * input_topk); + ASSERT_TRUE(input_distance.size() >= nq * input_topk); + ASSERT_TRUE(output_topk <= input_topk); + output_ids.clear(); + output_ids.resize(nq * output_topk); + output_distance.clear(); + output_distance.resize(nq * output_topk); + + for (uint64_t i = 0; i < nq; i++) { + for (uint64_t j = 0; j < output_topk; j++) { + output_ids[i * output_topk + j] = input_ids[i * input_topk + j]; + output_distance[i * output_topk + j] = input_distance[i * input_topk + j]; } } } @@ -50,8 +75,8 @@ CheckTopkResult(const std::vector& input_ids_1, const std::vector& input_distance_1, const std::vector& input_ids_2, const std::vector& input_distance_2, - uint64_t nq, uint64_t topk, + uint64_t nq, bool ascending, const milvus::scheduler::ResultSet& result) { ASSERT_EQ(result.size(), nq); @@ -91,43 +116,36 @@ TEST(DBSearchTest, TOPK_TEST) { bool ascending; std::vector ids1, ids2; std::vector dist1, dist2; - milvus::scheduler::ResultSet result; - milvus::Status status; + ms::ResultSet result; /* test1, id1/dist1 valid, id2/dist2 empty */ ascending = true; - BuildResult(NQ, TOP_K, ascending, ids1, dist1); - status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); + BuildResult(ids1, dist1, TOP_K, NQ, ascending); + ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); + CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result); /* test2, id1/dist1 valid, id2/dist2 valid */ - BuildResult(NQ, TOP_K, ascending, ids2, dist2); - status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); + BuildResult(ids2, dist2, TOP_K, NQ, ascending); + ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); + CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result); /* test3, id1/dist1 small topk */ ids1.clear(); dist1.clear(); result.clear(); - BuildResult(NQ, TOP_K / 2, ascending, ids1, dist1); - status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K / 2, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - status 
= milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); + BuildResult(ids1, dist1, TOP_K/2, NQ, ascending); + ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); + ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); + CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result); /* test4, id1/dist1 small topk, id2/dist2 small topk */ ids2.clear(); dist2.clear(); result.clear(); - BuildResult(NQ, TOP_K / 3, ascending, ids2, dist2); - status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K / 2, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K / 3, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); + BuildResult(ids2, dist2, TOP_K/3, NQ, ascending); + ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); + ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result); + CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result); ///////////////////////////////////////////////////////////////////////////////////////// ascending = false; @@ -138,71 +156,199 @@ TEST(DBSearchTest, TOPK_TEST) { result.clear(); /* test1, id1/dist1 valid, id2/dist2 empty */ - BuildResult(NQ, TOP_K, ascending, ids1, dist1); - status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); + BuildResult(ids1, dist1, TOP_K, NQ, ascending); + ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); + CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result); /* test2, id1/dist1 valid, id2/dist2 valid */ - BuildResult(NQ, TOP_K, ascending, ids2, dist2); - status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); + BuildResult(ids2, dist2, TOP_K, NQ, ascending); + ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); + CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result); /* test3, id1/dist1 small topk */ ids1.clear(); dist1.clear(); result.clear(); - BuildResult(NQ, TOP_K / 2, ascending, ids1, dist1); - status = milvus::scheduler::XSearchTask::TopkResult(ids1, dist1, TOP_K / 2, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); + BuildResult(ids1, dist1, TOP_K/2, NQ, ascending); + ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); + ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); + CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result); /* test4, id1/dist1 small topk, id2/dist2 small topk */ ids2.clear(); dist2.clear(); result.clear(); - BuildResult(NQ, TOP_K / 3, ascending, ids2, dist2); - status = milvus::scheduler::XSearchTask::TopkResult(ids1, 
dist1, TOP_K / 2, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - status = milvus::scheduler::XSearchTask::TopkResult(ids2, dist2, TOP_K / 3, NQ, TOP_K, ascending, result); - ASSERT_TRUE(status.ok()); - CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); + BuildResult(ids2, dist2, TOP_K/3, NQ, ascending); + ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); + ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result); + CheckTopkResult(ids1, dist1, ids2, dist2, TOP_K, NQ, ascending, result); } TEST(DBSearchTest, REDUCE_PERF_TEST) { - int32_t nq = 100; - int32_t top_k = 1000; int32_t index_file_num = 478; /* sift1B dataset, index files num */ bool ascending = true; + + std::vector thread_vec = {4, 8, 11}; + std::vector nq_vec = {1, 10, 100, 1000}; + std::vector topk_vec = {1, 4, 16, 64, 256, 1024}; + int32_t NQ = nq_vec[nq_vec.size()-1]; + int32_t TOPK = topk_vec[topk_vec.size()-1]; + + std::vector> id_vec; + std::vector> dist_vec; std::vector input_ids; std::vector input_distance; - milvus::scheduler::ResultSet final_result; - milvus::Status status; - - double span, reduce_cost = 0.0; - milvus::TimeRecorder rc(""); - - for (int32_t i = 0; i < index_file_num; i++) { - BuildResult(nq, top_k, ascending, input_ids, input_distance); - - rc.RecordSection("do search for context: " + std::to_string(i)); - - // pick up topk result - status = milvus::scheduler::XSearchTask::TopkResult(input_ids, - input_distance, - top_k, - nq, - top_k, - ascending, - final_result); - ASSERT_TRUE(status.ok()); - ASSERT_EQ(final_result.size(), nq); - - span = rc.RecordSection("reduce topk for context: " + std::to_string(i)); - reduce_cost += span; + int32_t i, k, step; + + /* generate testing data */ + for (i = 0; i < index_file_num; i++) { + BuildResult(input_ids, input_distance, TOPK, NQ, ascending); + id_vec.push_back(input_ids); + dist_vec.push_back(input_distance); + } + + for (int32_t max_thread_num : thread_vec) { + milvus::ThreadPool threadPool(max_thread_num); + std::list> threads_list; + + for (int32_t nq : nq_vec) { + for (int32_t top_k : topk_vec) { + ms::ResultSet final_result, final_result_2, final_result_3; + + std::vector> id_vec_1(index_file_num); + std::vector> dist_vec_1(index_file_num); + for (i = 0; i < index_file_num; i++) { + CopyResult(id_vec_1[i], dist_vec_1[i], top_k, id_vec[i], dist_vec[i], TOPK, nq); + } + + std::string str1 = "Method-1 " + std::to_string(max_thread_num) + " " + + std::to_string(nq) + " " + std::to_string(top_k); + milvus::TimeRecorder rc1(str1); + + /////////////////////////////////////////////////////////////////////////////////////// + /* method-1 */ + for (i = 0; i < index_file_num; i++) { + ms::XSearchTask::MergeTopkToResultSet(id_vec_1[i], + dist_vec_1[i], + top_k, + nq, + top_k, + ascending, + final_result); + ASSERT_EQ(final_result.size(), nq); + } + + rc1.RecordSection("reduce done"); + + /////////////////////////////////////////////////////////////////////////////////////// + /* method-2 */ + std::vector> id_vec_2(index_file_num); + std::vector> dist_vec_2(index_file_num); + std::vector k_vec_2(index_file_num); + for (i = 0; i < index_file_num; i++) { + CopyResult(id_vec_2[i], dist_vec_2[i], top_k, id_vec[i], dist_vec[i], TOPK, nq); + k_vec_2[i] = top_k; + } + + std::string str2 = "Method-2 " + std::to_string(max_thread_num) + " " + + std::to_string(nq) + " " + std::to_string(top_k); + milvus::TimeRecorder rc2(str2); + + for (step = 1; step < index_file_num; 
step *= 2) { + for (i = 0; i + step < index_file_num; i += step * 2) { + ms::XSearchTask::MergeTopkArray(id_vec_2[i], dist_vec_2[i], k_vec_2[i], + id_vec_2[i + step], dist_vec_2[i + step], k_vec_2[i + step], + nq, top_k, ascending); + } + } + ms::XSearchTask::MergeTopkToResultSet(id_vec_2[0], + dist_vec_2[0], + k_vec_2[0], + nq, + top_k, + ascending, + final_result_2); + ASSERT_EQ(final_result_2.size(), nq); + + rc2.RecordSection("reduce done"); + + for (i = 0; i < nq; i++) { + ASSERT_EQ(final_result[i].size(), final_result_2[i].size()); + for (k = 0; k < final_result[i].size(); k++) { + if (final_result[i][k].first != final_result_2[i][k].first) { + std::cout << i << " " << k << std::endl; + } + ASSERT_EQ(final_result[i][k].first, final_result_2[i][k].first); + ASSERT_EQ(final_result[i][k].second, final_result_2[i][k].second); + } + } + + /////////////////////////////////////////////////////////////////////////////////////// + /* method-3 parallel */ + std::vector> id_vec_3(index_file_num); + std::vector> dist_vec_3(index_file_num); + std::vector k_vec_3(index_file_num); + for (i = 0; i < index_file_num; i++) { + CopyResult(id_vec_3[i], dist_vec_3[i], top_k, id_vec[i], dist_vec[i], TOPK, nq); + k_vec_3[i] = top_k; + } + + std::string str3 = "Method-3 " + std::to_string(max_thread_num) + " " + + std::to_string(nq) + " " + std::to_string(top_k); + milvus::TimeRecorder rc3(str3); + + for (step = 1; step < index_file_num; step *= 2) { + for (i = 0; i + step < index_file_num; i += step * 2) { + threads_list.push_back( + threadPool.enqueue(ms::XSearchTask::MergeTopkArray, + std::ref(id_vec_3[i]), + std::ref(dist_vec_3[i]), + std::ref(k_vec_3[i]), + std::ref(id_vec_3[i + step]), + std::ref(dist_vec_3[i + step]), + std::ref(k_vec_3[i + step]), + nq, + top_k, + ascending)); + } + + while (threads_list.size() > 0) { + int nready = 0; + for (auto it = threads_list.begin(); it != threads_list.end(); it = it) { + auto &p = *it; + std::chrono::milliseconds span(0); + if (p.wait_for(span) == std::future_status::ready) { + threads_list.erase(it++); + ++nready; + } else { + ++it; + } + } + + if (nready == 0) { + std::this_thread::yield(); + } + } + } + ms::XSearchTask::MergeTopkToResultSet(id_vec_3[0], + dist_vec_3[0], + k_vec_3[0], + nq, + top_k, + ascending, + final_result_3); + ASSERT_EQ(final_result_3.size(), nq); + + rc3.RecordSection("reduce done"); + + for (i = 0; i < nq; i++) { + ASSERT_EQ(final_result[i].size(), final_result_3[i].size()); + for (k = 0; k < final_result[i].size(); k++) { + ASSERT_EQ(final_result[i][k].first, final_result_3[i][k].first); + ASSERT_EQ(final_result[i][k].second, final_result_3[i][k].second); + } + } + } + } } - std::cout << "total reduce time: " << reduce_cost / 1000 << " ms" << std::endl; } diff --git a/core/unittest/scheduler/test_resource.cpp b/core/unittest/scheduler/test_resource.cpp index 9d859d6243a2755943badfcff4832173dad09c31..1ff0d9fdc19b00e076fe9fa4cd590066ba6c16b9 100644 --- a/core/unittest/scheduler/test_resource.cpp +++ b/core/unittest/scheduler/test_resource.cpp @@ -184,7 +184,7 @@ class ResourceAdvanceTest : public testing::Test { }; TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) { - const uint64_t NUM = 10; + const uint64_t NUM = max_once_load; std::vector> tasks; TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { diff --git a/core/unittest/scheduler/test_resource_mgr.cpp b/core/unittest/scheduler/test_resource_mgr.cpp index 34e6b50c49fe4df11975a21e860177194c1155d9..2534f66439b790a95fe3532fdc27f5f2a15c1199 100644 --- 
a/core/unittest/scheduler/test_resource_mgr.cpp +++ b/core/unittest/scheduler/test_resource_mgr.cpp @@ -165,7 +165,9 @@ class ResourceMgrAdvanceTest : public testing::Test { SetUp() override { mgr1_ = std::make_shared<ResourceMgr>(); disk_res = std::make_shared<DiskResource>("disk", 0, true, false); + cpu_res = std::make_shared<CpuResource>("cpu", 0, true, true); mgr1_->Add(ResourcePtr(disk_res)); + mgr1_->Add(ResourcePtr(cpu_res)); mgr1_->Start(); } @@ -176,6 +178,7 @@ class ResourceMgrAdvanceTest : public testing::Test { ResourceMgrPtr mgr1_; ResourcePtr disk_res; + ResourcePtr cpu_res; }; TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) { diff --git a/core/unittest/scheduler/test_scheduler.cpp b/core/unittest/scheduler/test_scheduler.cpp index a107040a0b9b72be863766e4e2e47bdbc762d97b..aebdfa2af2aea1da72cce414ae78e9528a35050c 100644 --- a/core/unittest/scheduler/test_scheduler.cpp +++ b/core/unittest/scheduler/test_scheduler.cpp @@ -28,18 +28,17 @@ #include "utils/Error.h" #include "wrapper/VecIndex.h" - namespace milvus { namespace scheduler { class MockVecIndex : public engine::VecIndex { public: - virtual Status BuildAll(const int64_t &nb, - const float *xb, - const int64_t *ids, - const engine::Config &cfg, - const int64_t &nt = 0, - const float *xt = nullptr) { + virtual Status BuildAll(const int64_t& nb, + const float* xb, + const int64_t* ids, + const engine::Config& cfg, + const int64_t& nt = 0, + const float* xt = nullptr) { } engine::VecIndexPtr Clone() override { @@ -54,23 +53,23 @@ class MockVecIndex : public engine::VecIndex { return engine::IndexType::INVALID; } - virtual Status Add(const int64_t &nb, - const float *xb, - const int64_t *ids, - const engine::Config &cfg = engine::Config()) { + virtual Status Add(const int64_t& nb, + const float* xb, + const int64_t* ids, + const engine::Config& cfg = engine::Config()) { } - virtual Status Search(const int64_t &nq, - const float *xq, - float *dist, - int64_t *ids, - const engine::Config &cfg = engine::Config()) { + virtual Status Search(const int64_t& nq, + const float* xq, + float* dist, + int64_t* ids, + const engine::Config& cfg = engine::Config()) { } - engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override { + engine::VecIndexPtr CopyToGpu(const int64_t& device_id, const engine::Config& cfg) override { } - engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override { + engine::VecIndexPtr CopyToCpu(const engine::Config& cfg) override { } virtual int64_t Dimension() { @@ -86,7 +85,7 @@ class MockVecIndex : public engine::VecIndex { return binset; } - virtual Status Load(const knowhere::BinarySet &index_binary) { + virtual Status Load(const knowhere::BinarySet& index_binary) { } public: @@ -102,11 +101,13 @@ class SchedulerTest : public testing::Test { cache::GpuCacheMgr::GetInstance(0)->SetCapacity(cache_cap); cache::GpuCacheMgr::GetInstance(1)->SetCapacity(cache_cap); + ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false); ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false); ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0); ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1); res_mgr_ = std::make_shared<ResourceMgr>(); + disk_resource_ = res_mgr_->Add(std::move(disk)); cpu_resource_ = res_mgr_->Add(std::move(cpu)); gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0)); gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1)); @@ -127,6 +128,7 @@ class SchedulerTest : public testing::Test { res_mgr_->Stop(); } + ResourceWPtr disk_resource_; ResourceWPtr cpu_resource_;
ResourceWPtr gpu_resource_0_; ResourceWPtr gpu_resource_1_; @@ -137,7 +139,7 @@ class SchedulerTest : public testing::Test { void insert_dummy_index_into_gpu_cache(uint64_t device_id) { - MockVecIndex *mock_index = new MockVecIndex(); + MockVecIndex* mock_index = new MockVecIndex(); mock_index->ntotal_ = 1000; engine::VecIndexPtr index(mock_index); @@ -224,6 +226,7 @@ class SchedulerTest2 : public testing::Test { TearDown() override { scheduler_->Stop(); res_mgr_->Stop(); + res_mgr_->Clear(); } ResourceWPtr disk_; @@ -237,22 +240,22 @@ class SchedulerTest2 : public testing::Test { std::shared_ptr<Scheduler> scheduler_; }; -TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) { - const uint64_t NUM = 10; - std::vector> tasks; - TableFileSchemaPtr dummy = std::make_shared(); - dummy->location_ = "location"; - - for (uint64_t i = 0; i < NUM; ++i) { - auto label = std::make_shared(); - std::shared_ptr task = std::make_shared(dummy, label); - task->label() = std::make_shared(disk_); - tasks.push_back(task); - disk_.lock()->task_table().Put(task); - } +//TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) { +// const uint64_t NUM = 2; +// std::vector> tasks; +// TableFileSchemaPtr dummy = std::make_shared(); +// dummy->location_ = "location"; +// +// for (uint64_t i = 0; i < NUM; ++i) { +// auto label = std::make_shared(); +// std::shared_ptr task = std::make_shared(dummy, label); +// task->label() = std::make_shared(disk_); +// tasks.push_back(task); +// disk_.lock()->task_table().Put(task); +// } // ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); -} +//} } // namespace scheduler } // namespace milvus diff --git a/core/unittest/wrapper/test_wrapper.cpp b/core/unittest/wrapper/test_wrapper.cpp index fe8cc3d91451a163c5b656b44bf918eca4b98e58..7accef649c8bc3986b0333acd01647f6c04a6bc0 100644 --- a/core/unittest/wrapper/test_wrapper.cpp +++ b/core/unittest/wrapper/test_wrapper.cpp @@ -188,7 +188,7 @@ INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest, 10, 10), std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_CPU, "Default", DIM, NB, 10, 10), - std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_GPU, "Default", DIM, NB, 10, 10), +// std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_GPU, "Default", DIM, NB, 10, 10), std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_MIX, "Default", DIM, NB, 10, 10), // std::make_tuple(IndexType::NSG_MIX, "Default", 128, 250000, 10, 10), // std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default", 128, 250000, 10, 10), diff --git a/cpp/src/scheduler/BuildMgr.cpp b/cpp/src/scheduler/BuildMgr.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d90a074d30442a83081627e9bd68f447df0cffdf --- /dev/null +++ b/cpp/src/scheduler/BuildMgr.cpp @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "scheduler/BuildMgr.h" + +namespace milvus { +namespace scheduler {} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/BuildMgr.h b/cpp/src/scheduler/BuildMgr.h new file mode 100644 index 0000000000000000000000000000000000000000..ee7ab38e2594c50e4218bdfadb59393117af436f --- /dev/null +++ b/cpp/src/scheduler/BuildMgr.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace milvus { +namespace scheduler { + +class BuildMgr { + public: + explicit BuildMgr(int64_t numoftasks) : numoftasks_(numoftasks) { + } + + public: + void + Put() { + ++numoftasks_; + } + + void + take() { + --numoftasks_; + } + + int64_t + numoftasks() { + return (int64_t)numoftasks_; + } + + private: + std::atomic_long numoftasks_; +}; + +using BuildMgrPtr = std::shared_ptr<BuildMgr>; + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/optimizer/LargeSQ8HPass.cpp b/cpp/src/scheduler/optimizer/LargeSQ8HPass.cpp new file mode 100644 index 0000000000000000000000000000000000000000..62d0e57902746ab3ac7afdcd4bc002c30dfa03ce --- /dev/null +++ b/cpp/src/scheduler/optimizer/LargeSQ8HPass.cpp @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "scheduler/optimizer/LargeSQ8HPass.h" +#include "cache/GpuCacheMgr.h" +#include "scheduler/SchedInst.h" +#include "scheduler/Utils.h" +#include "scheduler/task/SearchTask.h" +#include "scheduler/tasklabel/SpecResLabel.h" +#include "utils/Log.h" + +namespace milvus { +namespace scheduler { + +bool +LargeSQ8HPass::Run(const TaskPtr& task) { + if (task->Type() != TaskType::SearchTask) { + return false; + } + + auto search_task = std::static_pointer_cast<XSearchTask>(task); + if (search_task->file_->engine_type_ != (int)engine::EngineType::FAISS_IVFSQ8H) { + return false; + } + + auto search_job = std::static_pointer_cast<SearchJob>(search_task->job_.lock()); + + // TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu + if (search_job->nq() < 100) { + return false; + } + + std::vector<uint64_t> gpus = scheduler::get_gpu_pool(); + std::vector<int64_t> all_free_mem; + for (auto& gpu : gpus) { + auto cache = cache::GpuCacheMgr::GetInstance(gpu); + auto free_mem = cache->CacheCapacity() - cache->CacheUsage(); + all_free_mem.push_back(free_mem); + } + + auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end()); + auto best_index = std::distance(all_free_mem.begin(), max_e); + auto best_device_id = gpus[best_index]; + + ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, best_device_id); + if (not res_ptr) { + SERVER_LOG_ERROR << "GpuResource " << best_device_id << " invalid."; + // TODO: throw critical error and exit + return false; + } + + auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr)); + task->label() = label; + + return true; +} + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/optimizer/LargeSQ8HPass.h b/cpp/src/scheduler/optimizer/LargeSQ8HPass.h new file mode 100644 index 0000000000000000000000000000000000000000..49e658002f0696a401b436474eb878d18348d041 --- /dev/null +++ b/cpp/src/scheduler/optimizer/LargeSQ8HPass.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Pass.h" + +namespace milvus { +namespace scheduler { + +class LargeSQ8HPass : public Pass { + public: + LargeSQ8HPass() = default; + + public: + bool + Run(const TaskPtr& task) override; +}; + +using LargeSQ8HPassPtr = std::shared_ptr<LargeSQ8HPass>; + +} // namespace scheduler +} // namespace milvus