提交 3eac8169 编写于 作者: Y Yu Kun

fix confilct


Former-commit-id: e0360836c4ef54aa8dca66c7fd3225d88b5bb189
......@@ -3,13 +3,16 @@ container('milvus-build-env') {
gitlabCommitStatus(name: 'Build Engine') {
dir ("milvus_engine") {
try {
def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/thirdparty/knowhere/cmake_build"
def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/src/core/cmake_build"
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
/*
dir ("cpp/thirdparty/knowhere") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/knowhere.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh "./build.sh -t ${params.BUILD_TYPE} -p ${knowhere_build_dir} -j"
}
*/
dir ("cpp") {
sh "git config --global user.email \"test@zilliz.com\""
......
......@@ -3,14 +3,16 @@ container('milvus-build-env') {
gitlabCommitStatus(name: 'Build Engine') {
dir ("milvus_engine") {
try {
def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/thirdparty/knowhere/cmake_build"
def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/src/core/cmake_build"
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
/*
dir ("cpp/thirdparty/knowhere") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/knowhere.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh "./build.sh -t ${params.BUILD_TYPE} -p ${knowhere_build_dir} -j"
}
*/
dir ("cpp") {
sh "git config --global user.email \"test@zilliz.com\""
......
......@@ -16,6 +16,9 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-331 - Crate Table : when table exists, error code is META_FAILED(code=15) rather than ILLEGAL TABLE NAME(code=9))
- MS-430 - Search no result if index created with FLAT
- MS-443 - Create index hang again
- MS-436 - Delete vectors failed if index created with index_type: IVF_FLAT/IVF_SQ8
- MS-450 - server hang after run stop_server.sh
- MS-449 - Add vectors twice success, once with ids, the other no ids
## Improvement
- MS-327 - Clean code for milvus
......@@ -70,6 +73,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-440 - Add DumpTaskTables in sdk
- MS-442 - Merge Knowhere
- MS-445 - Rename CopyCompleted to LoadCompleted
- MS-451 - Update server_config.template file, set GPU compute default
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -2,7 +2,6 @@
BUILD_TYPE="Debug"
BUILD_UNITTEST="OFF"
LICENSE_CHECK="OFF"
INSTALL_PREFIX=$(pwd)/milvus
MAKE_CLEAN="OFF"
BUILD_COVERAGE="OFF"
......@@ -11,12 +10,14 @@ PROFILING="OFF"
BUILD_FAISS_WITH_MKL="OFF"
USE_JFROG_CACHE="OFF"
KNOWHERE_BUILD_DIR="`pwd`/src/core/cmake_build"
KNOWHERE_OPTIONS="-t ${BUILD_TYPE}"
while getopts "p:d:t:k:uhlrcgmj" arg
while getopts "p:d:t:k:uhrcgmj" arg
do
case $arg in
t)
BUILD_TYPE=$OPTARG # BUILD_TYPE
KNOWHERE_OPTIONS="-t ${BUILD_TYPE}"
;;
u)
echo "Build and run unittest cases" ;
......@@ -28,9 +29,6 @@ do
d)
DB_PATH=$OPTARG
;;
l)
LICENSE_CHECK="ON"
;;
r)
if [[ -d cmake_build ]]; then
rm ./cmake_build -r
......@@ -51,6 +49,7 @@ do
;;
j)
USE_JFROG_CACHE="ON"
KNOWHERE_OPTIONS="${KNOWHERE_OPTIONS} -j"
;;
h) # help
echo "
......@@ -60,7 +59,6 @@ parameter:
-u: building unit test options(default: OFF)
-p: install prefix(default: $(pwd)/milvus)
-d: db path(default: /opt/milvus)
-l: build license version(default: OFF)
-r: remove previous build directory(default: OFF)
-c: code coverage(default: OFF)
-g: profiling(default: OFF)
......@@ -85,8 +83,12 @@ if [[ ! -d cmake_build ]]; then
MAKE_CLEAN="ON"
fi
pushd `pwd`/src/core
./build.sh ${KNOWHERE_OPTIONS}
popd
cd cmake_build
git
CUDA_COMPILER=/usr/local/cuda/bin/nvcc
if [[ ${MAKE_CLEAN} == "ON" ]]; then
......@@ -94,7 +96,6 @@ if [[ ${MAKE_CLEAN} == "ON" ]]; then
-DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX}
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
-DCMAKE_CUDA_COMPILER=${CUDA_COMPILER} \
-DCMAKE_LICENSE_CHECK=${LICENSE_CHECK} \
-DBUILD_COVERAGE=${BUILD_COVERAGE} \
-DMILVUS_DB_PATH=${DB_PATH} \
-DMILVUS_ENABLE_PROFILING=${PROFILING} \
......
......@@ -309,7 +309,8 @@ else()
# set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz")
endif()
set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
# set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
set(FAISS_MD5 "31167ecbd1903fec600dc4ac00b9be9e")
if(DEFINED ENV{MILVUS_KNOWHERE_URL})
set(KNOWHERE_SOURCE_URL "$ENV{MILVUS_KNOWHERE_URL}")
......
......@@ -64,9 +64,9 @@ resource_config:
memory: 64
device_id: 0
enable_loader: true
enable_executor: true
enable_executor: false
gtx1060:
gpu0:
type: GPU
memory: 6
device_id: 0
......@@ -80,10 +80,17 @@ resource_config:
enable_loader: false
enable_executor: false
# gtx1660:
# type: GPU
# memory: 6
# device_id: 1
# enable_loader: true
# enable_executor: true
# connection list, length: 0~N
# format: -${resource_name}===${resource_name}
connections:
- ssda===cpu
- cpu===gtx1060
- cpu===gtx1660
- cpu===gpu0
# - cpu===gtx1660
......@@ -260,7 +260,8 @@ else()
# set(FAISS_SOURCE_URL "${CMAKE_SOURCE_DIR}/thirdparty/faiss-1.5.3")
message(STATUS ${FAISS_SOURCE_URL})
endif()
set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
# set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
set(FAISS_MD5 "31167ecbd1903fec600dc4ac00b9be9e")
if(DEFINED ENV{KNOWHERE_ARROW_URL})
set(ARROW_SOURCE_URL "$ENV{KNOWHERE_ARROW_URL}")
......@@ -924,7 +925,7 @@ macro(build_faiss)
if(USE_JFROG_CACHE STREQUAL "ON")
# Check_Last_Modify("${CMAKE_SOURCE_DIR}/thirdparty/faiss_cache_check_lists.txt" "${CMAKE_SOURCE_DIR}" FAISS_LAST_MODIFIED_COMMIT_ID)
string(MD5 FAISS_COMBINE_MD5 "${FAISS_MD5}${LAPACK_MD5}${OPENBLAS_MD5}")
string(MD5 FAISS_COMBINE_MD5 "${FAISS_LAST_MODIFIED_COMMIT_ID}${LAPACK_MD5}${OPENBLAS_MD5}")
# string(MD5 FAISS_COMBINE_MD5 "${FAISS_LAST_MODIFIED_COMMIT_ID}${LAPACK_MD5}${OPENBLAS_MD5}")
set(FAISS_CACHE_PACKAGE_NAME "faiss_${FAISS_COMBINE_MD5}.tar.gz")
set(FAISS_CACHE_URL "${JFROG_ARTFACTORY_CACHE_URL}/${FAISS_CACHE_PACKAGE_NAME}")
set(FAISS_CACHE_PACKAGE_PATH "${THIRDPARTY_PACKAGE_CACHE}/${FAISS_CACHE_PACKAGE_NAME}")
......
# source
src/
include/
# third party
thirdparty/
# cmake
cmake/
CMakeLists.txt
# script
build.sh
\ No newline at end of file
......@@ -22,6 +22,9 @@ class DB {
public:
static void Open(const Options& options, DB** dbptr);
virtual Status Start() = 0;
virtual Status Stop() = 0;
virtual Status CreateTable(meta::TableSchema& table_schema_) = 0;
virtual Status DeleteTable(const std::string& table_id, const meta::DatesT& dates) = 0;
virtual Status DescribeTable(meta::TableSchema& table_schema_) = 0;
......
......@@ -41,17 +41,55 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
DBImpl::DBImpl(const Options& options)
: options_(options),
shutting_down_(false),
shutting_down_(true),
compact_thread_pool_(1, 1),
index_thread_pool_(1, 1) {
meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
if (options.mode != Options::MODE::READ_ONLY) {
Start();
}
DBImpl::~DBImpl() {
Stop();
}
Status DBImpl::Start() {
if (!shutting_down_.load(std::memory_order_acquire)){
return Status::OK();
}
//for distribute version, some nodes are read only
if (options_.mode != Options::MODE::READ_ONLY) {
ENGINE_LOG_TRACE << "StartTimerTasks";
StartTimerTasks();
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
shutting_down_.store(false, std::memory_order_release);
return Status::OK();
}
Status DBImpl::Stop() {
if (shutting_down_.load(std::memory_order_acquire)){
return Status::OK();
}
shutting_down_.store(true, std::memory_order_release);
bg_timer_thread_.join();
//wait compaction/buildindex finish
for(auto& result : compact_thread_results_) {
result.wait();
}
for(auto& result : index_thread_results_) {
result.wait();
}
//makesure all memory data serialized
MemSerialize();
return Status::OK();
}
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
......@@ -162,7 +200,7 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint6
const float *vectors, QueryResults &results) {
server::CollectQueryMetrics metrics(nq);
meta::DatesT dates = {meta::Meta::GetDate()};
meta::DatesT dates = {utils::GetDate()};
Status result = Query(table_id, k, nq, nprobe, vectors, dates, results);
return result;
......@@ -278,10 +316,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
return Status::OK();
}
void DBImpl::StartTimerTasks() {
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}
void DBImpl::BackgroundTimerTask() {
Status status;
server::SystemInfo::GetInstance().Init();
......@@ -741,13 +775,6 @@ Status DBImpl::Size(uint64_t& result) {
return meta_ptr_->Size(result);
}
DBImpl::~DBImpl() {
shutting_down_.store(true, std::memory_order_release);
bg_timer_thread_.join();
std::set<std::string> ids;
mem_mgr_->Serialize(ids);
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -36,6 +36,9 @@ class DBImpl : public DB {
explicit DBImpl(const Options &options);
Status Start() override;
Status Stop() override;
Status CreateTable(meta::TableSchema &table_schema) override;
Status DeleteTable(const std::string &table_id, const meta::DatesT &dates) override;
......@@ -91,18 +94,15 @@ class DBImpl : public DB {
~DBImpl() override;
private:
Status
QueryAsync(const std::string &table_id,
const meta::TableFilesSchema &files,
uint64_t k,
uint64_t nq,
uint64_t nprobe,
const float *vectors,
const meta::DatesT &dates,
QueryResults &results);
void StartTimerTasks();
Status QueryAsync(const std::string &table_id,
const meta::TableFilesSchema &files,
uint64_t k,
uint64_t nq,
uint64_t nprobe,
const float *vectors,
const meta::DatesT &dates,
QueryResults &results);
void BackgroundTimerTask();
void StartMetricTask();
......
......@@ -152,8 +152,33 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
&& index1.metric_type_ == index2.metric_type_;
}
bool UserDefinedId(int64_t flag) {
return flag & meta::FLAG_MASK_USERID;
meta::DateT GetDate(const std::time_t& t, int day_delta) {
struct tm ltm;
localtime_r(&t, &ltm);
if (day_delta > 0) {
do {
++ltm.tm_mday;
--day_delta;
} while(day_delta > 0);
mktime(&ltm);
} else if (day_delta < 0) {
do {
--ltm.tm_mday;
++day_delta;
} while(day_delta < 0);
mktime(&ltm);
} else {
ltm.tm_mday;
}
return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday;
}
meta::DateT GetDateWithDelta(int day_delta) {
return GetDate(std::time(nullptr), day_delta);
}
meta::DateT GetDate() {
return GetDate(std::time(nullptr), 0);
}
} // namespace utils
......
......@@ -10,6 +10,7 @@
#include "db/Types.h"
#include <string>
#include <ctime>
namespace zilliz {
namespace milvus {
......@@ -27,7 +28,9 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2);
bool UserDefinedId(int64_t flag);
meta::DateT GetDate(const std::time_t &t, int day_delta = 0);
meta::DateT GetDate();
meta::DateT GetDateWithDelta(int day_delta);
} // namespace utils
} // namespace engine
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Meta.h"
#include <ctime>
#include <stdio.h>
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
Meta::~Meta() = default;
DateT Meta::GetDate(const std::time_t& t, int day_delta) {
struct tm ltm;
localtime_r(&t, &ltm);
if (day_delta > 0) {
do {
++ltm.tm_mday;
--day_delta;
} while(day_delta > 0);
mktime(&ltm);
} else if (day_delta < 0) {
do {
--ltm.tm_mday;
++day_delta;
} while(day_delta < 0);
mktime(&ltm);
} else {
ltm.tm_mday;
}
return ltm.tm_year*10000 + ltm.tm_mon*100 + ltm.tm_mday;
}
DateT Meta::GetDateWithDelta(int day_delta) {
return GetDate(std::time(nullptr), day_delta);
}
DateT Meta::GetDate() {
return GetDate(std::time(nullptr), 0);
}
} // namespace meta
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -11,7 +11,6 @@
#include "db/Types.h"
#include <cstddef>
#include <ctime>
#include <memory>
namespace zilliz {
......@@ -19,105 +18,70 @@ namespace milvus {
namespace engine {
namespace meta {
class Meta {
public:
using Ptr = std::shared_ptr<Meta>;
virtual
~Meta() = 0;
virtual Status
CreateTable(TableSchema &table_schema) = 0;
virtual Status
DescribeTable(TableSchema &table_schema) = 0;
virtual Status
HasTable(const std::string &table_id, bool &has_or_not) = 0;
virtual ~Meta() = default;
virtual Status
AllTables(std::vector<TableSchema> &table_schema_array) = 0;
virtual Status CreateTable(TableSchema &table_schema) = 0;
virtual Status
UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0;
virtual Status DescribeTable(TableSchema &table_schema) = 0;
virtual Status
UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
virtual Status HasTable(const std::string &table_id, bool &has_or_not) = 0;
virtual Status
DeleteTable(const std::string &table_id) = 0;
virtual Status AllTables(std::vector<TableSchema> &table_schema_array) = 0;
virtual Status
DeleteTableFiles(const std::string &table_id) = 0;
virtual Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0;
virtual Status
CreateTableFile(TableFileSchema &file_schema) = 0;
virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;
virtual Status
DropPartitionsByDates(const std::string &table_id, const DatesT &dates) = 0;
virtual Status DeleteTable(const std::string &table_id) = 0;
virtual Status
GetTableFiles(const std::string &table_id, const std::vector<size_t> &ids, TableFilesSchema &table_files) = 0;
virtual Status DeleteTableFiles(const std::string &table_id) = 0;
virtual Status
UpdateTableFilesToIndex(const std::string &table_id) = 0;
virtual Status CreateTableFile(TableFileSchema &file_schema) = 0;
virtual Status
UpdateTableFile(TableFileSchema &file_schema) = 0;
virtual Status DropPartitionsByDates(const std::string &table_id, const DatesT &dates) = 0;
virtual Status
UpdateTableFiles(TableFilesSchema &files) = 0;
virtual Status GetTableFiles(const std::string &table_id,
const std::vector<size_t> &ids,
TableFilesSchema &table_files) = 0;
virtual Status
FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids,
const DatesT &partition,
DatePartionedTableFilesSchema &files) = 0;
virtual Status UpdateTableFilesToIndex(const std::string &table_id) = 0;
virtual Status
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0;
virtual Status UpdateTableFile(TableFileSchema &file_schema) = 0;
virtual Status
Size(uint64_t &result) = 0;
virtual Status UpdateTableFiles(TableFilesSchema &files) = 0;
virtual Status
Archive() = 0;
virtual Status FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids,
const DatesT &partition,
DatePartionedTableFilesSchema &files) = 0;
virtual Status
FilesToIndex(TableFilesSchema &) = 0;
virtual Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0;
virtual Status
FilesByType(const std::string &table_id,
const std::vector<int> &file_types,
std::vector<std::string>& file_ids) = 0;
virtual Status Size(uint64_t &result) = 0;
virtual Status
DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0;
virtual Status Archive() = 0;
virtual Status
DropTableIndex(const std::string &table_id) = 0;
virtual Status FilesToIndex(TableFilesSchema &) = 0;
virtual Status
CleanUp() = 0;
virtual Status FilesByType(const std::string &table_id,
const std::vector<int> &file_types,
std::vector<std::string>& file_ids) = 0;
virtual Status
CleanUpFilesWithTTL(uint16_t) = 0;
virtual Status DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0;
virtual Status
DropAll() = 0;
virtual Status DropTableIndex(const std::string &table_id) = 0;
virtual Status
Count(const std::string &table_id, uint64_t &result) = 0;
virtual Status CleanUp() = 0;
static DateT
GetDate(const std::time_t &t, int day_delta = 0);
virtual Status CleanUpFilesWithTTL(uint16_t) = 0;
static DateT
GetDate();
virtual Status DropAll() = 0;
static DateT
GetDateWithDelta(int day_delta);
virtual Status Count(const std::string &table_id, uint64_t &result) = 0;
}; // MetaData
......
......@@ -22,7 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384;
constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2;
constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB;
constexpr int64_t FLAG_MASK_USERID = 1;
constexpr int64_t FLAG_MASK_NO_USERID = 0x1;
constexpr int64_t FLAG_MASK_HAS_USERID = 0x1<<1;
typedef int DateT;
const DateT EmptyDate = -1;
......
......@@ -41,6 +41,18 @@ Status HandleException(const std::string &desc, std::exception &e) {
}
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
: options_(options_),
mode_(mode) {
Initialize();
}
MySQLMetaImpl::~MySQLMetaImpl() {
if (mode_ != Options::MODE::READ_ONLY) {
CleanUp();
}
}
Status MySQLMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss;
SimpleIDGenerator g;
......@@ -57,12 +69,6 @@ Status MySQLMetaImpl::NextFileId(std::string &file_id) {
return Status::OK();
}
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
: options_(options_),
mode_(mode) {
Initialize();
}
Status MySQLMetaImpl::Initialize() {
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
......@@ -202,15 +208,6 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
}
try {
auto yesterday = GetDateWithDelta(-1);
for (auto &date : dates) {
if (date >= yesterday) {
return Status::Error("Could not delete partitions within 2 days");
}
}
std::stringstream dateListSS;
for (auto &date : dates) {
dateListSS << std::to_string(date) << ", ";
......@@ -229,7 +226,8 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
Query dropPartitionsByDatesQuery = connectionPtr->query();
dropPartitionsByDatesQuery << "UPDATE TableFiles " <<
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
"updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"date in (" << dateListStr << ");";
......@@ -877,7 +875,7 @@ Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
if (file_schema.date_ == EmptyDate) {
file_schema.date_ = Meta::GetDate();
file_schema.date_ = utils::GetDate();
}
TableSchema table_schema;
table_schema.table_id_ = file_schema.table_id_;
......@@ -2031,12 +2029,6 @@ Status MySQLMetaImpl::DropAll() {
return Status::OK();
}
MySQLMetaImpl::~MySQLMetaImpl() {
if (mode_ != Options::MODE::READ_ONLY) {
CleanUp();
}
}
} // namespace meta
} // namespace engine
} // namespace milvus
......
......@@ -24,6 +24,7 @@ using namespace mysqlpp;
class MySQLMetaImpl : public Meta {
public:
MySQLMetaImpl(const DBMetaOptions &options_, const int &mode);
~MySQLMetaImpl();
Status CreateTable(TableSchema &table_schema) override;
......@@ -86,8 +87,6 @@ class MySQLMetaImpl : public Meta {
Status Count(const std::string &table_id, uint64_t &result) override;
virtual ~MySQLMetaImpl();
private:
Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id);
......
......@@ -68,6 +68,15 @@ using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
: options_(options_) {
Initialize();
}
SqliteMetaImpl::~SqliteMetaImpl() {
CleanUp();
}
Status SqliteMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss;
SimpleIDGenerator g;
......@@ -84,11 +93,6 @@ Status SqliteMetaImpl::NextFileId(std::string &file_id) {
return Status::OK();
}
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
: options_(options_) {
Initialize();
}
Status SqliteMetaImpl::Initialize() {
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
......@@ -111,7 +115,7 @@ Status SqliteMetaImpl::Initialize() {
// PXU TODO: Temp solution. Will fix later
Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
const DatesT &dates) {
const DatesT &dates) {
if (dates.size() == 0) {
return Status::OK();
}
......@@ -124,20 +128,13 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
}
try {
auto yesterday = GetDateWithDelta(-1);
for (auto &date : dates) {
if (date >= yesterday) {
return Status::Error("Could not delete partitions with 2 days");
}
}
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
),
where(
c(&TableFileSchema::table_id_) == table_id and
......@@ -543,7 +540,7 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
if (file_schema.date_ == EmptyDate) {
file_schema.date_ = Meta::GetDate();
file_schema.date_ = utils::GetDate();
}
TableSchema table_schema;
table_schema.table_id_ = file_schema.table_id_;
......@@ -1214,10 +1211,6 @@ Status SqliteMetaImpl::DropAll() {
return Status::OK();
}
SqliteMetaImpl::~SqliteMetaImpl() {
CleanUp();
}
} // namespace meta
} // namespace engine
} // namespace milvus
......
......@@ -20,6 +20,7 @@ auto StoragePrototype(const std::string &path);
class SqliteMetaImpl : public Meta {
public:
explicit SqliteMetaImpl(const DBMetaOptions &options_);
~SqliteMetaImpl();
Status CreateTable(TableSchema &table_schema) override;
......@@ -80,8 +81,6 @@ class SqliteMetaImpl : public Meta {
Status Count(const std::string &table_id, uint64_t &result) override;
~SqliteMetaImpl() override;
private:
Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id);
......
......@@ -37,21 +37,22 @@ public:
}
~CollectInsertMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
double avg_time = total_time / n_;
for (int i = 0; i < n_; ++i) {
Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
if(n_ > 0) {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
double avg_time = total_time / n_;
for (int i = 0; i < n_; ++i) {
Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (status_.ok()) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (status_.ok()) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n_);
} else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n_);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n_);
}
}
}
......@@ -69,14 +70,16 @@ public:
}
~CollectQueryMetrics() {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
for (int i = 0; i < nq_; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
if(nq_ > 0) {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time_, end_time);
for (int i = 0; i < nq_; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq_;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double(nq_) / total_time);
}
auto average_time = total_time / nq_;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq_);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq_) / total_time);
}
private:
......
......@@ -7,6 +7,7 @@
#include "SchedInst.h"
#include "server/ServerConfig.h"
#include "ResourceFactory.h"
#include "knowhere/index/vector_index/gpu_ivf.h"
namespace zilliz {
namespace milvus {
......@@ -19,7 +20,7 @@ SchedulerPtr SchedInst::instance = nullptr;
std::mutex SchedInst::mutex_;
void
SchedServInit() {
StartSchedulerService() {
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
for (auto &resource : resources) {
......@@ -36,8 +37,12 @@ SchedServInit() {
device_id,
enable_loader,
enable_executor));
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id);
}
knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
auto default_connection = Connection("default_connection", 500.0);
auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS);
for (auto &conn : connections) {
......@@ -52,6 +57,11 @@ SchedServInit() {
SchedInst::GetInstance()->Start();
}
void
StopSchedulerService() {
ResMgrInst::GetInstance()->Stop();
SchedInst::GetInstance()->Stop();
}
}
}
}
......@@ -53,7 +53,10 @@ private:
};
void
SchedServInit();
StartSchedulerService();
void
StopSchedulerService();
}
}
......
......@@ -17,6 +17,10 @@ namespace milvus {
namespace server {
DBWrapper::DBWrapper() {
}
ServerError DBWrapper::StartService() {
//db config
zilliz::milvus::engine::Options opt;
ConfigNode& db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
......@@ -91,7 +95,9 @@ DBWrapper::DBWrapper() {
//create db instance
std::string msg = opt.meta.path;
try {
zilliz::milvus::engine::DB::Open(opt, &db_);
engine::DB* db = nullptr;
zilliz::milvus::engine::DB::Open(opt, &db);
db_.reset(db);
} catch(std::exception& ex) {
msg = ex.what();
}
......@@ -100,10 +106,18 @@ DBWrapper::DBWrapper() {
std::cout << "ERROR! Failed to open database: " << msg << std::endl;
kill(0, SIGUSR1);
}
db_->Start();
return SERVER_SUCCESS;
}
DBWrapper::~DBWrapper() {
delete db_;
ServerError DBWrapper::StopService() {
if(db_) {
db_->Stop();
}
return SERVER_SUCCESS;
}
}
......
......@@ -5,8 +5,11 @@
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include "db/DB.h"
#include <memory>
namespace zilliz {
namespace milvus {
namespace server {
......@@ -14,18 +17,27 @@ namespace server {
class DBWrapper {
private:
DBWrapper();
~DBWrapper();
~DBWrapper() = default;
public:
static zilliz::milvus::engine::DB* DB() {
static DBWrapper db_wrapper;
return db_wrapper.db();
static DBWrapper& GetInstance() {
static DBWrapper wrapper;
return wrapper;
}
static std::shared_ptr<engine::DB> DB() {
return GetInstance().EngineDB();
}
zilliz::milvus::engine::DB* db() { return db_; }
ServerError StartService();
ServerError StopService();
std::shared_ptr<engine::DB> EngineDB() {
return db_;
}
private:
zilliz::milvus::engine::DB* db_ = nullptr;
std::shared_ptr<engine::DB> db_;
};
}
......
......@@ -21,6 +21,7 @@
#include <src/scheduler/SchedInst.h>
#include "metrics/Metrics.h"
#include "DBWrapper.h"
namespace zilliz {
namespace milvus {
......@@ -158,7 +159,7 @@ Server::Start() {
signal(SIGTERM, SignalUtil::HandleSignal);
server::Metrics::GetInstance().Init();
server::SystemInfo::GetInstance().Init();
engine::SchedServInit();
std::cout << "Milvus server start successfully." << std::endl;
StartService();
......@@ -221,12 +222,16 @@ Server::LoadConfig() {
void
Server::StartService() {
engine::StartSchedulerService();
DBWrapper::GetInstance().StartService();
grpc::GrpcMilvusServer::StartService();
}
void
Server::StopService() {
grpc::GrpcMilvusServer::StopService();
DBWrapper::GetInstance().StopService();
engine::StopSchedulerService();
}
}
......
......@@ -49,8 +49,6 @@ GrpcMilvusServer::StartService() {
faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20);
DBWrapper::DB();//initialize db
std::string server_address(address + ":" + std::to_string(port));
::grpc::ServerBuilder builder;
......
......@@ -66,16 +66,18 @@ GrpcBaseTask::~GrpcBaseTask() {
WaitToFinish();
}
ServerError
GrpcBaseTask::Execute() {
ServerError GrpcBaseTask::Execute() {
error_code_ = OnExecute();
Done();
return error_code_;
}
void GrpcBaseTask::Done() {
done_ = true;
finish_cond_.notify_all();
return error_code_;
}
ServerError
GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
ServerError GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
error_code_ = error_code;
error_msg_ = error_msg;
......@@ -83,8 +85,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
return error_code_;
}
ServerError
GrpcBaseTask::WaitToFinish() {
ServerError GrpcBaseTask::WaitToFinish() {
std::unique_lock<std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; });
......@@ -101,8 +102,7 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
Stop();
}
void
GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
if (task_ptr == nullptr) {
return;
}
......@@ -120,8 +120,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *gr
}
}
void
GrpcRequestScheduler::Start() {
void GrpcRequestScheduler::Start() {
if (!stopped_) {
return;
}
......@@ -129,8 +128,7 @@ GrpcRequestScheduler::Start() {
stopped_ = false;
}
void
GrpcRequestScheduler::Stop() {
void GrpcRequestScheduler::Stop() {
if (stopped_) {
return;
}
......@@ -155,8 +153,7 @@ GrpcRequestScheduler::Stop() {
SERVER_LOG_INFO << "Scheduler stopped";
}
ServerError
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
ServerError GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
if (task_ptr == nullptr) {
return SERVER_NULL_POINTER;
}
......@@ -174,33 +171,31 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
return task_ptr->WaitToFinish();//sync execution
}
namespace {
void TakeTaskToExecute(TaskQueuePtr task_queue) {
if (task_queue == nullptr) {
return;
}
while (true) {
BaseTaskPtr task = task_queue->Take();
if (task == nullptr) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread";
break;//stop the thread
}
void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
if (task_queue == nullptr) {
return;
}
while (true) {
BaseTaskPtr task = task_queue->Take();
if (task == nullptr) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread";
break;//stop the thread
}
try {
ServerError err = task->Execute();
if (err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Task failed with code: " << err;
}
} catch (std::exception &ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
try {
ServerError err = task->Execute();
if (err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Task failed with code: " << err;
}
} catch (std::exception &ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
}
}
}
ServerError
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
ServerError GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
std::lock_guard<std::mutex> lock(queue_mtx_);
std::string group_name = task_ptr->TaskGroup();
......@@ -212,7 +207,7 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
task_groups_.insert(std::make_pair(group_name, queue));
//start a thread
ThreadPtr thread = std::make_shared<std::thread>(&TakeTaskToExecute, queue);
ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
execute_threads_.push_back(thread);
SERVER_LOG_INFO << "Create new thread for task group: " << group_name;
}
......
......@@ -25,30 +25,24 @@ protected:
virtual ~GrpcBaseTask();
public:
ServerError
Execute();
ServerError Execute();
ServerError
WaitToFinish();
void Done();
std::string
TaskGroup() const { return task_group_; }
ServerError WaitToFinish();
ServerError
ErrorCode() const { return error_code_; }
std::string TaskGroup() const { return task_group_; }
std::string
ErrorMsg() const { return error_msg_; }
ServerError ErrorCode() const { return error_code_; }
bool
IsAsync() const { return async_; }
std::string ErrorMsg() const { return error_msg_; }
bool IsAsync() const { return async_; }
protected:
virtual ServerError
OnExecute() = 0;
virtual ServerError OnExecute() = 0;
ServerError
SetError(ServerError error_code, const std::string &msg);
ServerError SetError(ServerError error_code, const std::string &msg);
protected:
mutable std::mutex finish_mtx_;
......@@ -77,19 +71,18 @@ public:
void Stop();
ServerError
ExecuteTask(const BaseTaskPtr &task_ptr);
ServerError ExecuteTask(const BaseTaskPtr &task_ptr);
static void
ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
protected:
GrpcRequestScheduler();
virtual ~GrpcRequestScheduler();
ServerError
PutTaskToQueue(const BaseTaskPtr &task_ptr);
void TakeTaskToExecute(TaskQueuePtr task_queue);
ServerError PutTaskToQueue(const BaseTaskPtr &task_ptr);
private:
mutable std::mutex queue_mtx_;
......
......@@ -93,6 +93,7 @@ namespace {
return;
}
//range: [start_day, end_day)
for (long i = 0; i < days; i++) {
time_t tt_day = tt_start + DAY_SECONDS * i;
tm tm_day;
......@@ -456,21 +457,17 @@ InsertTask::OnExecute() {
}
}
//step 3: check table flag
//all user provide id, or all internal id
uint64_t row_count = 0;
DBWrapper::DB()->GetTableRowCount(table_info.table_id_, row_count);
bool empty_table = (row_count == 0);
bool user_provide_ids = !insert_param_->row_id_array().empty();
if(!empty_table) {
//user already provided id before, all insert action require user id
if(engine::utils::UserDefinedId(table_info.flag_) && !user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch");
}
//user already provided id before, all insert action require user id
if((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch");
}
//user didn't provided id before, no need to provide user id
if(!engine::utils::UserDefinedId(table_info.flag_) && user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch");
}
//user didn't provided id before, no need to provide user id
if((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch");
}
rc.RecordSection("check validation");
......@@ -481,7 +478,7 @@ InsertTask::OnExecute() {
ProfilerStart(fname.c_str());
#endif
//step 3: prepare float data
//step 4: prepare float data
std::vector<float> vec_f(insert_param_->row_record_array_size() * table_info.dimension_, 0);
// TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
......@@ -504,7 +501,7 @@ InsertTask::OnExecute() {
rc.ElapseFromBegin("prepare vectors data");
//step 4: insert vectors
//step 5: insert vectors
auto vec_count = (uint64_t) insert_param_->row_record_array_size();
std::vector<int64_t> vec_ids(insert_param_->row_id_array_size(), 0);
if(!insert_param_->row_id_array().empty()) {
......@@ -529,11 +526,10 @@ InsertTask::OnExecute() {
return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
}
//step 5: update table flag
if(empty_table && user_provide_ids) {
stat = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(),
table_info.flag_ | engine::meta::FLAG_MASK_USERID);
}
//step 6: update table flag
user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
: table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
stat = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_);
#ifdef MILVUS_ENABLE_PROFILING
ProfilerStop();
......
......@@ -241,8 +241,9 @@ server::KnowhereError IVFMixIndex::BuildAll(const long &nb,
index_->Add(dataset, cfg);
if (auto device_index = std::dynamic_pointer_cast<GPUIVF>(index_)) {
auto host_index = device_index->Copy_index_gpu_to_cpu();
auto host_index = device_index->CopyGpuToCpu(Config());
index_ = host_index;
type = TransferToCpuIndexType(type);
} else {
WRAPPER_LOG_ERROR << "Build IVFMIXIndex Failed";
}
......
......@@ -106,6 +106,10 @@ VecIndexPtr GetVecIndexFactory(const IndexType &type) {
index = std::make_shared<zilliz::knowhere::GPUIVFSQ>(0);
return std::make_shared<IVFMixIndex>(index, IndexType::FAISS_IVFSQ8_MIX);
}
case IndexType::FAISS_IVFSQ8: {
index = std::make_shared<zilliz::knowhere::IVFSQ>();
break;
}
case IndexType::NSG_MIX: { // TODO(linxj): bug.
index = std::make_shared<zilliz::knowhere::NSG>(0);
break;
......@@ -194,10 +198,10 @@ server::KnowhereError write_index(VecIndexPtr index, const std::string &location
// TODO(linxj): redo here.
void AutoGenParams(const IndexType &type, const long &size, zilliz::knowhere::Config &cfg) {
auto nlist = cfg.get_with_default("nlist", 0);
if (size <= TYPICAL_COUNT/16384 + 1) {
if (size <= TYPICAL_COUNT / 16384 + 1) {
//handle less row count, avoid nlist set to 0
cfg["nlist"] = 1;
} else if (int(size/TYPICAL_COUNT) * nlist == 0) {
} else if (int(size / TYPICAL_COUNT) * nlist == 0) {
//calculate a proper nlist if nlist not specified or size less than TYPICAL_COUNT
cfg["nlist"] = int(size / TYPICAL_COUNT * 16384);
}
......@@ -225,6 +229,20 @@ void AutoGenParams(const IndexType &type, const long &size, zilliz::knowhere::Co
}
}
IndexType TransferToCpuIndexType(const IndexType &type) {
switch (type) {
case IndexType::FAISS_IVFFLAT_MIX: {
return IndexType::FAISS_IVFFLAT_CPU;
}
case IndexType::FAISS_IVFSQ8_MIX: {
return IndexType::FAISS_IVFSQ8;
}
default: {
return IndexType::INVALID;
}
}
}
}
}
}
......@@ -32,6 +32,7 @@ enum class IndexType {
FAISS_IVFPQ_GPU,
SPTAG_KDT_RNT_CPU,
FAISS_IVFSQ8_MIX,
FAISS_IVFSQ8,
NSG_MIX,
};
......@@ -88,6 +89,8 @@ extern VecIndexPtr LoadVecIndex(const IndexType &index_type, const zilliz::knowh
extern void AutoGenParams(const IndexType& type, const long& size, Config& cfg);
extern IndexType TransferToCpuIndexType(const IndexType& type);
}
}
}
......@@ -293,18 +293,15 @@ TEST_F(DBTest, PRELOADTABLE_TEST) {
ASSERT_STATS(stat);
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
int64_t nb = 100000;
int64_t nb = VECTOR_COUNT;
std::vector<float> xb;
BuildVectors(nb, xb);
int loop = 5;
for (auto i=0; i<loop; ++i) {
db_->InsertVectors(TABLE_NAME, nb, xb.data(), target_ids);
ASSERT_EQ(target_ids.size(), nb);
engine::IDNumbers vector_ids;
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
ASSERT_EQ(vector_ids.size(), nb);
}
engine::TableIndex index;
......@@ -342,9 +339,6 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
ASSERT_STATS(stat);
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
uint64_t size;
db_->Size(size);
......@@ -354,6 +348,7 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
int loop = INSERT_LOOP;
for (auto i=0; i<loop; ++i) {
engine::IDNumbers vector_ids;
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
......@@ -378,20 +373,17 @@ TEST_F(DBTest2, DELETE_TEST) {
db_->HasTable(TABLE_NAME, has_table);
ASSERT_TRUE(has_table);
engine::IDNumbers vector_ids;
uint64_t size;
db_->Size(size);
int64_t nb = INSERT_LOOP;
int64_t nb = VECTOR_COUNT;
std::vector<float> xb;
BuildVectors(nb, xb);
int loop = 20;
for (auto i=0; i<loop; ++i) {
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
engine::IDNumbers vector_ids;
stat = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
engine::TableIndex index;
stat = db_->CreateIndex(TABLE_NAME, index);
std::vector<engine::meta::DateT> dates;
stat = db_->DeleteTable(TABLE_NAME, dates);
......@@ -420,25 +412,31 @@ TEST_F(DBTest2, DELETE_BY_RANGE_TEST) {
db_->HasTable(TABLE_NAME, has_table);
ASSERT_TRUE(has_table);
engine::IDNumbers vector_ids;
uint64_t size;
db_->Size(size);
ASSERT_EQ(size, 0UL);
int64_t nb = INSERT_LOOP;
int64_t nb = VECTOR_COUNT;
std::vector<float> xb;
BuildVectors(nb, xb);
int loop = 20;
for (auto i=0; i<loop; ++i) {
db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
engine::IDNumbers vector_ids;
stat = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
engine::TableIndex index;
stat = db_->CreateIndex(TABLE_NAME, index);
db_->Size(size);
ASSERT_NE(size, 0UL);
std::vector<engine::meta::DateT> dates;
std::string start_value = CurrentTmDate(-3);
std::string end_value = CurrentTmDate(-2);
std::string start_value = CurrentTmDate();
std::string end_value = CurrentTmDate(1);
ConvertTimeRangeToDBDates(start_value, end_value, dates);
db_->DeleteTable(TABLE_NAME, dates);
stat = db_->DeleteTable(TABLE_NAME, dates);
ASSERT_STATS(stat);
uint64_t row_count = 0;
db_->GetTableRowCount(TABLE_NAME, row_count);
ASSERT_EQ(row_count, 0UL);
}
\ No newline at end of file
......@@ -74,21 +74,21 @@ TEST_F(MetaTest, TABLE_FILE_TEST) {
ASSERT_EQ(table_file.file_type_, new_file_type);
meta::DatesT dates;
dates.push_back(meta::Meta::GetDate());
dates.push_back(utils::GetDate());
status = impl_->DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(status.ok());
dates.clear();
for (auto i=2; i < 10; ++i) {
dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
dates.push_back(utils::GetDateWithDelta(-1*i));
}
status = impl_->DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_TRUE(status.ok());
table_file.date_ = meta::Meta::GetDateWithDelta(-2);
table_file.date_ = utils::GetDateWithDelta(-2);
status = impl_->UpdateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_file.date_, meta::Meta::GetDateWithDelta(-2));
ASSERT_EQ(table_file.date_, utils::GetDateWithDelta(-2));
ASSERT_FALSE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE);
dates.clear();
......
......@@ -105,7 +105,7 @@ TEST(DBMiscTest, META_TEST) {
time_t tt;
time( &tt );
int delta = 10;
engine::meta::DateT dt = impl.GetDate(tt, delta);
engine::meta::DateT dt = engine::utils::GetDate(tt, delta);
ASSERT_GT(dt, 0);
}
......
......@@ -90,7 +90,7 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW);
meta::DatesT dates;
dates.push_back(meta::Meta::GetDate());
dates.push_back(utils::GetDate());
status = impl.DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_FALSE(status.ok());
......@@ -110,15 +110,15 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
dates.clear();
for (auto i=2; i < 10; ++i) {
dates.push_back(meta::Meta::GetDateWithDelta(-1*i));
dates.push_back(utils::GetDateWithDelta(-1*i));
}
status = impl.DropPartitionsByDates(table_file.table_id_, dates);
ASSERT_TRUE(status.ok());
table_file.date_ = meta::Meta::GetDateWithDelta(-2);
table_file.date_ = utils::GetDateWithDelta(-2);
status = impl.UpdateTableFile(table_file);
ASSERT_TRUE(status.ok());
ASSERT_EQ(table_file.date_, meta::Meta::GetDateWithDelta(-2));
ASSERT_EQ(table_file.date_, utils::GetDateWithDelta(-2));
ASSERT_FALSE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE);
dates.clear();
......
......@@ -8,23 +8,29 @@
#include <easylogging++.h>
#include <wrapper/knowhere/vec_index.h>
#include "knowhere/index/vector_index/gpu_ivf.h"
#include "utils.h"
INITIALIZE_EASYLOGGINGPP
using namespace zilliz::milvus::engine;
using namespace zilliz::knowhere;
//using namespace zilliz::knowhere;
using ::testing::TestWithParam;
using ::testing::Values;
using ::testing::Combine;
constexpr int64_t DIM = 512;
constexpr int64_t NB = 1000000;
class KnowhereWrapperTest
: public TestWithParam<::std::tuple<IndexType, std::string, int, int, int, int, Config, Config>> {
protected:
void SetUp() override {
zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(0);
zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(1);
std::string generator_type;
std::tie(index_type, generator_type, dim, nb, nq, k, train_cfg, search_cfg) = GetParam();
......@@ -66,8 +72,8 @@ class KnowhereWrapperTest
Config train_cfg;
Config search_cfg;
int dim = 64;
int nb = 10000;
int dim = DIM;
int nb = NB;
int nq = 10;
int k = 10;
std::vector<float> xb;
......@@ -94,27 +100,27 @@ INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest,
// Config::object{{"nlist", 100}, {"dim", 64}},
// Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 40}}
//),
std::make_tuple(IndexType::FAISS_IVFFLAT_MIX, "Default",
64, 100000, 10, 10,
Config::object{{"nlist", 1000}, {"dim", 64}, {"metric_type", "L2"}},
Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}}
),
std::make_tuple(IndexType::FAISS_IDMAP, "Default",
64, 100000, 10, 10,
Config::object{{"dim", 64}, {"metric_type", "L2"}},
Config::object{{"dim", 64}, {"k", 10}}
),
// std::make_tuple(IndexType::FAISS_IVFFLAT_MIX, "Default",
// 64, 100000, 10, 10,
// Config::object{{"nlist", 1000}, {"dim", 64}, {"metric_type", "L2"}},
// Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}}
// ),
// std::make_tuple(IndexType::FAISS_IDMAP, "Default",
// 64, 100000, 10, 10,
// Config::object{{"dim", 64}, {"metric_type", "L2"}},
// Config::object{{"dim", 64}, {"k", 10}}
// ),
std::make_tuple(IndexType::FAISS_IVFSQ8_MIX, "Default",
64, 100000, 10, 10,
Config::object{{"dim", 64}, {"nlist", 1000}, {"nbits", 8}, {"metric_type", "L2"}},
Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}}
),
std::make_tuple(IndexType::NSG_MIX, "Default",
128, 250000, 10, 10,
Config::object{{"dim", 128}, {"nlist", 8192}, {"nprobe", 16}, {"metric_type", "L2"},
{"knng", 200}, {"search_length", 40}, {"out_degree", 60}, {"candidate_pool_size", 200}},
Config::object{{"k", 10}, {"search_length", 20}}
DIM, NB, 10, 10,
Config::object{{"dim", DIM}, {"nlist", 1000}, {"nbits", 8}, {"metric_type", "L2"}},
Config::object{{"dim", DIM}, {"k", 10}, {"nprobe", 5}}
)
// std::make_tuple(IndexType::NSG_MIX, "Default",
// 128, 250000, 10, 10,
// Config::object{{"dim", 128}, {"nlist", 8192}, {"nprobe", 16}, {"metric_type", "L2"},
// {"knng", 200}, {"search_length", 40}, {"out_degree", 60}, {"candidate_pool_size", 200}},
// Config::object{{"k", 10}, {"search_length", 20}}
// )
//std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default",
// 64, 10000, 10, 10,
// Config::object{{"TPTNumber", 1}, {"dim", 64}},
......@@ -135,6 +141,31 @@ TEST_P(KnowhereWrapperTest, base_test) {
AssertResult(res_ids, res_dis);
}
TEST_P(KnowhereWrapperTest, to_gpu_test) {
EXPECT_EQ(index_->GetType(), index_type);
auto elems = nq * k;
std::vector<int64_t> res_ids(elems);
std::vector<float> res_dis(elems);
index_->BuildAll(nb, xb.data(), ids.data(), train_cfg);
index_->Search(nq, xq.data(), res_dis.data(), res_ids.data(), search_cfg);
AssertResult(res_ids, res_dis);
{
index_->CopyToGpu(1);
}
std::string file_location = "/tmp/whatever";
write_index(index_, file_location);
auto new_index = read_index(file_location);
auto dev_idx = new_index->CopyToGpu(1);
for (int i = 0; i < 10000; ++i) {
dev_idx->Search(nq, xq.data(), res_dis.data(), res_ids.data(), search_cfg);
}
AssertResult(res_ids, res_dis);
}
TEST_P(KnowhereWrapperTest, serialize) {
EXPECT_EQ(index_->GetType(), index_type);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册