diff --git a/ci/jenkinsfile/milvus_build.groovy b/ci/jenkinsfile/milvus_build.groovy
index db5a0fea7418bf6b5103f36a73cac6f6a4dad2b7..0eb9cd7c999ff272da22e48cb81cb0f0ee6d0d9a 100644
--- a/ci/jenkinsfile/milvus_build.groovy
+++ b/ci/jenkinsfile/milvus_build.groovy
@@ -3,13 +3,16 @@ container('milvus-build-env') {
     gitlabCommitStatus(name: 'Build Engine') {
         dir ("milvus_engine") {
             try {
-                def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/thirdparty/knowhere/cmake_build"
+                def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/src/core/cmake_build"
                 checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
+
+                /*
                 dir ("cpp/thirdparty/knowhere") {
                     checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/knowhere.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
                     sh "./build.sh -t ${params.BUILD_TYPE} -p ${knowhere_build_dir} -j"
                 }
+                */
 
                 dir ("cpp") {
                     sh "git config --global user.email \"test@zilliz.com\""
diff --git a/ci/jenkinsfile/milvus_build_no_ut.groovy b/ci/jenkinsfile/milvus_build_no_ut.groovy
index 750c47e4c33f4eccda9e3b72f83436433dc7f752..8bbb568feeee7f0e68a243f86c1a83db6dd42a24 100644
--- a/ci/jenkinsfile/milvus_build_no_ut.groovy
+++ b/ci/jenkinsfile/milvus_build_no_ut.groovy
@@ -3,14 +3,16 @@ container('milvus-build-env') {
     gitlabCommitStatus(name: 'Build Engine') {
         dir ("milvus_engine") {
             try {
-                def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/thirdparty/knowhere/cmake_build"
+                def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/src/core/cmake_build"
                 checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
 
+                /*
                 dir ("cpp/thirdparty/knowhere") {
                     checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/knowhere.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
                     sh "./build.sh -t ${params.BUILD_TYPE} -p ${knowhere_build_dir} -j"
                 }
+                */
 
                 dir ("cpp") {
                     sh "git config --global user.email \"test@zilliz.com\""
diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md
index bf3a13da2d1b422c6a287cef4bb78128ebe33837..67a17013f4a97df1a9b46544c66b0ec06adbaa09 100644
--- a/cpp/CHANGELOG.md
+++ b/cpp/CHANGELOG.md
@@ -16,6 +16,9 @@ Please mark all change in change log and use the ticket from JIRA.
 - MS-331 - Crate Table : when table exists, error code is META_FAILED(code=15) rather than ILLEGAL TABLE NAME(code=9))
 - MS-430 - Search no result if index created with FLAT
 - MS-443 - Create index hang again
+- MS-436 - Delete vectors failed if index created with index_type: IVF_FLAT/IVF_SQ8
+- MS-450 - Server hangs after running stop_server.sh
+- MS-449 - Add vectors twice succeeds, once with ids and once without ids
 
 ## Improvement
 - MS-327 - Clean code for milvus
@@ -70,6 +73,7 @@ Please mark all change in change log and use the ticket from JIRA.
 - MS-440 - Add DumpTaskTables in sdk
 - MS-442 - Merge Knowhere
 - MS-445 - Rename CopyCompleted to LoadCompleted
+- MS-451 - Update server_config.template file, set GPU compute default
 
 ## New Feature
 - MS-343 - Implement ResourceMgr
diff --git a/cpp/build.sh b/cpp/build.sh
index f61a5ea2e91a0e60ef0a686df3059229eeb48c2a..b2dba3c87086d294d4d24528e1b85cbd51b2db36 100755
--- a/cpp/build.sh
+++ b/cpp/build.sh
@@ -2,7 +2,6 @@
 BUILD_TYPE="Debug"
 BUILD_UNITTEST="OFF"
-LICENSE_CHECK="OFF"
 INSTALL_PREFIX=$(pwd)/milvus
 MAKE_CLEAN="OFF"
 BUILD_COVERAGE="OFF"
@@ -11,12 +10,14 @@
 PROFILING="OFF"
 BUILD_FAISS_WITH_MKL="OFF"
 USE_JFROG_CACHE="OFF"
 KNOWHERE_BUILD_DIR="`pwd`/src/core/cmake_build"
+KNOWHERE_OPTIONS="-t ${BUILD_TYPE}"
 
-while getopts "p:d:t:k:uhlrcgmj" arg
+while getopts "p:d:t:k:uhrcgmj" arg
 do
         case $arg in
         t)
         BUILD_TYPE=$OPTARG # BUILD_TYPE
+        KNOWHERE_OPTIONS="-t ${BUILD_TYPE}"
         ;;
         u)
         echo "Build and run unittest cases" ;
@@ -28,9 +29,6 @@ do
         d)
         DB_PATH=$OPTARG
         ;;
-        l)
-        LICENSE_CHECK="ON"
-        ;;
         r)
         if [[ -d cmake_build ]]; then
             rm ./cmake_build -r
@@ -51,6 +49,7 @@ do
         ;;
         j)
         USE_JFROG_CACHE="ON"
+        KNOWHERE_OPTIONS="${KNOWHERE_OPTIONS} -j"
         ;;
         h) # help
         echo "
@@ -60,7 +59,6 @@ parameter:
 -u: building unit test options(default: OFF)
 -p: install prefix(default: $(pwd)/milvus)
 -d: db path(default: /opt/milvus)
--l: build license version(default: OFF)
 -r: remove previous build directory(default: OFF)
 -c: code coverage(default: OFF)
 -g: profiling(default: OFF)
@@ -85,8 +83,12 @@ if [[ ! -d cmake_build ]]; then
     MAKE_CLEAN="ON"
 fi
 
+pushd `pwd`/src/core
+./build.sh ${KNOWHERE_OPTIONS}
+popd
+
 cd cmake_build
-git
+
 CUDA_COMPILER=/usr/local/cuda/bin/nvcc
 
 if [[ ${MAKE_CLEAN} == "ON" ]]; then
@@ -94,7 +96,6 @@ if [[ ${MAKE_CLEAN} == "ON" ]]; then
     -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} -DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
     -DCMAKE_CUDA_COMPILER=${CUDA_COMPILER} \
-    -DCMAKE_LICENSE_CHECK=${LICENSE_CHECK} \
     -DBUILD_COVERAGE=${BUILD_COVERAGE} \
     -DMILVUS_DB_PATH=${DB_PATH} \
     -DMILVUS_ENABLE_PROFILING=${PROFILING} \
diff --git a/cpp/cmake/ThirdPartyPackages.cmake b/cpp/cmake/ThirdPartyPackages.cmake
index 53d65bd90611c3c60108e61c850799113e692465..f0b353e7b2d7bde7a5c4753e654fa45818285884 100644
--- a/cpp/cmake/ThirdPartyPackages.cmake
+++ b/cpp/cmake/ThirdPartyPackages.cmake
@@ -309,7 +309,8 @@ else()
     # set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz")
 endif()
 
-set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
+# set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
+set(FAISS_MD5 "31167ecbd1903fec600dc4ac00b9be9e")
 
 if(DEFINED ENV{MILVUS_KNOWHERE_URL})
     set(KNOWHERE_SOURCE_URL "$ENV{MILVUS_KNOWHERE_URL}")
diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template
index 386b61d1f102f086bd38643da0ea1c876cf69284..daf75459da9d3b85b3f0d3df1c82267eeefe73b4 100644
--- a/cpp/conf/server_config.template
+++ b/cpp/conf/server_config.template
@@ -64,9 +64,9 @@ resource_config:
         memory: 64
         device_id: 0
         enable_loader: true
-        enable_executor: true
+        enable_executor: false
 
-    gtx1060:
+    gpu0:
         type: GPU
         memory: 6
         device_id: 0
@@ -80,10 +80,17 @@ resource_config:
         enable_loader: false
         enable_executor: false
 
+#    gtx1660:
+#        type: GPU
+#        memory: 6
+#        device_id: 1
+#        enable_loader: true
+#        enable_executor: true
+
 # connection list, length: 0~N
 # format: -${resource_name}===${resource_name}
 connections:
   - ssda===cpu
-  - cpu===gtx1060
-  - cpu===gtx1660
+  - cpu===gpu0
+#  - cpu===gtx1660
diff --git a/cpp/src/core/cmake/ThirdPartyPackages.cmake b/cpp/src/core/cmake/ThirdPartyPackages.cmake
index 981e1bd558a29e1c01d2b9619b30767ec2a7d1ee..945f50fb73d080e836b35f13584fc1726dc5b7ed 100644
--- a/cpp/src/core/cmake/ThirdPartyPackages.cmake
+++ b/cpp/src/core/cmake/ThirdPartyPackages.cmake
@@ -260,7 +260,8 @@ else()
     # set(FAISS_SOURCE_URL "${CMAKE_SOURCE_DIR}/thirdparty/faiss-1.5.3")
     message(STATUS ${FAISS_SOURCE_URL})
 endif()
-set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
+# set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
+set(FAISS_MD5 "31167ecbd1903fec600dc4ac00b9be9e")
 
 if(DEFINED ENV{KNOWHERE_ARROW_URL})
     set(ARROW_SOURCE_URL "$ENV{KNOWHERE_ARROW_URL}")
@@ -924,7 +925,7 @@ macro(build_faiss)
     if(USE_JFROG_CACHE STREQUAL "ON")
         # Check_Last_Modify("${CMAKE_SOURCE_DIR}/thirdparty/faiss_cache_check_lists.txt" "${CMAKE_SOURCE_DIR}" FAISS_LAST_MODIFIED_COMMIT_ID)
         string(MD5 FAISS_COMBINE_MD5 "${FAISS_MD5}${LAPACK_MD5}${OPENBLAS_MD5}")
-        string(MD5 FAISS_COMBINE_MD5 "${FAISS_LAST_MODIFIED_COMMIT_ID}${LAPACK_MD5}${OPENBLAS_MD5}")
+        # string(MD5 FAISS_COMBINE_MD5 "${FAISS_LAST_MODIFIED_COMMIT_ID}${LAPACK_MD5}${OPENBLAS_MD5}")
         set(FAISS_CACHE_PACKAGE_NAME "faiss_${FAISS_COMBINE_MD5}.tar.gz")
         set(FAISS_CACHE_URL "${JFROG_ARTFACTORY_CACHE_URL}/${FAISS_CACHE_PACKAGE_NAME}")
         set(FAISS_CACHE_PACKAGE_PATH "${THIRDPARTY_PACKAGE_CACHE}/${FAISS_CACHE_PACKAGE_NAME}")
diff --git a/cpp/src/core/thirdparty/faiss_cache_check_lists.txt b/cpp/src/core/thirdparty/faiss_cache_check_lists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..ce75614ca5b2275a61a62e7fd5125493b43baabc
--- /dev/null
+++ b/cpp/src/core/thirdparty/faiss_cache_check_lists.txt
@@ -0,0 +1,13 @@
+# source
+src/
+include/
+
+# third party
+thirdparty/
+
+# cmake
+cmake/
+CMakeLists.txt
+
+# script
+build.sh
\ No newline at end of file
diff --git a/cpp/src/db/DB.h b/cpp/src/db/DB.h
index 908714a08a61a8a7e50a379b49bbf8f413f09b12..d493c0f78cad2cacc77ac5edd7085dbe78a122b6 100644
--- a/cpp/src/db/DB.h
+++ b/cpp/src/db/DB.h
@@ -22,6 +22,9 @@ class DB {
 public:
     static void Open(const Options& options, DB** dbptr);
 
+    virtual Status Start() = 0;
+    virtual Status Stop() = 0;
+
     virtual Status CreateTable(meta::TableSchema& table_schema_) = 0;
     virtual Status DeleteTable(const std::string& table_id, const meta::DatesT& dates) = 0;
     virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0;
diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp
index 4f9cd1e792b6bd6d380d6b9e7b2fc88eb66f7a02..b744899d561b98e5a0bd66b62847412158b37e60 100644
--- a/cpp/src/db/DBImpl.cpp
+++ b/cpp/src/db/DBImpl.cpp
@@ -41,17 +41,55 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
 
 DBImpl::DBImpl(const Options& options)
     : options_(options),
-      shutting_down_(false),
+      shutting_down_(true),
       compact_thread_pool_(1, 1),
       index_thread_pool_(1, 1) {
     meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
     mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
-    if (options.mode != Options::MODE::READ_ONLY) {
+    Start();
+}
+
+DBImpl::~DBImpl() {
+    Stop();
+}
+
+Status DBImpl::Start() {
+    if (!shutting_down_.load(std::memory_order_acquire)){
+        return Status::OK();
+    }
+
+    //for distributed version, some nodes are read-only
+    if (options_.mode != Options::MODE::READ_ONLY) {
         ENGINE_LOG_TRACE << "StartTimerTasks";
-        StartTimerTasks();
+        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
     }
+    shutting_down_.store(false, std::memory_order_release);
+    return Status::OK();
+}
+
+Status DBImpl::Stop() {
+    if (shutting_down_.load(std::memory_order_acquire)){
+        return Status::OK();
+    }
+
+    shutting_down_.store(true, std::memory_order_release);
+    bg_timer_thread_.join();
+
+    //wait compaction/buildindex finish
+    for(auto& result : compact_thread_results_) {
+        result.wait();
+    }
+
+    for(auto& result : index_thread_results_) {
+        result.wait();
+    }
+
+    //make sure all memory data is serialized
+    MemSerialize();
+
+    return Status::OK();
 }
 
 Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
@@ -162,7 +200,7 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint6
                      const float *vectors, QueryResults &results) {
     server::CollectQueryMetrics metrics(nq);
 
-    meta::DatesT dates = {meta::Meta::GetDate()};
+    meta::DatesT dates = {utils::GetDate()};
     Status result = Query(table_id, k, nq, nprobe, vectors, dates, results);
 
     return result;
@@ -278,10 +316,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
     return Status::OK();
 }
 
-void DBImpl::StartTimerTasks() {
-    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
-}
-
 void DBImpl::BackgroundTimerTask() {
     Status status;
     server::SystemInfo::GetInstance().Init();
@@ -741,13 +775,6 @@ Status DBImpl::Size(uint64_t& result) {
     return meta_ptr_->Size(result);
 }
 
-DBImpl::~DBImpl() {
-    shutting_down_.store(true, std::memory_order_release);
-    bg_timer_thread_.join();
-    std::set ids;
-    mem_mgr_->Serialize(ids);
-}
-
 } // namespace engine
 } // namespace milvus
 } // namespace zilliz
diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h
index b8f43daf83724e5df0a224c6687cd14dcd42fcf0..7fb8295b2f37e72054e1acaaa9621f4d94fe3840 100644
--- a/cpp/src/db/DBImpl.h
+++ b/cpp/src/db/DBImpl.h
@@ -36,6 +36,9 @@ class DBImpl : public DB {
 
     explicit DBImpl(const Options &options);
 
+    Status Start() override;
+    Status Stop() override;
+
     Status CreateTable(meta::TableSchema &table_schema) override;
 
     Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override;
@@ -91,18 +94,15 @@ class DBImpl : public DB {
     ~DBImpl() override;
 
  private:
-    Status
-    QueryAsync(const std::string &table_id,
-               const meta::TableFilesSchema &files,
-               uint64_t k,
-               uint64_t nq,
-               uint64_t nprobe,
-               const float *vectors,
-               const meta::DatesT &dates,
-               QueryResults &results);
-
-
-    void StartTimerTasks();
+    Status QueryAsync(const std::string &table_id,
+                      const meta::TableFilesSchema &files,
+                      uint64_t k,
+                      uint64_t nq,
+                      uint64_t nprobe,
+                      const float *vectors,
+                      const meta::DatesT &dates,
+                      QueryResults &results);
+
     void BackgroundTimerTask();
 
     void StartMetricTask();
diff --git a/cpp/src/db/Utils.cpp b/cpp/src/db/Utils.cpp
index 4eec3346b0260b627796f9a191d6479d2d0f5455..efe238d86f4120e343b64725d383bfc199087749 100644
--- a/cpp/src/db/Utils.cpp
+++ b/cpp/src/db/Utils.cpp
@@ -152,8 +152,33 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
             && index1.metric_type_ == index2.metric_type_;
 }
 
-bool UserDefinedId(int64_t flag) {
-    return flag & meta::FLAG_MASK_USERID;
+meta::DateT GetDate(const std::time_t& t, int day_delta) {
+    struct tm ltm;
+    localtime_r(&t, &ltm);
+    if (day_delta > 0) {
+        do {
+            ++ltm.tm_mday;
+            --day_delta;
+        } while(day_delta > 0);
+        mktime(&ltm);
+    } else if (day_delta < 0) {
+        do {
+            --ltm.tm_mday;
+            ++day_delta;
+        } while(day_delta < 0);
+        mktime(&ltm);
+    } else {
+        ltm.tm_mday;
+    }
+    return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday;
+}
+
+meta::DateT GetDateWithDelta(int day_delta) {
+    return GetDate(std::time(nullptr), day_delta);
+}
+
+meta::DateT GetDate() {
+    return GetDate(std::time(nullptr), 0);
 }
 
 } // namespace utils
diff --git a/cpp/src/db/Utils.h b/cpp/src/db/Utils.h
index d6244ebc916b3859aed9c8a18d5c745bed652eaf..2094250a1f3c5fd71b81d742004033b5edaca1cb 100644
--- a/cpp/src/db/Utils.h
+++ b/cpp/src/db/Utils.h
@@ -10,6 +10,7 @@
 #include "db/Types.h"
 
 #include
+#include
 
 namespace zilliz {
 namespace milvus {
@@ -27,7 +28,9 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
 
 bool IsSameIndex(const TableIndex& index1, const TableIndex& index2);
 
-bool UserDefinedId(int64_t flag);
+meta::DateT GetDate(const std::time_t &t, int day_delta = 0);
+meta::DateT GetDate();
+meta::DateT GetDateWithDelta(int day_delta);
 
 } // namespace utils
 } // namespace engine
diff --git a/cpp/src/db/meta/Meta.cpp b/cpp/src/db/meta/Meta.cpp
deleted file mode 100644
index a86051a1c483c4edcaee42e20d4ad840e3de6a2a..0000000000000000000000000000000000000000
--- a/cpp/src/db/meta/Meta.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/*******************************************************************************
- * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
- * Unauthorized copying of this file, via any medium is strictly prohibited.
- * Proprietary and confidential.
- ******************************************************************************/
-#include "Meta.h"
-
-#include
-#include
-
-namespace zilliz {
-namespace milvus {
-namespace engine {
-namespace meta {
-
-Meta::~Meta() = default;
-
-DateT Meta::GetDate(const std::time_t& t, int day_delta) {
-    struct tm ltm;
-    localtime_r(&t, &ltm);
-    if (day_delta > 0) {
-        do {
-            ++ltm.tm_mday;
-            --day_delta;
-        } while(day_delta > 0);
-        mktime(&ltm);
-    } else if (day_delta < 0) {
-        do {
-            --ltm.tm_mday;
-            ++day_delta;
-        } while(day_delta < 0);
-        mktime(&ltm);
-    } else {
-        ltm.tm_mday;
-    }
-    return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday;
-}
-
-DateT Meta::GetDateWithDelta(int day_delta) {
-    return GetDate(std::time(nullptr), day_delta);
-}
-
-DateT Meta::GetDate() {
-    return GetDate(std::time(nullptr), 0);
-}
-
-} // namespace meta
-} // namespace engine
-} // namespace milvus
-} // namespace zilliz
diff --git a/cpp/src/db/meta/Meta.h b/cpp/src/db/meta/Meta.h
index b7210815ddf19ebb6a18c92c13e2ccc5dbec4192..8d5dd63a0ac9f94fba3fec91c56d5b729c04aa66 100644
--- a/cpp/src/db/meta/Meta.h
+++ b/cpp/src/db/meta/Meta.h
@@ -11,7 +11,6 @@
 #include "db/Types.h"
 
 #include
-#include
 #include
 
 namespace zilliz {
@@ -19,105 +18,70 @@ namespace milvus {
 namespace engine {
 namespace meta {
 
-
 class Meta {
  public:
     using Ptr = std::shared_ptr;
 
-    virtual
-    ~Meta() = 0;
-
-    virtual Status
-    CreateTable(TableSchema &table_schema) = 0;
-
-    virtual Status
-    DescribeTable(TableSchema &table_schema) = 0;
-
-    virtual Status
-    HasTable(const std::string &table_id, bool &has_or_not) = 0;
+    virtual ~Meta() = default;
 
-    virtual Status
-    AllTables(std::vector &table_schema_array) = 0;
+    virtual Status CreateTable(TableSchema &table_schema) = 0;
 
-    virtual Status
-    UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0;
+    virtual Status DescribeTable(TableSchema &table_schema) = 0;
 
-    virtual Status
-    UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
+    virtual Status HasTable(const std::string &table_id, bool &has_or_not) = 0;
 
-    virtual Status
-    DeleteTable(const std::string &table_id) = 0;
+    virtual Status AllTables(std::vector &table_schema_array) = 0;
 
-    virtual Status
-    DeleteTableFiles(const std::string &table_id) = 0;
+    virtual Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0;
 
-    virtual Status
-    CreateTableFile(TableFileSchema &file_schema) = 0;
+    virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
 
-    virtual Status
-    DropPartitionsByDates(const std::string &table_id, const DatesT &dates) = 0;
+    virtual Status DeleteTable(const std::string &table_id) = 0;
 
-    virtual Status
-    GetTableFiles(const std::string &table_id, const std::vector &ids, TableFilesSchema &table_files) = 0;
+    virtual Status DeleteTableFiles(const std::string &table_id) = 0;
 
-    virtual Status
-    UpdateTableFilesToIndex(const std::string &table_id) = 0;
+    virtual Status CreateTableFile(TableFileSchema &file_schema) = 0;
 
-    virtual Status
-    UpdateTableFile(TableFileSchema &file_schema) = 0;
+    virtual Status DropPartitionsByDates(const std::string &table_id, const DatesT &dates) = 0;
 
-    virtual Status
-    UpdateTableFiles(TableFilesSchema &files) = 0;
+    virtual Status GetTableFiles(const std::string &table_id,
+                                 const std::vector &ids,
+                                 TableFilesSchema &table_files) = 0;
 
-    virtual Status
-    FilesToSearch(const std::string &table_id,
-                  const std::vector &ids,
-                  const DatesT &partition,
-                  DatePartionedTableFilesSchema &files) = 0;
+    virtual Status UpdateTableFilesToIndex(const std::string &table_id) = 0;
 
-    virtual Status
-    FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0;
+    virtual Status UpdateTableFile(TableFileSchema &file_schema) = 0;
 
-    virtual Status
-    Size(uint64_t &result) = 0;
+    virtual Status UpdateTableFiles(TableFilesSchema &files) = 0;
 
-    virtual Status
-    Archive() = 0;
+    virtual Status FilesToSearch(const std::string &table_id,
+                                 const std::vector &ids,
+                                 const DatesT &partition,
+                                 DatePartionedTableFilesSchema &files) = 0;
 
-    virtual Status
-    FilesToIndex(TableFilesSchema &) = 0;
+    virtual Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0;
 
-    virtual Status
-    FilesByType(const std::string &table_id,
-                const std::vector &file_types,
-                std::vector& file_ids) = 0;
+    virtual Status Size(uint64_t &result) = 0;
 
-    virtual Status
-    DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0;
+    virtual Status Archive() = 0;
 
-    virtual Status
-    DropTableIndex(const std::string &table_id) = 0;
+    virtual Status FilesToIndex(TableFilesSchema &) = 0;
 
-    virtual Status
-    CleanUp() = 0;
+    virtual Status FilesByType(const std::string &table_id,
+                               const std::vector &file_types,
+                               std::vector& file_ids) = 0;
 
-    virtual Status
-    CleanUpFilesWithTTL(uint16_t) = 0;
+    virtual Status DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0;
 
-    virtual Status
-    DropAll() = 0;
+    virtual Status DropTableIndex(const std::string &table_id) = 0;
 
-    virtual Status
-    Count(const std::string &table_id, uint64_t &result) = 0;
+    virtual Status CleanUp() = 0;
 
-    static DateT
-    GetDate(const std::time_t &t, int day_delta = 0);
+    virtual Status CleanUpFilesWithTTL(uint16_t) = 0;
 
-    static DateT
-    GetDate();
+    virtual Status DropAll() = 0;
 
-    static DateT
-    GetDateWithDelta(int day_delta);
+    virtual Status Count(const std::string &table_id, uint64_t &result) = 0;
 
 }; // MetaData
diff --git a/cpp/src/db/meta/MetaTypes.h b/cpp/src/db/meta/MetaTypes.h
index e4c68decbc03acf4663904c661725ce3a2774205..e31be40ddc25c72476007cb66483b59f7d978bdd 100644
--- a/cpp/src/db/meta/MetaTypes.h
+++ b/cpp/src/db/meta/MetaTypes.h
@@ -22,7 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384;
 constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2;
 constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB;
 
-constexpr int64_t FLAG_MASK_USERID = 1;
+constexpr int64_t FLAG_MASK_NO_USERID = 0x1;
+constexpr int64_t FLAG_MASK_HAS_USERID = 0x1<<1;
 
 typedef int DateT;
 const DateT EmptyDate = -1;
diff --git a/cpp/src/db/meta/MySQLMetaImpl.cpp b/cpp/src/db/meta/MySQLMetaImpl.cpp
index b14ac11569470a00d97f2266e9e9100a7b9cce95..beae3fa1e98a02305b812a63e99c2f4072305d72 100644
--- a/cpp/src/db/meta/MySQLMetaImpl.cpp
+++ b/cpp/src/db/meta/MySQLMetaImpl.cpp
@@ -41,6 +41,18 @@ Status HandleException(const std::string &desc, std::exception &e) {
 
 }
 
+MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
+    : options_(options_),
+      mode_(mode) {
+    Initialize();
+}
+
+MySQLMetaImpl::~MySQLMetaImpl() {
+    if (mode_ != Options::MODE::READ_ONLY) {
+        CleanUp();
+    }
+}
+
 Status MySQLMetaImpl::NextTableId(std::string &table_id) {
     std::stringstream ss;
     SimpleIDGenerator g;
@@ -57,12 +69,6 @@ Status MySQLMetaImpl::NextFileId(std::string &file_id) {
     return Status::OK();
 }
 
-MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
-    : options_(options_),
-      mode_(mode) {
-    Initialize();
-}
-
 Status MySQLMetaImpl::Initialize() {
     if (!boost::filesystem::is_directory(options_.path)) {
        auto ret = boost::filesystem::create_directory(options_.path);
@@ -202,15 +208,6 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
     }
 
     try {
-
-        auto yesterday = GetDateWithDelta(-1);
-
-        for (auto &date : dates) {
-            if (date >= yesterday) {
-                return Status::Error("Could not delete partitions within 2 days");
-            }
-        }
-
         std::stringstream dateListSS;
         for (auto &date : dates) {
             dateListSS << std::to_string(date) << ", ";
@@ -229,7 +226,8 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
             Query dropPartitionsByDatesQuery = connectionPtr->query();
 
             dropPartitionsByDatesQuery << "UPDATE TableFiles " <<
-                                       "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
+                                       "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
+                                       "updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
                                        "WHERE table_id = " << quote << table_id << " AND " <<
                                        "date in (" << dateListStr << ");";
@@ -877,7 +875,7 @@ Status MySQLMetaImpl::AllTables(std::vector &table_schema_array) {
 
 Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
     if (file_schema.date_ == EmptyDate) {
-        file_schema.date_ = Meta::GetDate();
+        file_schema.date_ = utils::GetDate();
     }
     TableSchema table_schema;
     table_schema.table_id_ = file_schema.table_id_;
@@ -2031,12 +2029,6 @@ Status MySQLMetaImpl::DropAll() {
     return Status::OK();
 }
 
-MySQLMetaImpl::~MySQLMetaImpl() {
-    if (mode_ != Options::MODE::READ_ONLY) {
-        CleanUp();
-    }
-}
-
 } // namespace meta
 } // namespace engine
 } // namespace milvus
diff --git a/cpp/src/db/meta/MySQLMetaImpl.h b/cpp/src/db/meta/MySQLMetaImpl.h
index f5454d2f5f5f9c2bc5e3dfff36ee6b3a61511602..08897cbe30bf37260dd793b0ae0f54d755787862 100644
--- a/cpp/src/db/meta/MySQLMetaImpl.h
+++ b/cpp/src/db/meta/MySQLMetaImpl.h
@@ -24,6 +24,7 @@ using namespace mysqlpp;
 class MySQLMetaImpl : public Meta {
 public:
     MySQLMetaImpl(const DBMetaOptions &options_, const int &mode);
+    ~MySQLMetaImpl();
 
     Status CreateTable(TableSchema &table_schema) override;
 
@@ -86,8 +87,6 @@ class MySQLMetaImpl : public Meta {
 
     Status Count(const std::string &table_id, uint64_t &result) override;
 
-    virtual ~MySQLMetaImpl();
-
 private:
     Status NextFileId(std::string &file_id);
     Status NextTableId(std::string &table_id);
diff --git a/cpp/src/db/meta/SqliteMetaImpl.cpp b/cpp/src/db/meta/SqliteMetaImpl.cpp
index 0b111b255a39f920b6a6defc43cc61a2f09dd5e9..73b4501aecbe07957d588eccdcf2095f2f3d1530 100644
--- a/cpp/src/db/meta/SqliteMetaImpl.cpp
+++ b/cpp/src/db/meta/SqliteMetaImpl.cpp
@@ -68,6 +68,15 @@ using ConnectorT = decltype(StoragePrototype(""));
 static std::unique_ptr ConnectorPtr;
 using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
 
+SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
+    : options_(options_) {
+    Initialize();
+}
+
+SqliteMetaImpl::~SqliteMetaImpl() {
+    CleanUp();
+}
+
 Status SqliteMetaImpl::NextTableId(std::string &table_id) {
     std::stringstream ss;
     SimpleIDGenerator g;
@@ -84,11 +93,6 @@ Status SqliteMetaImpl::NextFileId(std::string &file_id) {
     return Status::OK();
 }
 
-SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
-    : options_(options_) {
-    Initialize();
-}
-
 Status SqliteMetaImpl::Initialize() {
     if (!boost::filesystem::is_directory(options_.path)) {
         auto ret = boost::filesystem::create_directory(options_.path);
@@ -111,7 +115,7 @@ Status SqliteMetaImpl::Initialize() {
 
 // PXU TODO: Temp solution. Will fix later
 Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
-                                             const DatesT &dates) {
+                                              const DatesT &dates) {
     if (dates.size() == 0) {
         return Status::OK();
     }
@@ -124,20 +128,13 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
     }
 
     try {
-        auto yesterday = GetDateWithDelta(-1);
-
-        for (auto &date : dates) {
-            if (date >= yesterday) {
-                return Status::Error("Could not delete partitions with 2 days");
-            }
-        }
-
         //multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
         std::lock_guard meta_lock(meta_mutex_);
 
         ConnectorPtr->update_all(
             set(
-                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
+                c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
+                c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
             ),
             where(
                 c(&TableFileSchema::table_id_) == table_id and
@@ -543,7 +540,7 @@ Status SqliteMetaImpl::AllTables(std::vector& table_schema_array) {
 
 Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
     if (file_schema.date_ == EmptyDate) {
-        file_schema.date_ = Meta::GetDate();
+        file_schema.date_ = utils::GetDate();
     }
     TableSchema table_schema;
     table_schema.table_id_ = file_schema.table_id_;
@@ -1214,10 +1211,6 @@ Status SqliteMetaImpl::DropAll() {
     return Status::OK();
 }
 
-SqliteMetaImpl::~SqliteMetaImpl() {
-    CleanUp();
-}
-
 } // namespace meta
 } // namespace engine
 } // namespace milvus
diff --git a/cpp/src/db/meta/SqliteMetaImpl.h b/cpp/src/db/meta/SqliteMetaImpl.h
index 4a780d9084197f06c2a96ca0823566a34d5e5641..ad0d21f5e958636cef7a16b93a701125c05f146a 100644
--- a/cpp/src/db/meta/SqliteMetaImpl.h
+++ b/cpp/src/db/meta/SqliteMetaImpl.h
@@ -20,6 +20,7 @@ auto StoragePrototype(const std::string &path);
 class SqliteMetaImpl : public Meta {
 public:
     explicit SqliteMetaImpl(const DBMetaOptions &options_);
+    ~SqliteMetaImpl();
 
     Status CreateTable(TableSchema &table_schema) override;
 
@@ -80,8 +81,6 @@ class SqliteMetaImpl : public Meta {
 
     Status Count(const std::string &table_id, uint64_t &result) override;
 
-    ~SqliteMetaImpl() override;
-
 private:
     Status NextFileId(std::string &file_id);
     Status NextTableId(std::string &table_id);
diff --git a/cpp/src/metrics/Metrics.h b/cpp/src/metrics/Metrics.h
index 48f9f2b1118409a1e403a0618bf74ba8013095df..eb6e500f26a3439c14a188a33cd5e7da81acfec3 100644
--- a/cpp/src/metrics/Metrics.h
+++ b/cpp/src/metrics/Metrics.h
@@ -37,21 +37,22 @@ public:
     }
 
     ~CollectInsertMetrics() {
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
-        double avg_time = total_time / n_;
-        for (int i = 0; i < n_; ++i) {
-            Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
-        }
+        if(n_ > 0) {
+            auto end_time = METRICS_NOW_TIME;
+            auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
+            double avg_time = total_time / n_;
+            for (int i = 0; i < n_; ++i) {
+                Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
+            }
 
-        // server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
-        if (status_.ok()) {
-            server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
-            server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
-        }
-        else {
-            server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
-            server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
+            // server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
+            if (status_.ok()) {
+                server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
+                server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
+            } else {
+                server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
+                server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
+            }
         }
     }
 
@@ -69,14 +70,16 @@ public:
     }
 
     ~CollectQueryMetrics() {
-        auto end_time = METRICS_NOW_TIME;
-        auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
-        for (int i = 0; i < nq_; ++i) {
-            server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
+        if(nq_ > 0) {
+            auto end_time = METRICS_NOW_TIME;
+            auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
+            for (int i = 0; i < nq_; ++i) {
+                server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
+            }
+            auto average_time = total_time / nq_;
+            server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
+            server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double(nq_) / total_time);
         }
-        auto average_time = total_time / nq_;
-        server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
-        server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq_) / total_time);
     }
 
 private:
diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp
index 972c2e260658f2ba56a6e868ec19777b7f92ff39..3ee8cbfdb6e4197669a5758f742a5fc6996fc532 100644
--- a/cpp/src/scheduler/SchedInst.cpp
+++ b/cpp/src/scheduler/SchedInst.cpp
@@ -7,6 +7,7 @@
 #include "SchedInst.h"
 #include "server/ServerConfig.h"
 #include "ResourceFactory.h"
+#include "knowhere/index/vector_index/gpu_ivf.h"
 
 namespace zilliz {
 namespace milvus {
@@ -19,7 +20,7 @@ SchedulerPtr SchedInst::instance = nullptr;
 std::mutex SchedInst::mutex_;
 
 void
-SchedServInit() {
+StartSchedulerService() {
     server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
     auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
     for (auto &resource : resources) {
@@ -36,8 +37,12 @@ SchedServInit() {
                                                              device_id,
                                                              enable_loader,
                                                              enable_executor));
+
+        knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id);
     }
 
+    knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
+
     auto default_connection = Connection("default_connection", 500.0);
     auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS);
     for (auto &conn : connections) {
@@ -52,6 +57,11 @@ SchedServInit() {
     SchedInst::GetInstance()->Start();
 }
 
+void
+StopSchedulerService() {
+    ResMgrInst::GetInstance()->Stop();
+    SchedInst::GetInstance()->Stop();
+}
 }
 }
 }
diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h
index 3ae36827a8211596aa28cfe02bfb329579478fa1..92f3575ebc20c846670ef0314a344a3511a86ff3 100644
--- a/cpp/src/scheduler/SchedInst.h
+++ b/cpp/src/scheduler/SchedInst.h
@@ -53,7 +53,10 @@ private:
 };
 
 void
-SchedServInit();
+StartSchedulerService();
+
+void
+StopSchedulerService();
 
 }
 }
diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp
index d5ca579773ee86875235096e4c3d034a01c2d7fb..647fea598e9f6879846ce12fc2fefa998473c075 100644
--- a/cpp/src/server/DBWrapper.cpp
+++ b/cpp/src/server/DBWrapper.cpp
@@ -17,6 +17,10 @@ namespace milvus {
 namespace server {
 
 DBWrapper::DBWrapper() {
+
+}
+
+ServerError DBWrapper::StartService() {
     //db config
     zilliz::milvus::engine::Options opt;
     ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
@@ -91,7 +95,9 @@ DBWrapper::DBWrapper() {
     //create db instance
    std::string msg = opt.meta.path;
     try {
-        zilliz::milvus::engine::DB::Open(opt, &db_);
+        engine::DB* db = nullptr;
+        zilliz::milvus::engine::DB::Open(opt, &db);
+        db_.reset(db);
     } catch(std::exception& ex) {
         msg = ex.what();
     }
@@ -100,10 +106,18 @@
         std::cout << "ERROR! Failed to open database: " << msg << std::endl;
         kill(0, SIGUSR1);
     }
+
+    db_->Start();
+
+    return SERVER_SUCCESS;
 }
 
-DBWrapper::~DBWrapper() {
-    delete db_;
+ServerError DBWrapper::StopService() {
+    if(db_) {
+        db_->Stop();
+    }
+
+    return SERVER_SUCCESS;
 }
 
 }
diff --git a/cpp/src/server/DBWrapper.h b/cpp/src/server/DBWrapper.h
index fdde4b157cfbaa71aa093966282acf8ae91d5e4a..8b25dc0d28291499de46aa89c193220a8d0a679d 100644
--- a/cpp/src/server/DBWrapper.h
+++ b/cpp/src/server/DBWrapper.h
@@ -5,8 +5,11 @@
  ******************************************************************************/
 #pragma once
 
+#include "utils/Error.h"
 #include "db/DB.h"
 
+#include
+
 namespace zilliz {
 namespace milvus {
 namespace server {
@@ -14,18 +17,27 @@ namespace server {
 class DBWrapper {
 private:
     DBWrapper();
-    ~DBWrapper();
+    ~DBWrapper() = default;
 
 public:
-    static zilliz::milvus::engine::DB* DB() {
-        static DBWrapper db_wrapper;
-        return db_wrapper.db();
+    static DBWrapper& GetInstance() {
+        static DBWrapper wrapper;
+        return wrapper;
+    }
+
+    static std::shared_ptr DB() {
+        return GetInstance().EngineDB();
     }
 
-    zilliz::milvus::engine::DB* db() { return db_; }
+    ServerError StartService();
+    ServerError StopService();
+
+    std::shared_ptr EngineDB() {
+        return db_;
+    }
 
 private:
-    zilliz::milvus::engine::DB* db_ = nullptr;
+    std::shared_ptr db_;
 };
 
 }
diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp
index 5c0229e319adc31edc2b0982de443f22639317ae..2df42791309f4b842a41af978441b769d3e1ba59 100644
--- a/cpp/src/server/Server.cpp
+++ b/cpp/src/server/Server.cpp
@@ -21,6 +21,7 @@
 #include
 
 #include "metrics/Metrics.h"
+#include "DBWrapper.h"
 
 namespace zilliz {
 namespace milvus {
@@ -158,7 +159,7 @@ Server::Start() {
         signal(SIGTERM, SignalUtil::HandleSignal);
         server::Metrics::GetInstance().Init();
         server::SystemInfo::GetInstance().Init();
-        engine::SchedServInit();
+
         std::cout << "Milvus server start successfully." << std::endl;
         StartService();
@@ -221,12 +222,16 @@ Server::LoadConfig() {
 
 void
 Server::StartService() {
+    engine::StartSchedulerService();
+    DBWrapper::GetInstance().StartService();
     grpc::GrpcMilvusServer::StartService();
 }
 
 void
 Server::StopService() {
     grpc::GrpcMilvusServer::StopService();
+    DBWrapper::GetInstance().StopService();
+    engine::StopSchedulerService();
 }
 
 }
diff --git a/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
index baf9116169b08f3445ad2b93019d0dea27b60b4b..737f3dab95d72ce5340f2446c4c26044b8990dfe 100644
--- a/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
+++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
@@ -49,8 +49,6 @@ GrpcMilvusServer::StartService() {
     faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20);
 
-    DBWrapper::DB();//initialize db
-
     std::string server_address(address + ":" + std::to_string(port));
 
     ::grpc::ServerBuilder builder;
diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
index 637b4c3b34154fd9c7307c2cf5653ad188f97be2..6f1a42b641b480c684816f5893390efa11de85eb 100644
--- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
+++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
@@ -66,16 +66,18 @@ GrpcBaseTask::~GrpcBaseTask() {
     WaitToFinish();
 }
 
-ServerError
-GrpcBaseTask::Execute() {
+ServerError GrpcBaseTask::Execute() {
     error_code_ = OnExecute();
+    Done();
+    return error_code_;
+}
+
+void GrpcBaseTask::Done() {
     done_ = true;
     finish_cond_.notify_all();
-    return error_code_;
 }
 
-ServerError
-GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
+ServerError GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
     error_code_ = error_code;
     error_msg_ = error_msg;
 
@@ -83,8 +85,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
     return error_code_;
 }
 
-ServerError
-GrpcBaseTask::WaitToFinish() {
+ServerError GrpcBaseTask::WaitToFinish() {
     std::unique_lock lock(finish_mtx_);
     finish_cond_.wait(lock, [this] { return done_; });
 
@@ -101,8 +102,7 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
     Stop();
 }
 
-void
-GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
+void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
     if (task_ptr == nullptr) {
         return;
     }
@@ -120,8 +120,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *gr
     }
 }
 
-void
-GrpcRequestScheduler::Start() {
+void GrpcRequestScheduler::Start() {
     if (!stopped_) {
         return;
     }
@@ -129,8 +128,7 @@ GrpcRequestScheduler::Start() {
     stopped_ = false;
 }
 
-void
-GrpcRequestScheduler::Stop() {
+void GrpcRequestScheduler::Stop() {
     if (stopped_) {
         return;
     }
@@ -155,8 +153,7 @@ GrpcRequestScheduler::Stop() {
     SERVER_LOG_INFO << "Scheduler stopped";
 }
 
-ServerError
-GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
+ServerError GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
     if (task_ptr == nullptr) {
         return SERVER_NULL_POINTER;
     }
@@ -174,33 +171,31 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
     return task_ptr->WaitToFinish();//sync execution
 }
 
-namespace {
-    void TakeTaskToExecute(TaskQueuePtr task_queue) {
-        if (task_queue == nullptr) {
-            return;
-        }
-        while (true) {
-            BaseTaskPtr task = task_queue->Take();
-            if (task == nullptr) {
-                SERVER_LOG_ERROR << "Take null from task queue, stop thread";
-                break;//stop the thread
-            }
+
+void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
+    if (task_queue == nullptr) {
+        return;
+    }
+
+    while (true) {
+        BaseTaskPtr task = task_queue->Take();
+        if (task == nullptr) {
+            SERVER_LOG_ERROR << "Take null from task queue, stop thread";
+            break;//stop the thread
+        }
 
-            try {
-                ServerError err = task->Execute();
-                if (err != SERVER_SUCCESS) {
-                    SERVER_LOG_ERROR << "Task failed with code: " << err;
-                }
-            } catch (std::exception &ex) {
-                SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
+        try {
+            ServerError err = task->Execute();
+            if (err != SERVER_SUCCESS) {
+                SERVER_LOG_ERROR << "Task failed with code: " << err;
             }
+        } catch (std::exception &ex) {
+            SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
         }
     }
 }
 
-ServerError
-GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
+ServerError GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
     std::lock_guard lock(queue_mtx_);
 
     std::string group_name = task_ptr->TaskGroup();
@@ -212,7 +207,7 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
         task_groups_.insert(std::make_pair(group_name, queue));
 
         //start a thread
-        ThreadPtr thread = std::make_shared(&TakeTaskToExecute, queue);
+        ThreadPtr thread = std::make_shared(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
         execute_threads_.push_back(thread);
         SERVER_LOG_INFO << "Create new thread for task group: " << group_name;
     }
diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h
index a436e8dec6ceb36bd222b52aeeab63f1d3bae5f2..96be98836c6087fa2aaa64f671b1a7dc80b1efb5 100644
--- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h
+++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h
@@ -25,30 +25,24 @@ protected:
     virtual ~GrpcBaseTask();
 
 public:
-    ServerError
-    Execute();
+    ServerError Execute();
 
-    ServerError
-    WaitToFinish();
+    void Done();
 
-    std::string
-    TaskGroup() const { return task_group_; }
+    ServerError WaitToFinish();
 
-    ServerError
-    ErrorCode() const { return error_code_; }
+    std::string TaskGroup() const { return task_group_; }
 
-    std::string
-    ErrorMsg() const { return error_msg_; }
+    ServerError ErrorCode() const { return error_code_; }
 
-    bool
-    IsAsync() const { return async_; }
+    std::string ErrorMsg() const { return error_msg_; }
+
+    bool IsAsync() const { return async_; }
 
 protected:
-    virtual ServerError
-    OnExecute() = 0;
+    virtual ServerError OnExecute() = 0;
 
-    ServerError
-    SetError(ServerError error_code, const std::string &msg);
+    ServerError SetError(ServerError error_code, const std::string &msg);
 
 protected:
     mutable std::mutex finish_mtx_;
@@ -77,19 +71,18 @@ public:
 
     void Stop();
 
-    ServerError
-    ExecuteTask(const BaseTaskPtr &task_ptr);
+    ServerError ExecuteTask(const BaseTaskPtr &task_ptr);
 
-    static void
-    ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
+    static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
 
 protected:
     GrpcRequestScheduler();
 
     virtual ~GrpcRequestScheduler();
 
-    ServerError
-    PutTaskToQueue(const BaseTaskPtr &task_ptr);
+    void TakeTaskToExecute(TaskQueuePtr task_queue);
+
+    ServerError PutTaskToQueue(const BaseTaskPtr &task_ptr);
 
 private:
     mutable std::mutex queue_mtx_;
diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp
index c277f11a6189fc8a639c15d749d64217fd4e96cd..1b9bc935fae454cc7c663c8366049f12ce619a18 100644
--- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp
+++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp
@@ -93,6 +93,7 @@ namespace {
             return;
         }
 
+        //range: [start_day, end_day)
         for (long i = 0; i < days; i++) {
             time_t tt_day = tt_start + DAY_SECONDS * i;
             tm tm_day;
@@ -456,21 +457,17 @@ InsertTask::OnExecute() {
             }
         }
 
+        //step 3: check table flag
         //all user provide id, or all internal id
-        uint64_t row_count = 0;
-        DBWrapper::DB()->GetTableRowCount(table_info.table_id_, row_count);
-        bool empty_table = (row_count == 0);
         bool user_provide_ids = !insert_param_->row_id_array().empty();
-        if(!empty_table) {
-            //user already provided id before, all insert action require user id
-            if(engine::utils::UserDefinedId(table_info.flag_) && !user_provide_ids) {
-                return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch");
-            }
+        //user already provided ids before, all insert actions require user ids
+        if((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) {
+            return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch");
+        }
 
-            //user didn't provided id before, no need to provide user id
-            if(!engine::utils::UserDefinedId(table_info.flag_) && user_provide_ids) {
-                return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch");
-            }
+        //user didn't provide ids before, no need to provide user ids
+        if((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) {
+            return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch");
         }
 
         rc.RecordSection("check validation");
@@ -481,7 +478,7 @@ InsertTask::OnExecute() {
         ProfilerStart(fname.c_str());
 #endif
 
-        //step 3: prepare float data
+        //step 4: prepare float data
         std::vector vec_f(insert_param_->row_record_array_size() * table_info.dimension_, 0);
 
         // TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
@@ -504,7 +501,7 @@ InsertTask::OnExecute() {
 
         rc.ElapseFromBegin("prepare vectors data");
 
-        //step 4: insert vectors
+        //step 5: insert vectors
         auto vec_count = (uint64_t) insert_param_->row_record_array_size();
         std::vector vec_ids(insert_param_->row_id_array_size(), 0);
         if(!insert_param_->row_id_array().empty()) {
@@ -529,11 +526,10 @@ InsertTask::OnExecute() {
             return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
         }
 
-        //step 5: update table flag
-        if(empty_table && user_provide_ids) {
-            stat = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(),
-                                                    table_info.flag_ | engine::meta::FLAG_MASK_USERID);
-        }
+        //step 6: update table flag
+        user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
+                         : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
+        stat = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_);
 
 #ifdef MILVUS_ENABLE_PROFILING
         ProfilerStop();
diff --git a/cpp/src/wrapper/knowhere/vec_impl.cpp b/cpp/src/wrapper/knowhere/vec_impl.cpp
index b0fb4c07990b8d106a79a235a4bed819fc032b51..0b9855c639f973acac5e5187560abba8195760e4 100644
--- a/cpp/src/wrapper/knowhere/vec_impl.cpp
+++ b/cpp/src/wrapper/knowhere/vec_impl.cpp
@@ -241,8 +241,9 @@ server::KnowhereError IVFMixIndex::BuildAll(const long &nb,
         index_->Add(dataset, cfg);
 
         if (auto device_index = std::dynamic_pointer_cast(index_)) {
-            auto host_index = device_index->Copy_index_gpu_to_cpu();
+            auto host_index = device_index->CopyGpuToCpu(Config());
             index_ = host_index;
+            type = TransferToCpuIndexType(type);
         } else {
             WRAPPER_LOG_ERROR << "Build IVFMIXIndex Failed";
         }
diff --git a/cpp/src/wrapper/knowhere/vec_index.cpp b/cpp/src/wrapper/knowhere/vec_index.cpp
index c31c5261dabedf1054d0e8279b33fe4f3dcefbd7..9ac0d8b3ad8e14d925081cf91a540ac56a5640f0 100644
--- a/cpp/src/wrapper/knowhere/vec_index.cpp
+++ b/cpp/src/wrapper/knowhere/vec_index.cpp
@@ -106,6 +106,10 @@ VecIndexPtr GetVecIndexFactory(const IndexType &type) {
             index = std::make_shared(0);
             return std::make_shared(index, IndexType::FAISS_IVFSQ8_MIX);
         }
+        case IndexType::FAISS_IVFSQ8: {
+            index = std::make_shared();
+            break;
+        }
         case IndexType::NSG_MIX: { // TODO(linxj): bug.
             index = std::make_shared(0);
             break;
@@ -194,10 +198,10 @@ server::KnowhereError write_index(VecIndexPtr index, const std::string &location
 // TODO(linxj): redo here.
 void AutoGenParams(const IndexType &type, const long &size, zilliz::knowhere::Config &cfg) {
     auto nlist = cfg.get_with_default("nlist", 0);
-    if (size <= TYPICAL_COUNT/16384 + 1) {
+    if (size <= TYPICAL_COUNT / 16384 + 1) {
         //handle less row count, avoid nlist set to 0
         cfg["nlist"] = 1;
-    } else if (int(size/TYPICAL_COUNT) * nlist == 0) {
+    } else if (int(size / TYPICAL_COUNT) * nlist == 0) {
         //calculate a proper nlist if nlist not specified or size less than TYPICAL_COUNT
         cfg["nlist"] = int(size / TYPICAL_COUNT * 16384);
     }
@@ -225,6 +229,20 @@ void AutoGenParams(const IndexType &type, const long &size, zilliz::knowhere::Co
     }
 }
 
+IndexType TransferToCpuIndexType(const IndexType &type) {
+    switch (type) {
+        case IndexType::FAISS_IVFFLAT_MIX: {
+            return IndexType::FAISS_IVFFLAT_CPU;
+        }
+        case IndexType::FAISS_IVFSQ8_MIX: {
+            return IndexType::FAISS_IVFSQ8;
+        }
+        default: {
+            return IndexType::INVALID;
+        }
+    }
+}
+
 }
 }
 }
diff --git a/cpp/src/wrapper/knowhere/vec_index.h b/cpp/src/wrapper/knowhere/vec_index.h
index 19f0c6d36051cb801bd39344591bcfb17bc7da96..1c45ce89fc78defcbb9390b3c8afb4446b2870b2 100644
--- a/cpp/src/wrapper/knowhere/vec_index.h
+++ b/cpp/src/wrapper/knowhere/vec_index.h
@@ -32,6 +32,7 @@ enum class IndexType {
     FAISS_IVFPQ_GPU,
     SPTAG_KDT_RNT_CPU,
     FAISS_IVFSQ8_MIX,
+    FAISS_IVFSQ8,
     NSG_MIX,
 };
 
@@ -88,6 +89,8 @@ extern VecIndexPtr LoadVecIndex(const IndexType &index_type, const zilliz::knowh
 
 extern void AutoGenParams(const IndexType& type, const long& size, Config& cfg);
 
+extern IndexType TransferToCpuIndexType(const IndexType& type);
+
 }
 }
 }
diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp
index 500186a0206f94b162569383abcd9f7406e06399..f37896df33db7fb822fd9f649caa1a51fd89438a 100644
--- a/cpp/unittest/db/db_tests.cpp
+++ b/cpp/unittest/db/db_tests.cpp
@@ -293,18 +293,15 @@ TEST_F(DBTest, PRELOADTABLE_TEST) {
     ASSERT_STATS(stat);
     ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
 
-    engine::IDNumbers vector_ids;
-    engine::IDNumbers target_ids;
-
-    int64_t nb = 100000;
+    int64_t nb = VECTOR_COUNT;
     std::vector xb;
     BuildVectors(nb, xb);
 
     int loop = 5;
-
     for (auto i=0; i<loop; ++i) {
-        db_->InsertVectors(TABLE_NAME, nb, xb.data(), target_ids);
-        ASSERT_EQ(target_ids.size(), nb);
+        engine::IDNumbers vector_ids;
+        db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
+        ASSERT_EQ(vector_ids.size(), nb);
     }
 
     engine::TableIndex index;
@@ -342,9 +339,6 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
     ASSERT_STATS(stat);
     ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
 
-    engine::IDNumbers vector_ids;
-    engine::IDNumbers target_ids;
-
     uint64_t size;
     db_->Size(size);
 
@@ -354,6 +348,7 @@
     int loop = INSERT_LOOP;
     for (auto i=0; i<loop; ++i) {
+        engine::IDNumbers vector_ids;
         db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
         std::this_thread::sleep_for(std::chrono::microseconds(1));
     }
@@ -378,20 +373,17 @@ TEST_F(DBTest2, DELETE_TEST) {
     db_->HasTable(TABLE_NAME, has_table);
     ASSERT_TRUE(has_table);
 
-    engine::IDNumbers vector_ids;
-
     uint64_t size;
     db_->Size(size);
 
-    int64_t nb = INSERT_LOOP;
+    int64_t nb = VECTOR_COUNT;
     std::vector xb;
     BuildVectors(nb, xb);
 
-    int loop = 20;
-    for (auto i=0; i<loop; ++i) {
-        db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
-        std::this_thread::sleep_for(std::chrono::microseconds(1));
-    }
+    engine::IDNumbers vector_ids;
+    stat = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
+    engine::TableIndex index;
+    stat = db_->CreateIndex(TABLE_NAME, index);
 
     std::vector dates;
     stat = db_->DeleteTable(TABLE_NAME, dates);
@@ -420,25 +412,31 @@ TEST_F(DBTest2, DELETE_BY_RANGE_TEST) {
     db_->HasTable(TABLE_NAME, has_table);
     ASSERT_TRUE(has_table);
 
-    engine::IDNumbers vector_ids;
-
     uint64_t size;
     db_->Size(size);
+    ASSERT_EQ(size, 0UL);
 
-    int64_t nb = INSERT_LOOP;
+    int64_t nb = VECTOR_COUNT;
     std::vector xb;
     BuildVectors(nb, xb);
 
-    int loop = 20;
-    for (auto i=0; i<loop; ++i) {
-        db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
-        std::this_thread::sleep_for(std::chrono::microseconds(1));
-    }
+    engine::IDNumbers vector_ids;
+    stat = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
+    engine::TableIndex index;
+    stat = db_->CreateIndex(TABLE_NAME, index);
+
+    db_->Size(size);
+    ASSERT_NE(size, 0UL);
 
     std::vector dates;
-    std::string start_value = CurrentTmDate(-3);
-    std::string end_value = CurrentTmDate(-2);
+    std::string start_value = CurrentTmDate();
+    std::string end_value = CurrentTmDate(1);
     ConvertTimeRangeToDBDates(start_value, end_value, dates);
 
-    db_->DeleteTable(TABLE_NAME, dates);
+    stat = db_->DeleteTable(TABLE_NAME, dates);
+    ASSERT_STATS(stat);
+
+    uint64_t row_count = 0;
+    db_->GetTableRowCount(TABLE_NAME, row_count);
+    ASSERT_EQ(row_count, 0UL);
 }
\ No newline at end of file
diff --git a/cpp/unittest/db/meta_tests.cpp b/cpp/unittest/db/meta_tests.cpp
index fa28a74eab4a831fd17214a2a8acf93c822c1806..2baad61ca46c26d130a31a006b1b3346d29a1099 100644
--- a/cpp/unittest/db/meta_tests.cpp
+++ b/cpp/unittest/db/meta_tests.cpp
@@ -74,21 +74,21 @@ TEST_F(MetaTest, TABLE_FILE_TEST) {
     ASSERT_EQ(table_file.file_type_, new_file_type);
 
     meta::DatesT dates;
-    dates.push_back(meta::Meta::GetDate());
+    dates.push_back(utils::GetDate());
     status = impl_->DropPartitionsByDates(table_file.table_id_, dates);
-    ASSERT_FALSE(status.ok());
+    ASSERT_TRUE(status.ok());
 
     dates.clear();
     for (auto i=2; i < 10; ++i) {
-        dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
+        dates.push_back(utils::GetDateWithDelta(-1*i));
     }
    status = impl_->DropPartitionsByDates(table_file.table_id_, dates);
     ASSERT_TRUE(status.ok());
 
-    table_file.date_ = meta::Meta::GetDateWithDelta(-2);
+    table_file.date_ = utils::GetDateWithDelta(-2);
     status = impl_->UpdateTableFile(table_file);
     ASSERT_TRUE(status.ok());
-    ASSERT_EQ(table_file.date_, meta::Meta::GetDateWithDelta(-2));
+    ASSERT_EQ(table_file.date_, utils::GetDateWithDelta(-2));
     ASSERT_FALSE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE);
 
     dates.clear();
diff --git a/cpp/unittest/db/misc_test.cpp b/cpp/unittest/db/misc_test.cpp
index 608a5ca175f3b6b1785e0d552a30312aaf7a57da..a948d1f4d93b52dc969d76c8d926ef17dcf8b071 100644
--- a/cpp/unittest/db/misc_test.cpp
+++ b/cpp/unittest/db/misc_test.cpp
@@ -105,7 +105,7 @@ TEST(DBMiscTest, META_TEST) {
     time_t tt;
     time( &tt );
     int delta = 10;
-    engine::meta::DateT dt = impl.GetDate(tt, delta);
+    engine::meta::DateT dt = engine::utils::GetDate(tt, delta);
     ASSERT_GT(dt, 0);
 }
 
diff --git a/cpp/unittest/db/mysql_meta_test.cpp b/cpp/unittest/db/mysql_meta_test.cpp
index 46b831d3ab4cd9aa0b4231bc5a6ff8273b85cabd..2ad842a2236aba947f5d0d5ea2a349dc73ec8e98 100644
--- a/cpp/unittest/db/mysql_meta_test.cpp
+++ b/cpp/unittest/db/mysql_meta_test.cpp
@@ -90,7 +90,7 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
     ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW);
 
     meta::DatesT dates;
-    dates.push_back(meta::Meta::GetDate());
+    dates.push_back(utils::GetDate());
     status = impl.DropPartitionsByDates(table_file.table_id_, dates);
     ASSERT_FALSE(status.ok());
 
@@ -110,15 +110,15 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
 
     dates.clear();
     for (auto i=2; i < 10; ++i) {
-        dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
+        dates.push_back(utils::GetDateWithDelta(-1*i));
     }
     status = impl.DropPartitionsByDates(table_file.table_id_, dates);
     ASSERT_TRUE(status.ok());
 
-    table_file.date_ = meta::Meta::GetDateWithDelta(-2);
+    table_file.date_ = utils::GetDateWithDelta(-2);
     status = impl.UpdateTableFile(table_file);
     ASSERT_TRUE(status.ok());
-    ASSERT_EQ(table_file.date_, meta::Meta::GetDateWithDelta(-2));
+    ASSERT_EQ(table_file.date_, utils::GetDateWithDelta(-2));
     ASSERT_FALSE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE);
 
     dates.clear();
diff --git a/cpp/unittest/knowhere/knowhere_test.cpp b/cpp/unittest/knowhere/knowhere_test.cpp
index abe7c84d4e74ca1b609987d81976b3809a33fd99..0b9e6717e11b164ff7ea8704feeed18e4e11ccaa 100644
--- a/cpp/unittest/knowhere/knowhere_test.cpp
+++ b/cpp/unittest/knowhere/knowhere_test.cpp
@@ -8,23 +8,29 @@
 #include
 #include
 
+#include "knowhere/index/vector_index/gpu_ivf.h"
 #include "utils.h"
 
 INITIALIZE_EASYLOGGINGPP
 
 using namespace zilliz::milvus::engine;
-using namespace zilliz::knowhere;
+//using namespace zilliz::knowhere;
 
 using ::testing::TestWithParam;
 using ::testing::Values;
 using ::testing::Combine;
 
+constexpr int64_t DIM = 512;
+constexpr int64_t NB = 1000000;
 
 class KnowhereWrapperTest
     : public TestWithParam<::std::tuple> {
 protected:
     void SetUp() override {
+        zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(0);
+        zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(1);
+
         std::string generator_type;
         std::tie(index_type, generator_type, dim, nb, nq, k, train_cfg, search_cfg) = GetParam();
 
@@ -66,8 +72,8 @@ class KnowhereWrapperTest
     Config train_cfg;
     Config search_cfg;
 
-    int dim = 64;
-    int nb = 10000;
+    int dim = DIM;
+    int nb = NB;
     int nq = 10;
     int k = 10;
     std::vector xb;
@@ -94,27 +100,27 @@ INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest,
100}, {"dim", 64}}, // Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 40}} //), - std::make_tuple(IndexType::FAISS_IVFFLAT_MIX, "Default", - 64, 100000, 10, 10, - Config::object{{"nlist", 1000}, {"dim", 64}, {"metric_type", "L2"}}, - Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}} - ), - std::make_tuple(IndexType::FAISS_IDMAP, "Default", - 64, 100000, 10, 10, - Config::object{{"dim", 64}, {"metric_type", "L2"}}, - Config::object{{"dim", 64}, {"k", 10}} - ), +// std::make_tuple(IndexType::FAISS_IVFFLAT_MIX, "Default", +// 64, 100000, 10, 10, +// Config::object{{"nlist", 1000}, {"dim", 64}, {"metric_type", "L2"}}, +// Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}} +// ), +// std::make_tuple(IndexType::FAISS_IDMAP, "Default", +// 64, 100000, 10, 10, +// Config::object{{"dim", 64}, {"metric_type", "L2"}}, +// Config::object{{"dim", 64}, {"k", 10}} +// ), std::make_tuple(IndexType::FAISS_IVFSQ8_MIX, "Default", - 64, 100000, 10, 10, - Config::object{{"dim", 64}, {"nlist", 1000}, {"nbits", 8}, {"metric_type", "L2"}}, - Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}} - ), - std::make_tuple(IndexType::NSG_MIX, "Default", - 128, 250000, 10, 10, - Config::object{{"dim", 128}, {"nlist", 8192}, {"nprobe", 16}, {"metric_type", "L2"}, - {"knng", 200}, {"search_length", 40}, {"out_degree", 60}, {"candidate_pool_size", 200}}, - Config::object{{"k", 10}, {"search_length", 20}} + DIM, NB, 10, 10, + Config::object{{"dim", DIM}, {"nlist", 1000}, {"nbits", 8}, {"metric_type", "L2"}}, + Config::object{{"dim", DIM}, {"k", 10}, {"nprobe", 5}} ) +// std::make_tuple(IndexType::NSG_MIX, "Default", +// 128, 250000, 10, 10, +// Config::object{{"dim", 128}, {"nlist", 8192}, {"nprobe", 16}, {"metric_type", "L2"}, +// {"knng", 200}, {"search_length", 40}, {"out_degree", 60}, {"candidate_pool_size", 200}}, +// Config::object{{"k", 10}, {"search_length", 20}} +// ) //std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default", // 64, 10000, 10, 10, // Config::object{{"TPTNumber", 1}, {"dim", 64}}, @@ -135,6 +141,31 @@ TEST_P(KnowhereWrapperTest, base_test) { AssertResult(res_ids, res_dis); } +TEST_P(KnowhereWrapperTest, to_gpu_test) { + EXPECT_EQ(index_->GetType(), index_type); + + auto elems = nq * k; + std::vector res_ids(elems); + std::vector res_dis(elems); + + index_->BuildAll(nb, xb.data(), ids.data(), train_cfg); + index_->Search(nq, xq.data(), res_dis.data(), res_ids.data(), search_cfg); + AssertResult(res_ids, res_dis); + { + index_->CopyToGpu(1); + } + + std::string file_location = "/tmp/whatever"; + write_index(index_, file_location); + auto new_index = read_index(file_location); + + auto dev_idx = new_index->CopyToGpu(1); + for (int i = 0; i < 10000; ++i) { + dev_idx->Search(nq, xq.data(), res_dis.data(), res_ids.data(), search_cfg); + } + AssertResult(res_ids, res_dis); +} + TEST_P(KnowhereWrapperTest, serialize) { EXPECT_EQ(index_->GetType(), index_type);