Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
7bfa68fa
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
7bfa68fa
编写于
9月 02, 2019
作者:
Y
Yu Kun
浏览文件
操作
浏览文件
下载
差异文件
fix confilct
Former-commit-id: ac53028413896a6d4a1783b34660c7915c47af0e
上级
244c73f8
d1edbf70
变更
38
隐藏空白更改
内联
并排
Showing
38 changed file
with
459 addition
and
391 deletion
+459
-391
ci/jenkinsfile/milvus_build.groovy
ci/jenkinsfile/milvus_build.groovy
+4
-1
ci/jenkinsfile/milvus_build_no_ut.groovy
ci/jenkinsfile/milvus_build_no_ut.groovy
+3
-1
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+4
-0
cpp/build.sh
cpp/build.sh
+9
-8
cpp/cmake/ThirdPartyPackages.cmake
cpp/cmake/ThirdPartyPackages.cmake
+2
-1
cpp/conf/server_config.template
cpp/conf/server_config.template
+11
-4
cpp/src/core/cmake/ThirdPartyPackages.cmake
cpp/src/core/cmake/ThirdPartyPackages.cmake
+3
-2
cpp/src/core/thirdparty/faiss_cache_check_lists.txt
cpp/src/core/thirdparty/faiss_cache_check_lists.txt
+13
-0
cpp/src/db/DB.h
cpp/src/db/DB.h
+3
-0
cpp/src/db/DBImpl.cpp
cpp/src/db/DBImpl.cpp
+42
-15
cpp/src/db/DBImpl.h
cpp/src/db/DBImpl.h
+12
-12
cpp/src/db/Utils.cpp
cpp/src/db/Utils.cpp
+27
-2
cpp/src/db/Utils.h
cpp/src/db/Utils.h
+4
-1
cpp/src/db/meta/Meta.cpp
cpp/src/db/meta/Meta.cpp
+0
-50
cpp/src/db/meta/Meta.h
cpp/src/db/meta/Meta.h
+34
-70
cpp/src/db/meta/MetaTypes.h
cpp/src/db/meta/MetaTypes.h
+2
-1
cpp/src/db/meta/MySQLMetaImpl.cpp
cpp/src/db/meta/MySQLMetaImpl.cpp
+15
-23
cpp/src/db/meta/MySQLMetaImpl.h
cpp/src/db/meta/MySQLMetaImpl.h
+1
-2
cpp/src/db/meta/SqliteMetaImpl.cpp
cpp/src/db/meta/SqliteMetaImpl.cpp
+13
-20
cpp/src/db/meta/SqliteMetaImpl.h
cpp/src/db/meta/SqliteMetaImpl.h
+1
-2
cpp/src/metrics/Metrics.h
cpp/src/metrics/Metrics.h
+24
-21
cpp/src/scheduler/SchedInst.cpp
cpp/src/scheduler/SchedInst.cpp
+11
-1
cpp/src/scheduler/SchedInst.h
cpp/src/scheduler/SchedInst.h
+4
-1
cpp/src/server/DBWrapper.cpp
cpp/src/server/DBWrapper.cpp
+17
-3
cpp/src/server/DBWrapper.h
cpp/src/server/DBWrapper.h
+18
-6
cpp/src/server/Server.cpp
cpp/src/server/Server.cpp
+6
-1
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
+0
-2
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
+31
-36
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
+15
-22
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
+15
-19
cpp/src/wrapper/knowhere/vec_impl.cpp
cpp/src/wrapper/knowhere/vec_impl.cpp
+2
-1
cpp/src/wrapper/knowhere/vec_index.cpp
cpp/src/wrapper/knowhere/vec_index.cpp
+20
-2
cpp/src/wrapper/knowhere/vec_index.h
cpp/src/wrapper/knowhere/vec_index.h
+3
-0
cpp/unittest/db/db_tests.cpp
cpp/unittest/db/db_tests.cpp
+27
-29
cpp/unittest/db/meta_tests.cpp
cpp/unittest/db/meta_tests.cpp
+5
-5
cpp/unittest/db/misc_test.cpp
cpp/unittest/db/misc_test.cpp
+1
-1
cpp/unittest/db/mysql_meta_test.cpp
cpp/unittest/db/mysql_meta_test.cpp
+4
-4
cpp/unittest/knowhere/knowhere_test.cpp
cpp/unittest/knowhere/knowhere_test.cpp
+53
-22
未找到文件。
ci/jenkinsfile/milvus_build.groovy
浏览文件 @
7bfa68fa
...
...
@@ -3,13 +3,16 @@ container('milvus-build-env') {
gitlabCommitStatus
(
name:
'Build Engine'
)
{
dir
(
"milvus_engine"
)
{
try
{
def
knowhere_build_dir
=
"${env.WORKSPACE}/milvus_engine/cpp/
thirdparty/knowhe
re/cmake_build"
def
knowhere_build_dir
=
"${env.WORKSPACE}/milvus_engine/cpp/
src/co
re/cmake_build"
checkout
([
$class
:
'GitSCM'
,
branches:
[[
name:
"${SEMVER}"
]],
doGenerateSubmoduleConfigurations:
false
,
extensions:
[[
$class
:
'SubmoduleOption'
,
disableSubmodules:
false
,
parentCredentials:
true
,
recursiveSubmodules:
true
,
reference:
''
,
trackingSubmodules:
false
]],
submoduleCfg:
[],
userRemoteConfigs:
[[
credentialsId:
"${params.GIT_USER}"
,
url:
"git@192.168.1.105:megasearch/milvus.git"
,
name:
'origin'
,
refspec:
"+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"
]]])
/*
dir ("cpp/thirdparty/knowhere") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/knowhere.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh "./build.sh -t ${params.BUILD_TYPE} -p ${knowhere_build_dir} -j"
}
*/
dir
(
"cpp"
)
{
sh
"git config --global user.email \"test@zilliz.com\""
...
...
ci/jenkinsfile/milvus_build_no_ut.groovy
浏览文件 @
7bfa68fa
...
...
@@ -3,14 +3,16 @@ container('milvus-build-env') {
gitlabCommitStatus
(
name:
'Build Engine'
)
{
dir
(
"milvus_engine"
)
{
try
{
def
knowhere_build_dir
=
"${env.WORKSPACE}/milvus_engine/cpp/
thirdparty/knowhe
re/cmake_build"
def
knowhere_build_dir
=
"${env.WORKSPACE}/milvus_engine/cpp/
src/co
re/cmake_build"
checkout
([
$class
:
'GitSCM'
,
branches:
[[
name:
"${SEMVER}"
]],
doGenerateSubmoduleConfigurations:
false
,
extensions:
[[
$class
:
'SubmoduleOption'
,
disableSubmodules:
false
,
parentCredentials:
true
,
recursiveSubmodules:
true
,
reference:
''
,
trackingSubmodules:
false
]],
submoduleCfg:
[],
userRemoteConfigs:
[[
credentialsId:
"${params.GIT_USER}"
,
url:
"git@192.168.1.105:megasearch/milvus.git"
,
name:
'origin'
,
refspec:
"+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"
]]])
/*
dir ("cpp/thirdparty/knowhere") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/knowhere.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh "./build.sh -t ${params.BUILD_TYPE} -p ${knowhere_build_dir} -j"
}
*/
dir
(
"cpp"
)
{
sh
"git config --global user.email \"test@zilliz.com\""
...
...
cpp/CHANGELOG.md
浏览文件 @
7bfa68fa
...
...
@@ -16,6 +16,9 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-331 - Crate Table : when table exists, error code is META_FAILED(code=15) rather than ILLEGAL TABLE NAME(code=9))
-
MS-430 - Search no result if index created with FLAT
-
MS-443 - Create index hang again
-
MS-436 - Delete vectors failed if index created with index_type: IVF_FLAT/IVF_SQ8
-
MS-450 - server hang after run stop_server.sh
-
MS-449 - Add vectors twice success, once with ids, the other no ids
## Improvement
-
MS-327 - Clean code for milvus
...
...
@@ -70,6 +73,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-440 - Add DumpTaskTables in sdk
-
MS-442 - Merge Knowhere
-
MS-445 - Rename CopyCompleted to LoadCompleted
-
MS-451 - Update server_config.template file, set GPU compute default
## New Feature
-
MS-343 - Implement ResourceMgr
...
...
cpp/build.sh
浏览文件 @
7bfa68fa
...
...
@@ -2,7 +2,6 @@
BUILD_TYPE
=
"Debug"
BUILD_UNITTEST
=
"OFF"
LICENSE_CHECK
=
"OFF"
INSTALL_PREFIX
=
$(
pwd
)
/milvus
MAKE_CLEAN
=
"OFF"
BUILD_COVERAGE
=
"OFF"
...
...
@@ -11,12 +10,14 @@ PROFILING="OFF"
BUILD_FAISS_WITH_MKL
=
"OFF"
USE_JFROG_CACHE
=
"OFF"
KNOWHERE_BUILD_DIR
=
"
`
pwd
`
/src/core/cmake_build"
KNOWHERE_OPTIONS
=
"-t
${
BUILD_TYPE
}
"
while
getopts
"p:d:t:k:uh
l
rcgmj"
arg
while
getopts
"p:d:t:k:uhrcgmj"
arg
do
case
$arg
in
t
)
BUILD_TYPE
=
$OPTARG
# BUILD_TYPE
KNOWHERE_OPTIONS
=
"-t
${
BUILD_TYPE
}
"
;;
u
)
echo
"Build and run unittest cases"
;
...
...
@@ -28,9 +29,6 @@ do
d
)
DB_PATH
=
$OPTARG
;;
l
)
LICENSE_CHECK
=
"ON"
;;
r
)
if
[[
-d
cmake_build
]]
;
then
rm
./cmake_build
-r
...
...
@@ -51,6 +49,7 @@ do
;;
j
)
USE_JFROG_CACHE
=
"ON"
KNOWHERE_OPTIONS
=
"
${
KNOWHERE_OPTIONS
}
-j"
;;
h
)
# help
echo
"
...
...
@@ -60,7 +59,6 @@ parameter:
-u: building unit test options(default: OFF)
-p: install prefix(default:
$(
pwd
)
/milvus)
-d: db path(default: /opt/milvus)
-l: build license version(default: OFF)
-r: remove previous build directory(default: OFF)
-c: code coverage(default: OFF)
-g: profiling(default: OFF)
...
...
@@ -85,8 +83,12 @@ if [[ ! -d cmake_build ]]; then
MAKE_CLEAN
=
"ON"
fi
pushd
`
pwd
`
/src/core
./build.sh
${
KNOWHERE_OPTIONS
}
popd
cd
cmake_build
git
CUDA_COMPILER
=
/usr/local/cuda/bin/nvcc
if
[[
${
MAKE_CLEAN
}
==
"ON"
]]
;
then
...
...
@@ -94,7 +96,6 @@ if [[ ${MAKE_CLEAN} == "ON" ]]; then
-DCMAKE_INSTALL_PREFIX=
${
INSTALL_PREFIX
}
-DCMAKE_BUILD_TYPE=
${
BUILD_TYPE
}
\
-DCMAKE_CUDA_COMPILER=
${
CUDA_COMPILER
}
\
-DCMAKE_LICENSE_CHECK=
${
LICENSE_CHECK
}
\
-DBUILD_COVERAGE=
${
BUILD_COVERAGE
}
\
-DMILVUS_DB_PATH=
${
DB_PATH
}
\
-DMILVUS_ENABLE_PROFILING=
${
PROFILING
}
\
...
...
cpp/cmake/ThirdPartyPackages.cmake
浏览文件 @
7bfa68fa
...
...
@@ -309,7 +309,8 @@ else()
# set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz")
endif
()
set
(
FAISS_MD5
"a589663865a8558205533c8ac414278c"
)
# set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
set
(
FAISS_MD5
"31167ecbd1903fec600dc4ac00b9be9e"
)
if
(
DEFINED ENV{MILVUS_KNOWHERE_URL}
)
set
(
KNOWHERE_SOURCE_URL
"$ENV{MILVUS_KNOWHERE_URL}"
)
...
...
cpp/conf/server_config.template
浏览文件 @
7bfa68fa
...
...
@@ -64,9 +64,9 @@ resource_config:
memory: 64
device_id: 0
enable_loader: true
enable_executor:
tru
e
enable_executor:
fals
e
g
tx106
0:
g
pu
0:
type: GPU
memory: 6
device_id: 0
...
...
@@ -80,10 +80,17 @@ resource_config:
enable_loader: false
enable_executor: false
# gtx1660:
# type: GPU
# memory: 6
# device_id: 1
# enable_loader: true
# enable_executor: true
# connection list, length: 0~N
# format: -${resource_name}===${resource_name}
connections:
- ssda===cpu
- cpu===g
tx106
0
- cpu===gtx1660
- cpu===g
pu
0
#
- cpu===gtx1660
cpp/src/core/cmake/ThirdPartyPackages.cmake
浏览文件 @
7bfa68fa
...
...
@@ -260,7 +260,8 @@ else()
# set(FAISS_SOURCE_URL "${CMAKE_SOURCE_DIR}/thirdparty/faiss-1.5.3")
message
(
STATUS
${
FAISS_SOURCE_URL
}
)
endif
()
set
(
FAISS_MD5
"a589663865a8558205533c8ac414278c"
)
# set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
set
(
FAISS_MD5
"31167ecbd1903fec600dc4ac00b9be9e"
)
if
(
DEFINED ENV{KNOWHERE_ARROW_URL}
)
set
(
ARROW_SOURCE_URL
"$ENV{KNOWHERE_ARROW_URL}"
)
...
...
@@ -924,7 +925,7 @@ macro(build_faiss)
if
(
USE_JFROG_CACHE STREQUAL
"ON"
)
# Check_Last_Modify("${CMAKE_SOURCE_DIR}/thirdparty/faiss_cache_check_lists.txt" "${CMAKE_SOURCE_DIR}" FAISS_LAST_MODIFIED_COMMIT_ID)
string
(
MD5 FAISS_COMBINE_MD5
"
${
FAISS_MD5
}${
LAPACK_MD5
}${
OPENBLAS_MD5
}
"
)
string
(
MD5 FAISS_COMBINE_MD5
"
${
FAISS_LAST_MODIFIED_COMMIT_ID
}${
LAPACK_MD5
}${
OPENBLAS_MD5
}
"
)
#
string(MD5 FAISS_COMBINE_MD5 "${FAISS_LAST_MODIFIED_COMMIT_ID}${LAPACK_MD5}${OPENBLAS_MD5}")
set
(
FAISS_CACHE_PACKAGE_NAME
"faiss_
${
FAISS_COMBINE_MD5
}
.tar.gz"
)
set
(
FAISS_CACHE_URL
"
${
JFROG_ARTFACTORY_CACHE_URL
}
/
${
FAISS_CACHE_PACKAGE_NAME
}
"
)
set
(
FAISS_CACHE_PACKAGE_PATH
"
${
THIRDPARTY_PACKAGE_CACHE
}
/
${
FAISS_CACHE_PACKAGE_NAME
}
"
)
...
...
cpp/src/core/thirdparty/faiss_cache_check_lists.txt
0 → 100644
浏览文件 @
7bfa68fa
# source
src/
include/
# third party
thirdparty/
# cmake
cmake/
CMakeLists.txt
# script
build.sh
\ No newline at end of file
cpp/src/db/DB.h
浏览文件 @
7bfa68fa
...
...
@@ -22,6 +22,9 @@ class DB {
public:
static
void
Open
(
const
Options
&
options
,
DB
**
dbptr
);
virtual
Status
Start
()
=
0
;
virtual
Status
Stop
()
=
0
;
virtual
Status
CreateTable
(
meta
::
TableSchema
&
table_schema_
)
=
0
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
=
0
;
virtual
Status
DescribeTable
(
meta
::
TableSchema
&
table_schema_
)
=
0
;
...
...
cpp/src/db/DBImpl.cpp
浏览文件 @
7bfa68fa
...
...
@@ -41,17 +41,55 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
DBImpl
::
DBImpl
(
const
Options
&
options
)
:
options_
(
options
),
shutting_down_
(
fals
e
),
shutting_down_
(
tru
e
),
compact_thread_pool_
(
1
,
1
),
index_thread_pool_
(
1
,
1
)
{
meta_ptr_
=
DBMetaImplFactory
::
Build
(
options
.
meta
,
options
.
mode
);
mem_mgr_
=
MemManagerFactory
::
Build
(
meta_ptr_
,
options_
);
if
(
options
.
mode
!=
Options
::
MODE
::
READ_ONLY
)
{
Start
();
}
DBImpl
::~
DBImpl
()
{
Stop
();
}
Status
DBImpl
::
Start
()
{
if
(
!
shutting_down_
.
load
(
std
::
memory_order_acquire
)){
return
Status
::
OK
();
}
//for distribute version, some nodes are read only
if
(
options_
.
mode
!=
Options
::
MODE
::
READ_ONLY
)
{
ENGINE_LOG_TRACE
<<
"StartTimerTasks"
;
StartTimerTasks
(
);
bg_timer_thread_
=
std
::
thread
(
&
DBImpl
::
BackgroundTimerTask
,
this
);
}
shutting_down_
.
store
(
false
,
std
::
memory_order_release
);
return
Status
::
OK
();
}
Status
DBImpl
::
Stop
()
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
)){
return
Status
::
OK
();
}
shutting_down_
.
store
(
true
,
std
::
memory_order_release
);
bg_timer_thread_
.
join
();
//wait compaction/buildindex finish
for
(
auto
&
result
:
compact_thread_results_
)
{
result
.
wait
();
}
for
(
auto
&
result
:
index_thread_results_
)
{
result
.
wait
();
}
//makesure all memory data serialized
MemSerialize
();
return
Status
::
OK
();
}
Status
DBImpl
::
CreateTable
(
meta
::
TableSchema
&
table_schema
)
{
...
...
@@ -162,7 +200,7 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint6
const
float
*
vectors
,
QueryResults
&
results
)
{
server
::
CollectQueryMetrics
metrics
(
nq
);
meta
::
DatesT
dates
=
{
meta
::
Meta
::
GetDate
()};
meta
::
DatesT
dates
=
{
utils
::
GetDate
()};
Status
result
=
Query
(
table_id
,
k
,
nq
,
nprobe
,
vectors
,
dates
,
results
);
return
result
;
...
...
@@ -278,10 +316,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
return
Status
::
OK
();
}
void
DBImpl
::
StartTimerTasks
()
{
bg_timer_thread_
=
std
::
thread
(
&
DBImpl
::
BackgroundTimerTask
,
this
);
}
void
DBImpl
::
BackgroundTimerTask
()
{
Status
status
;
server
::
SystemInfo
::
GetInstance
().
Init
();
...
...
@@ -741,13 +775,6 @@ Status DBImpl::Size(uint64_t& result) {
return
meta_ptr_
->
Size
(
result
);
}
DBImpl
::~
DBImpl
()
{
shutting_down_
.
store
(
true
,
std
::
memory_order_release
);
bg_timer_thread_
.
join
();
std
::
set
<
std
::
string
>
ids
;
mem_mgr_
->
Serialize
(
ids
);
}
}
// namespace engine
}
// namespace milvus
}
// namespace zilliz
cpp/src/db/DBImpl.h
浏览文件 @
7bfa68fa
...
...
@@ -36,6 +36,9 @@ class DBImpl : public DB {
explicit
DBImpl
(
const
Options
&
options
);
Status
Start
()
override
;
Status
Stop
()
override
;
Status
CreateTable
(
meta
::
TableSchema
&
table_schema
)
override
;
Status
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
override
;
...
...
@@ -91,18 +94,15 @@ class DBImpl : public DB {
~
DBImpl
()
override
;
private:
Status
QueryAsync
(
const
std
::
string
&
table_id
,
const
meta
::
TableFilesSchema
&
files
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
void
StartTimerTasks
();
Status
QueryAsync
(
const
std
::
string
&
table_id
,
const
meta
::
TableFilesSchema
&
files
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
void
BackgroundTimerTask
();
void
StartMetricTask
();
...
...
cpp/src/db/Utils.cpp
浏览文件 @
7bfa68fa
...
...
@@ -152,8 +152,33 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
&&
index1
.
metric_type_
==
index2
.
metric_type_
;
}
bool
UserDefinedId
(
int64_t
flag
)
{
return
flag
&
meta
::
FLAG_MASK_USERID
;
meta
::
DateT
GetDate
(
const
std
::
time_t
&
t
,
int
day_delta
)
{
struct
tm
ltm
;
localtime_r
(
&
t
,
&
ltm
);
if
(
day_delta
>
0
)
{
do
{
++
ltm
.
tm_mday
;
--
day_delta
;
}
while
(
day_delta
>
0
);
mktime
(
&
ltm
);
}
else
if
(
day_delta
<
0
)
{
do
{
--
ltm
.
tm_mday
;
++
day_delta
;
}
while
(
day_delta
<
0
);
mktime
(
&
ltm
);
}
else
{
ltm
.
tm_mday
;
}
return
ltm
.
tm_year
*
10000
+
ltm
.
tm_mon
*
100
+
ltm
.
tm_mday
;
}
meta
::
DateT
GetDateWithDelta
(
int
day_delta
)
{
return
GetDate
(
std
::
time
(
nullptr
),
day_delta
);
}
meta
::
DateT
GetDate
()
{
return
GetDate
(
std
::
time
(
nullptr
),
0
);
}
}
// namespace utils
...
...
cpp/src/db/Utils.h
浏览文件 @
7bfa68fa
...
...
@@ -10,6 +10,7 @@
#include "db/Types.h"
#include <string>
#include <ctime>
namespace
zilliz
{
namespace
milvus
{
...
...
@@ -27,7 +28,9 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
bool
IsSameIndex
(
const
TableIndex
&
index1
,
const
TableIndex
&
index2
);
bool
UserDefinedId
(
int64_t
flag
);
meta
::
DateT
GetDate
(
const
std
::
time_t
&
t
,
int
day_delta
=
0
);
meta
::
DateT
GetDate
();
meta
::
DateT
GetDateWithDelta
(
int
day_delta
);
}
// namespace utils
}
// namespace engine
...
...
cpp/src/db/meta/Meta.cpp
已删除
100644 → 0
浏览文件 @
244c73f8
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Meta.h"
#include <ctime>
#include <stdio.h>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
namespace
meta
{
Meta
::~
Meta
()
=
default
;
DateT
Meta
::
GetDate
(
const
std
::
time_t
&
t
,
int
day_delta
)
{
struct
tm
ltm
;
localtime_r
(
&
t
,
&
ltm
);
if
(
day_delta
>
0
)
{
do
{
++
ltm
.
tm_mday
;
--
day_delta
;
}
while
(
day_delta
>
0
);
mktime
(
&
ltm
);
}
else
if
(
day_delta
<
0
)
{
do
{
--
ltm
.
tm_mday
;
++
day_delta
;
}
while
(
day_delta
<
0
);
mktime
(
&
ltm
);
}
else
{
ltm
.
tm_mday
;
}
return
ltm
.
tm_year
*
10000
+
ltm
.
tm_mon
*
100
+
ltm
.
tm_mday
;
}
DateT
Meta
::
GetDateWithDelta
(
int
day_delta
)
{
return
GetDate
(
std
::
time
(
nullptr
),
day_delta
);
}
DateT
Meta
::
GetDate
()
{
return
GetDate
(
std
::
time
(
nullptr
),
0
);
}
}
// namespace meta
}
// namespace engine
}
// namespace milvus
}
// namespace zilliz
cpp/src/db/meta/Meta.h
浏览文件 @
7bfa68fa
...
...
@@ -11,7 +11,6 @@
#include "db/Types.h"
#include <cstddef>
#include <ctime>
#include <memory>
namespace
zilliz
{
...
...
@@ -19,105 +18,70 @@ namespace milvus {
namespace
engine
{
namespace
meta
{
class
Meta
{
public:
using
Ptr
=
std
::
shared_ptr
<
Meta
>
;
virtual
~
Meta
()
=
0
;
virtual
Status
CreateTable
(
TableSchema
&
table_schema
)
=
0
;
virtual
Status
DescribeTable
(
TableSchema
&
table_schema
)
=
0
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
=
0
;
virtual
~
Meta
()
=
default
;
virtual
Status
AllTables
(
std
::
vector
<
TableSchema
>
&
table_schema_array
)
=
0
;
virtual
Status
CreateTable
(
TableSchema
&
table_schema
)
=
0
;
virtual
Status
UpdateTableIndexParam
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
=
0
;
virtual
Status
DescribeTable
(
TableSchema
&
table_schema
)
=
0
;
virtual
Status
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
=
0
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
=
0
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
AllTables
(
std
::
vector
<
TableSchema
>
&
table_schema_array
)
=
0
;
virtual
Status
DeleteTableFiles
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
UpdateTableIndexParam
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
=
0
;
virtual
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
=
0
;
virtual
Status
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
=
0
;
virtual
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
const
DatesT
&
dates
)
=
0
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
GetTableFiles
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
TableFilesSchema
&
table_files
)
=
0
;
virtual
Status
DeleteTableFiles
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
UpdateTableFilesToIndex
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
=
0
;
virtual
Status
UpdateTableFile
(
TableFileSchema
&
file_schema
)
=
0
;
virtual
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
const
DatesT
&
dates
)
=
0
;
virtual
Status
UpdateTableFiles
(
TableFilesSchema
&
files
)
=
0
;
virtual
Status
GetTableFiles
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
TableFilesSchema
&
table_files
)
=
0
;
virtual
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
virtual
Status
UpdateTableFilesToIndex
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
virtual
Status
UpdateTableFile
(
TableFileSchema
&
file_schema
)
=
0
;
virtual
Status
Size
(
uint64_t
&
result
)
=
0
;
virtual
Status
UpdateTableFiles
(
TableFilesSchema
&
files
)
=
0
;
virtual
Status
Archive
()
=
0
;
virtual
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
virtual
Status
FilesToIndex
(
TableFilesSchema
&
)
=
0
;
virtual
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
virtual
Status
FilesByType
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
int
>
&
file_types
,
std
::
vector
<
std
::
string
>&
file_ids
)
=
0
;
virtual
Status
Size
(
uint64_t
&
result
)
=
0
;
virtual
Status
DescribeTableIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
=
0
;
virtual
Status
Archive
()
=
0
;
virtual
Status
DropTableIndex
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
FilesToIndex
(
TableFilesSchema
&
)
=
0
;
virtual
Status
CleanUp
()
=
0
;
virtual
Status
FilesByType
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
int
>
&
file_types
,
std
::
vector
<
std
::
string
>&
file_ids
)
=
0
;
virtual
Status
CleanUpFilesWithTTL
(
uint16_t
)
=
0
;
virtual
Status
DescribeTableIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
=
0
;
virtual
Status
DropAll
()
=
0
;
virtual
Status
DropTableIndex
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
Count
(
const
std
::
string
&
table_id
,
uint64_t
&
result
)
=
0
;
virtual
Status
CleanUp
()
=
0
;
static
DateT
GetDate
(
const
std
::
time_t
&
t
,
int
day_delta
=
0
);
virtual
Status
CleanUpFilesWithTTL
(
uint16_t
)
=
0
;
static
DateT
GetDate
();
virtual
Status
DropAll
()
=
0
;
static
DateT
GetDateWithDelta
(
int
day_delta
);
virtual
Status
Count
(
const
std
::
string
&
table_id
,
uint64_t
&
result
)
=
0
;
};
// MetaData
...
...
cpp/src/db/meta/MetaTypes.h
浏览文件 @
7bfa68fa
...
...
@@ -22,7 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384;
constexpr
int32_t
DEFAULT_METRIC_TYPE
=
(
int
)
MetricType
::
L2
;
constexpr
int32_t
DEFAULT_INDEX_FILE_SIZE
=
ONE_GB
;
constexpr
int64_t
FLAG_MASK_USERID
=
1
;
constexpr
int64_t
FLAG_MASK_NO_USERID
=
0x1
;
constexpr
int64_t
FLAG_MASK_HAS_USERID
=
0x1
<<
1
;
typedef
int
DateT
;
const
DateT
EmptyDate
=
-
1
;
...
...
cpp/src/db/meta/MySQLMetaImpl.cpp
浏览文件 @
7bfa68fa
...
...
@@ -41,6 +41,18 @@ Status HandleException(const std::string &desc, std::exception &e) {
}
MySQLMetaImpl
::
MySQLMetaImpl
(
const
DBMetaOptions
&
options_
,
const
int
&
mode
)
:
options_
(
options_
),
mode_
(
mode
)
{
Initialize
();
}
MySQLMetaImpl
::~
MySQLMetaImpl
()
{
if
(
mode_
!=
Options
::
MODE
::
READ_ONLY
)
{
CleanUp
();
}
}
Status
MySQLMetaImpl
::
NextTableId
(
std
::
string
&
table_id
)
{
std
::
stringstream
ss
;
SimpleIDGenerator
g
;
...
...
@@ -57,12 +69,6 @@ Status MySQLMetaImpl::NextFileId(std::string &file_id) {
return
Status
::
OK
();
}
MySQLMetaImpl
::
MySQLMetaImpl
(
const
DBMetaOptions
&
options_
,
const
int
&
mode
)
:
options_
(
options_
),
mode_
(
mode
)
{
Initialize
();
}
Status
MySQLMetaImpl
::
Initialize
()
{
if
(
!
boost
::
filesystem
::
is_directory
(
options_
.
path
))
{
auto
ret
=
boost
::
filesystem
::
create_directory
(
options_
.
path
);
...
...
@@ -202,15 +208,6 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
}
try
{
auto
yesterday
=
GetDateWithDelta
(
-
1
);
for
(
auto
&
date
:
dates
)
{
if
(
date
>=
yesterday
)
{
return
Status
::
Error
(
"Could not delete partitions within 2 days"
);
}
}
std
::
stringstream
dateListSS
;
for
(
auto
&
date
:
dates
)
{
dateListSS
<<
std
::
to_string
(
date
)
<<
", "
;
...
...
@@ -229,7 +226,8 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
Query
dropPartitionsByDatesQuery
=
connectionPtr
->
query
();
dropPartitionsByDatesQuery
<<
"UPDATE TableFiles "
<<
"SET file_type = "
<<
std
::
to_string
(
TableFileSchema
::
TO_DELETE
)
<<
" "
<<
"SET file_type = "
<<
std
::
to_string
(
TableFileSchema
::
TO_DELETE
)
<<
","
<<
"updated_time = "
<<
utils
::
GetMicroSecTimeStamp
()
<<
" "
<<
"WHERE table_id = "
<<
quote
<<
table_id
<<
" AND "
<<
"date in ("
<<
dateListStr
<<
");"
;
...
...
@@ -877,7 +875,7 @@ Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
Status
MySQLMetaImpl
::
CreateTableFile
(
TableFileSchema
&
file_schema
)
{
if
(
file_schema
.
date_
==
EmptyDate
)
{
file_schema
.
date_
=
Meta
::
GetDate
();
file_schema
.
date_
=
utils
::
GetDate
();
}
TableSchema
table_schema
;
table_schema
.
table_id_
=
file_schema
.
table_id_
;
...
...
@@ -2031,12 +2029,6 @@ Status MySQLMetaImpl::DropAll() {
return
Status
::
OK
();
}
MySQLMetaImpl
::~
MySQLMetaImpl
()
{
if
(
mode_
!=
Options
::
MODE
::
READ_ONLY
)
{
CleanUp
();
}
}
}
// namespace meta
}
// namespace engine
}
// namespace milvus
...
...
cpp/src/db/meta/MySQLMetaImpl.h
浏览文件 @
7bfa68fa
...
...
@@ -24,6 +24,7 @@ using namespace mysqlpp;
class
MySQLMetaImpl
:
public
Meta
{
public:
MySQLMetaImpl
(
const
DBMetaOptions
&
options_
,
const
int
&
mode
);
~
MySQLMetaImpl
();
Status
CreateTable
(
TableSchema
&
table_schema
)
override
;
...
...
@@ -86,8 +87,6 @@ class MySQLMetaImpl : public Meta {
Status
Count
(
const
std
::
string
&
table_id
,
uint64_t
&
result
)
override
;
virtual
~
MySQLMetaImpl
();
private:
Status
NextFileId
(
std
::
string
&
file_id
);
Status
NextTableId
(
std
::
string
&
table_id
);
...
...
cpp/src/db/meta/SqliteMetaImpl.cpp
浏览文件 @
7bfa68fa
...
...
@@ -68,6 +68,15 @@ using ConnectorT = decltype(StoragePrototype(""));
static
std
::
unique_ptr
<
ConnectorT
>
ConnectorPtr
;
using
ConditionT
=
decltype
(
c
(
&
TableFileSchema
::
id_
)
==
1UL
);
SqliteMetaImpl
::
SqliteMetaImpl
(
const
DBMetaOptions
&
options_
)
:
options_
(
options_
)
{
Initialize
();
}
SqliteMetaImpl
::~
SqliteMetaImpl
()
{
CleanUp
();
}
Status
SqliteMetaImpl
::
NextTableId
(
std
::
string
&
table_id
)
{
std
::
stringstream
ss
;
SimpleIDGenerator
g
;
...
...
@@ -84,11 +93,6 @@ Status SqliteMetaImpl::NextFileId(std::string &file_id) {
return
Status
::
OK
();
}
SqliteMetaImpl
::
SqliteMetaImpl
(
const
DBMetaOptions
&
options_
)
:
options_
(
options_
)
{
Initialize
();
}
Status
SqliteMetaImpl
::
Initialize
()
{
if
(
!
boost
::
filesystem
::
is_directory
(
options_
.
path
))
{
auto
ret
=
boost
::
filesystem
::
create_directory
(
options_
.
path
);
...
...
@@ -111,7 +115,7 @@ Status SqliteMetaImpl::Initialize() {
// PXU TODO: Temp solution. Will fix later
Status
SqliteMetaImpl
::
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
const
DatesT
&
dates
)
{
const
DatesT
&
dates
)
{
if
(
dates
.
size
()
==
0
)
{
return
Status
::
OK
();
}
...
...
@@ -124,20 +128,13 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
}
try
{
auto
yesterday
=
GetDateWithDelta
(
-
1
);
for
(
auto
&
date
:
dates
)
{
if
(
date
>=
yesterday
)
{
return
Status
::
Error
(
"Could not delete partitions with 2 days"
);
}
}
//multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std
::
lock_guard
<
std
::
mutex
>
meta_lock
(
meta_mutex_
);
ConnectorPtr
->
update_all
(
set
(
c
(
&
TableFileSchema
::
file_type_
)
=
(
int
)
TableFileSchema
::
TO_DELETE
c
(
&
TableFileSchema
::
file_type_
)
=
(
int
)
TableFileSchema
::
TO_DELETE
,
c
(
&
TableFileSchema
::
updated_time_
)
=
utils
::
GetMicroSecTimeStamp
()
),
where
(
c
(
&
TableFileSchema
::
table_id_
)
==
table_id
and
...
...
@@ -543,7 +540,7 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
Status
SqliteMetaImpl
::
CreateTableFile
(
TableFileSchema
&
file_schema
)
{
if
(
file_schema
.
date_
==
EmptyDate
)
{
file_schema
.
date_
=
Meta
::
GetDate
();
file_schema
.
date_
=
utils
::
GetDate
();
}
TableSchema
table_schema
;
table_schema
.
table_id_
=
file_schema
.
table_id_
;
...
...
@@ -1214,10 +1211,6 @@ Status SqliteMetaImpl::DropAll() {
return
Status
::
OK
();
}
SqliteMetaImpl
::~
SqliteMetaImpl
()
{
CleanUp
();
}
}
// namespace meta
}
// namespace engine
}
// namespace milvus
...
...
cpp/src/db/meta/SqliteMetaImpl.h
浏览文件 @
7bfa68fa
...
...
@@ -20,6 +20,7 @@ auto StoragePrototype(const std::string &path);
class
SqliteMetaImpl
:
public
Meta
{
public:
explicit
SqliteMetaImpl
(
const
DBMetaOptions
&
options_
);
~
SqliteMetaImpl
();
Status
CreateTable
(
TableSchema
&
table_schema
)
override
;
...
...
@@ -80,8 +81,6 @@ class SqliteMetaImpl : public Meta {
Status
Count
(
const
std
::
string
&
table_id
,
uint64_t
&
result
)
override
;
~
SqliteMetaImpl
()
override
;
private:
Status
NextFileId
(
std
::
string
&
file_id
);
Status
NextTableId
(
std
::
string
&
table_id
);
...
...
cpp/src/metrics/Metrics.h
浏览文件 @
7bfa68fa
...
...
@@ -37,21 +37,22 @@ public:
}
~
CollectInsertMetrics
()
{
auto
end_time
=
METRICS_NOW_TIME
;
auto
total_time
=
METRICS_MICROSECONDS
(
start_time_
,
end_time
);
double
avg_time
=
total_time
/
n_
;
for
(
int
i
=
0
;
i
<
n_
;
++
i
)
{
Metrics
::
GetInstance
().
AddVectorsDurationHistogramOberve
(
avg_time
);
}
if
(
n_
>
0
)
{
auto
end_time
=
METRICS_NOW_TIME
;
auto
total_time
=
METRICS_MICROSECONDS
(
start_time_
,
end_time
);
double
avg_time
=
total_time
/
n_
;
for
(
int
i
=
0
;
i
<
n_
;
++
i
)
{
Metrics
::
GetInstance
().
AddVectorsDurationHistogramOberve
(
avg_time
);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if
(
status_
.
ok
())
{
server
::
Metrics
::
GetInstance
().
AddVectorsSuccessTotalIncrement
(
n_
);
server
::
Metrics
::
GetInstance
().
AddVectorsSuccessGaugeSet
(
n_
);
}
else
{
server
::
Metrics
::
GetInstance
().
AddVectorsFailTotalIncremen
t
(
n_
);
server
::
Metrics
::
GetInstance
().
AddVectorsFailGaugeSet
(
n_
);
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if
(
status_
.
ok
())
{
server
::
Metrics
::
GetInstance
().
AddVectorsSuccessTotalIncrement
(
n_
);
server
::
Metrics
::
GetInstance
().
AddVectorsSuccessGaugeSet
(
n_
);
}
else
{
server
::
Metrics
::
GetInstance
().
AddVectorsFailTotalIncrement
(
n_
);
server
::
Metrics
::
GetInstance
().
AddVectorsFailGaugeSe
t
(
n_
);
}
}
}
...
...
@@ -69,14 +70,16 @@ public:
}
~
CollectQueryMetrics
()
{
auto
end_time
=
METRICS_NOW_TIME
;
auto
total_time
=
METRICS_MICROSECONDS
(
start_time_
,
end_time
);
for
(
int
i
=
0
;
i
<
nq_
;
++
i
)
{
server
::
Metrics
::
GetInstance
().
QueryResponseSummaryObserve
(
total_time
);
if
(
nq_
>
0
)
{
auto
end_time
=
METRICS_NOW_TIME
;
auto
total_time
=
METRICS_MICROSECONDS
(
start_time_
,
end_time
);
for
(
int
i
=
0
;
i
<
nq_
;
++
i
)
{
server
::
Metrics
::
GetInstance
().
QueryResponseSummaryObserve
(
total_time
);
}
auto
average_time
=
total_time
/
nq_
;
server
::
Metrics
::
GetInstance
().
QueryVectorResponseSummaryObserve
(
average_time
,
nq_
);
server
::
Metrics
::
GetInstance
().
QueryVectorResponsePerSecondGaugeSet
(
double
(
nq_
)
/
total_time
);
}
auto
average_time
=
total_time
/
nq_
;
server
::
Metrics
::
GetInstance
().
QueryVectorResponseSummaryObserve
(
average_time
,
nq_
);
server
::
Metrics
::
GetInstance
().
QueryVectorResponsePerSecondGaugeSet
(
double
(
nq_
)
/
total_time
);
}
private:
...
...
cpp/src/scheduler/SchedInst.cpp
浏览文件 @
7bfa68fa
...
...
@@ -7,6 +7,7 @@
#include "SchedInst.h"
#include "server/ServerConfig.h"
#include "ResourceFactory.h"
#include "knowhere/index/vector_index/gpu_ivf.h"
namespace
zilliz
{
namespace
milvus
{
...
...
@@ -19,7 +20,7 @@ SchedulerPtr SchedInst::instance = nullptr;
std
::
mutex
SchedInst
::
mutex_
;
void
S
chedServInit
()
{
S
tartSchedulerService
()
{
server
::
ConfigNode
&
config
=
server
::
ServerConfig
::
GetInstance
().
GetConfig
(
server
::
CONFIG_RESOURCE
);
auto
resources
=
config
.
GetChild
(
server
::
CONFIG_RESOURCES
).
GetChildren
();
for
(
auto
&
resource
:
resources
)
{
...
...
@@ -36,8 +37,12 @@ SchedServInit() {
device_id
,
enable_loader
,
enable_executor
));
knowhere
::
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
device_id
);
}
knowhere
::
FaissGpuResourceMgr
::
GetInstance
().
InitResource
();
auto
default_connection
=
Connection
(
"default_connection"
,
500.0
);
auto
connections
=
config
.
GetSequence
(
server
::
CONFIG_RESOURCE_CONNECTIONS
);
for
(
auto
&
conn
:
connections
)
{
...
...
@@ -52,6 +57,11 @@ SchedServInit() {
SchedInst
::
GetInstance
()
->
Start
();
}
void
StopSchedulerService
()
{
ResMgrInst
::
GetInstance
()
->
Stop
();
SchedInst
::
GetInstance
()
->
Stop
();
}
}
}
}
cpp/src/scheduler/SchedInst.h
浏览文件 @
7bfa68fa
...
...
@@ -53,7 +53,10 @@ private:
};
void
SchedServInit
();
StartSchedulerService
();
void
StopSchedulerService
();
}
}
...
...
cpp/src/server/DBWrapper.cpp
浏览文件 @
7bfa68fa
...
...
@@ -17,6 +17,10 @@ namespace milvus {
namespace
server
{
DBWrapper
::
DBWrapper
()
{
}
ServerError
DBWrapper
::
StartService
()
{
//db config
zilliz
::
milvus
::
engine
::
Options
opt
;
ConfigNode
&
db_config
=
ServerConfig
::
GetInstance
().
GetConfig
(
CONFIG_DB
);
...
...
@@ -91,7 +95,9 @@ DBWrapper::DBWrapper() {
//create db instance
std
::
string
msg
=
opt
.
meta
.
path
;
try
{
zilliz
::
milvus
::
engine
::
DB
::
Open
(
opt
,
&
db_
);
engine
::
DB
*
db
=
nullptr
;
zilliz
::
milvus
::
engine
::
DB
::
Open
(
opt
,
&
db
);
db_
.
reset
(
db
);
}
catch
(
std
::
exception
&
ex
)
{
msg
=
ex
.
what
();
}
...
...
@@ -100,10 +106,18 @@ DBWrapper::DBWrapper() {
std
::
cout
<<
"ERROR! Failed to open database: "
<<
msg
<<
std
::
endl
;
kill
(
0
,
SIGUSR1
);
}
db_
->
Start
();
return
SERVER_SUCCESS
;
}
DBWrapper
::~
DBWrapper
()
{
delete
db_
;
ServerError
DBWrapper
::
StopService
()
{
if
(
db_
)
{
db_
->
Stop
();
}
return
SERVER_SUCCESS
;
}
}
...
...
cpp/src/server/DBWrapper.h
浏览文件 @
7bfa68fa
...
...
@@ -5,8 +5,11 @@
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include "db/DB.h"
#include <memory>
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
...
...
@@ -14,18 +17,27 @@ namespace server {
class
DBWrapper
{
private:
DBWrapper
();
~
DBWrapper
();
~
DBWrapper
()
=
default
;
public:
static
zilliz
::
milvus
::
engine
::
DB
*
DB
()
{
static
DBWrapper
db_wrapper
;
return
db_wrapper
.
db
();
static
DBWrapper
&
GetInstance
()
{
static
DBWrapper
wrapper
;
return
wrapper
;
}
static
std
::
shared_ptr
<
engine
::
DB
>
DB
()
{
return
GetInstance
().
EngineDB
();
}
zilliz
::
milvus
::
engine
::
DB
*
db
()
{
return
db_
;
}
ServerError
StartService
();
ServerError
StopService
();
std
::
shared_ptr
<
engine
::
DB
>
EngineDB
()
{
return
db_
;
}
private:
zilliz
::
milvus
::
engine
::
DB
*
db_
=
nullptr
;
std
::
shared_ptr
<
engine
::
DB
>
db_
;
};
}
...
...
cpp/src/server/Server.cpp
浏览文件 @
7bfa68fa
...
...
@@ -21,6 +21,7 @@
#include <src/scheduler/SchedInst.h>
#include "metrics/Metrics.h"
#include "DBWrapper.h"
namespace
zilliz
{
namespace
milvus
{
...
...
@@ -158,7 +159,7 @@ Server::Start() {
signal
(
SIGTERM
,
SignalUtil
::
HandleSignal
);
server
::
Metrics
::
GetInstance
().
Init
();
server
::
SystemInfo
::
GetInstance
().
Init
();
engine
::
SchedServInit
();
std
::
cout
<<
"Milvus server start successfully."
<<
std
::
endl
;
StartService
();
...
...
@@ -221,12 +222,16 @@ Server::LoadConfig() {
void
Server
::
StartService
()
{
engine
::
StartSchedulerService
();
DBWrapper
::
GetInstance
().
StartService
();
grpc
::
GrpcMilvusServer
::
StartService
();
}
void
Server
::
StopService
()
{
grpc
::
GrpcMilvusServer
::
StopService
();
DBWrapper
::
GetInstance
().
StopService
();
engine
::
StopSchedulerService
();
}
}
...
...
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
浏览文件 @
7bfa68fa
...
...
@@ -49,8 +49,6 @@ GrpcMilvusServer::StartService() {
faiss
::
distance_compute_blas_threshold
=
engine_config
.
GetInt32Value
(
CONFIG_DCBT
,
20
);
DBWrapper
::
DB
();
//initialize db
std
::
string
server_address
(
address
+
":"
+
std
::
to_string
(
port
));
::
grpc
::
ServerBuilder
builder
;
...
...
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
浏览文件 @
7bfa68fa
...
...
@@ -66,16 +66,18 @@ GrpcBaseTask::~GrpcBaseTask() {
WaitToFinish
();
}
ServerError
GrpcBaseTask
::
Execute
()
{
ServerError
GrpcBaseTask
::
Execute
()
{
error_code_
=
OnExecute
();
Done
();
return
error_code_
;
}
void
GrpcBaseTask
::
Done
()
{
done_
=
true
;
finish_cond_
.
notify_all
();
return
error_code_
;
}
ServerError
GrpcBaseTask
::
SetError
(
ServerError
error_code
,
const
std
::
string
&
error_msg
)
{
ServerError
GrpcBaseTask
::
SetError
(
ServerError
error_code
,
const
std
::
string
&
error_msg
)
{
error_code_
=
error_code
;
error_msg_
=
error_msg
;
...
...
@@ -83,8 +85,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
return
error_code_
;
}
ServerError
GrpcBaseTask
::
WaitToFinish
()
{
ServerError
GrpcBaseTask
::
WaitToFinish
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
finish_mtx_
);
finish_cond_
.
wait
(
lock
,
[
this
]
{
return
done_
;
});
...
...
@@ -101,8 +102,7 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
Stop
();
}
void
GrpcRequestScheduler
::
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
)
{
void
GrpcRequestScheduler
::
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
)
{
if
(
task_ptr
==
nullptr
)
{
return
;
}
...
...
@@ -120,8 +120,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *gr
}
}
void
GrpcRequestScheduler
::
Start
()
{
void
GrpcRequestScheduler
::
Start
()
{
if
(
!
stopped_
)
{
return
;
}
...
...
@@ -129,8 +128,7 @@ GrpcRequestScheduler::Start() {
stopped_
=
false
;
}
void
GrpcRequestScheduler
::
Stop
()
{
void
GrpcRequestScheduler
::
Stop
()
{
if
(
stopped_
)
{
return
;
}
...
...
@@ -155,8 +153,7 @@ GrpcRequestScheduler::Stop() {
SERVER_LOG_INFO
<<
"Scheduler stopped"
;
}
ServerError
GrpcRequestScheduler
::
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
)
{
ServerError
GrpcRequestScheduler
::
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
)
{
if
(
task_ptr
==
nullptr
)
{
return
SERVER_NULL_POINTER
;
}
...
...
@@ -174,33 +171,31 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
return
task_ptr
->
WaitToFinish
();
//sync execution
}
namespace
{
void
TakeTaskToExecute
(
TaskQueuePtr
task_queue
)
{
if
(
task_queue
==
nullptr
)
{
return
;
}
while
(
true
)
{
BaseTaskPtr
task
=
task_queue
->
Take
();
if
(
task
==
nullptr
)
{
SERVER_LOG_ERROR
<<
"Take null from task queue, stop thread"
;
break
;
//stop the thread
}
void
GrpcRequestScheduler
::
TakeTaskToExecute
(
TaskQueuePtr
task_queue
)
{
if
(
task_queue
==
nullptr
)
{
return
;
}
while
(
true
)
{
BaseTaskPtr
task
=
task_queue
->
Take
();
if
(
task
==
nullptr
)
{
SERVER_LOG_ERROR
<<
"Take null from task queue, stop thread"
;
break
;
//stop the thread
}
try
{
ServerError
err
=
task
->
Execute
();
if
(
err
!=
SERVER_SUCCESS
)
{
SERVER_LOG_ERROR
<<
"Task failed with code: "
<<
err
;
}
}
catch
(
std
::
exception
&
ex
)
{
SERVER_LOG_ERROR
<<
"Task failed to execute: "
<<
ex
.
what
();
try
{
ServerError
err
=
task
->
Execute
();
if
(
err
!=
SERVER_SUCCESS
)
{
SERVER_LOG_ERROR
<<
"Task failed with code: "
<<
err
;
}
}
catch
(
std
::
exception
&
ex
)
{
SERVER_LOG_ERROR
<<
"Task failed to execute: "
<<
ex
.
what
();
}
}
}
ServerError
GrpcRequestScheduler
::
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
)
{
ServerError
GrpcRequestScheduler
::
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
queue_mtx_
);
std
::
string
group_name
=
task_ptr
->
TaskGroup
();
...
...
@@ -212,7 +207,7 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
task_groups_
.
insert
(
std
::
make_pair
(
group_name
,
queue
));
//start a thread
ThreadPtr
thread
=
std
::
make_shared
<
std
::
thread
>
(
&
TakeTaskToExecute
,
queue
);
ThreadPtr
thread
=
std
::
make_shared
<
std
::
thread
>
(
&
GrpcRequestScheduler
::
TakeTaskToExecute
,
this
,
queue
);
execute_threads_
.
push_back
(
thread
);
SERVER_LOG_INFO
<<
"Create new thread for task group: "
<<
group_name
;
}
...
...
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
浏览文件 @
7bfa68fa
...
...
@@ -25,30 +25,24 @@ protected:
virtual
~
GrpcBaseTask
();
public:
ServerError
Execute
();
ServerError
Execute
();
ServerError
WaitToFinish
();
void
Done
();
std
::
string
TaskGroup
()
const
{
return
task_group_
;
}
ServerError
WaitToFinish
();
ServerError
ErrorCode
()
const
{
return
error_code_
;
}
std
::
string
TaskGroup
()
const
{
return
task_group_
;
}
std
::
string
ErrorMsg
()
const
{
return
error_msg_
;
}
ServerError
ErrorCode
()
const
{
return
error_code_
;
}
bool
IsAsync
()
const
{
return
async_
;
}
std
::
string
ErrorMsg
()
const
{
return
error_msg_
;
}
bool
IsAsync
()
const
{
return
async_
;
}
protected:
virtual
ServerError
OnExecute
()
=
0
;
virtual
ServerError
OnExecute
()
=
0
;
ServerError
SetError
(
ServerError
error_code
,
const
std
::
string
&
msg
);
ServerError
SetError
(
ServerError
error_code
,
const
std
::
string
&
msg
);
protected:
mutable
std
::
mutex
finish_mtx_
;
...
...
@@ -77,19 +71,18 @@ public:
void
Stop
();
ServerError
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
);
ServerError
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
);
static
void
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
);
static
void
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
);
protected:
GrpcRequestScheduler
();
virtual
~
GrpcRequestScheduler
();
ServerError
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
);
void
TakeTaskToExecute
(
TaskQueuePtr
task_queue
);
ServerError
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
);
private:
mutable
std
::
mutex
queue_mtx_
;
...
...
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
浏览文件 @
7bfa68fa
...
...
@@ -93,6 +93,7 @@ namespace {
return
;
}
//range: [start_day, end_day)
for
(
long
i
=
0
;
i
<
days
;
i
++
)
{
time_t
tt_day
=
tt_start
+
DAY_SECONDS
*
i
;
tm
tm_day
;
...
...
@@ -456,21 +457,17 @@ InsertTask::OnExecute() {
}
}
//step 3: check table flag
//all user provide id, or all internal id
uint64_t
row_count
=
0
;
DBWrapper
::
DB
()
->
GetTableRowCount
(
table_info
.
table_id_
,
row_count
);
bool
empty_table
=
(
row_count
==
0
);
bool
user_provide_ids
=
!
insert_param_
->
row_id_array
().
empty
();
if
(
!
empty_table
)
{
//user already provided id before, all insert action require user id
if
(
engine
::
utils
::
UserDefinedId
(
table_info
.
flag_
)
&&
!
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are user defined, please provide id for this batch"
);
}
//user already provided id before, all insert action require user id
if
((
table_info
.
flag_
&
engine
::
meta
::
FLAG_MASK_HAS_USERID
)
&&
!
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are user defined, please provide id for this batch"
);
}
//user didn't provided id before, no need to provide user id
if
(
!
engine
::
utils
::
UserDefinedId
(
table_info
.
flag_
)
&&
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are auto generated, no need to provide id for this batch"
);
}
//user didn't provided id before, no need to provide user id
if
((
table_info
.
flag_
&
engine
::
meta
::
FLAG_MASK_NO_USERID
)
&&
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are auto generated, no need to provide id for this batch"
);
}
rc
.
RecordSection
(
"check validation"
);
...
...
@@ -481,7 +478,7 @@ InsertTask::OnExecute() {
ProfilerStart
(
fname
.
c_str
());
#endif
//step
3
: prepare float data
//step
4
: prepare float data
std
::
vector
<
float
>
vec_f
(
insert_param_
->
row_record_array_size
()
*
table_info
.
dimension_
,
0
);
// TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
...
...
@@ -504,7 +501,7 @@ InsertTask::OnExecute() {
rc
.
ElapseFromBegin
(
"prepare vectors data"
);
//step
4
: insert vectors
//step
5
: insert vectors
auto
vec_count
=
(
uint64_t
)
insert_param_
->
row_record_array_size
();
std
::
vector
<
int64_t
>
vec_ids
(
insert_param_
->
row_id_array_size
(),
0
);
if
(
!
insert_param_
->
row_id_array
().
empty
())
{
...
...
@@ -529,11 +526,10 @@ InsertTask::OnExecute() {
return
SetError
(
SERVER_ILLEGAL_VECTOR_ID
,
msg
);
}
//step 5: update table flag
if
(
empty_table
&&
user_provide_ids
)
{
stat
=
DBWrapper
::
DB
()
->
UpdateTableFlag
(
insert_param_
->
table_name
(),
table_info
.
flag_
|
engine
::
meta
::
FLAG_MASK_USERID
);
}
//step 6: update table flag
user_provide_ids
?
table_info
.
flag_
|=
engine
::
meta
::
FLAG_MASK_HAS_USERID
:
table_info
.
flag_
|=
engine
::
meta
::
FLAG_MASK_NO_USERID
;
stat
=
DBWrapper
::
DB
()
->
UpdateTableFlag
(
insert_param_
->
table_name
(),
table_info
.
flag_
);
#ifdef MILVUS_ENABLE_PROFILING
ProfilerStop
();
...
...
cpp/src/wrapper/knowhere/vec_impl.cpp
浏览文件 @
7bfa68fa
...
...
@@ -241,8 +241,9 @@ server::KnowhereError IVFMixIndex::BuildAll(const long &nb,
index_
->
Add
(
dataset
,
cfg
);
if
(
auto
device_index
=
std
::
dynamic_pointer_cast
<
GPUIVF
>
(
index_
))
{
auto
host_index
=
device_index
->
Copy
_index_gpu_to_cpu
(
);
auto
host_index
=
device_index
->
Copy
GpuToCpu
(
Config
()
);
index_
=
host_index
;
type
=
TransferToCpuIndexType
(
type
);
}
else
{
WRAPPER_LOG_ERROR
<<
"Build IVFMIXIndex Failed"
;
}
...
...
cpp/src/wrapper/knowhere/vec_index.cpp
浏览文件 @
7bfa68fa
...
...
@@ -106,6 +106,10 @@ VecIndexPtr GetVecIndexFactory(const IndexType &type) {
index
=
std
::
make_shared
<
zilliz
::
knowhere
::
GPUIVFSQ
>
(
0
);
return
std
::
make_shared
<
IVFMixIndex
>
(
index
,
IndexType
::
FAISS_IVFSQ8_MIX
);
}
case
IndexType
::
FAISS_IVFSQ8
:
{
index
=
std
::
make_shared
<
zilliz
::
knowhere
::
IVFSQ
>
();
break
;
}
case
IndexType
::
NSG_MIX
:
{
// TODO(linxj): bug.
index
=
std
::
make_shared
<
zilliz
::
knowhere
::
NSG
>
(
0
);
break
;
...
...
@@ -194,10 +198,10 @@ server::KnowhereError write_index(VecIndexPtr index, const std::string &location
// TODO(linxj): redo here.
void
AutoGenParams
(
const
IndexType
&
type
,
const
long
&
size
,
zilliz
::
knowhere
::
Config
&
cfg
)
{
auto
nlist
=
cfg
.
get_with_default
(
"nlist"
,
0
);
if
(
size
<=
TYPICAL_COUNT
/
16384
+
1
)
{
if
(
size
<=
TYPICAL_COUNT
/
16384
+
1
)
{
//handle less row count, avoid nlist set to 0
cfg
[
"nlist"
]
=
1
;
}
else
if
(
int
(
size
/
TYPICAL_COUNT
)
*
nlist
==
0
)
{
}
else
if
(
int
(
size
/
TYPICAL_COUNT
)
*
nlist
==
0
)
{
//calculate a proper nlist if nlist not specified or size less than TYPICAL_COUNT
cfg
[
"nlist"
]
=
int
(
size
/
TYPICAL_COUNT
*
16384
);
}
...
...
@@ -225,6 +229,20 @@ void AutoGenParams(const IndexType &type, const long &size, zilliz::knowhere::Co
}
}
IndexType
TransferToCpuIndexType
(
const
IndexType
&
type
)
{
switch
(
type
)
{
case
IndexType
::
FAISS_IVFFLAT_MIX
:
{
return
IndexType
::
FAISS_IVFFLAT_CPU
;
}
case
IndexType
::
FAISS_IVFSQ8_MIX
:
{
return
IndexType
::
FAISS_IVFSQ8
;
}
default:
{
return
IndexType
::
INVALID
;
}
}
}
}
}
}
cpp/src/wrapper/knowhere/vec_index.h
浏览文件 @
7bfa68fa
...
...
@@ -32,6 +32,7 @@ enum class IndexType {
FAISS_IVFPQ_GPU
,
SPTAG_KDT_RNT_CPU
,
FAISS_IVFSQ8_MIX
,
FAISS_IVFSQ8
,
NSG_MIX
,
};
...
...
@@ -88,6 +89,8 @@ extern VecIndexPtr LoadVecIndex(const IndexType &index_type, const zilliz::knowh
extern
void
AutoGenParams
(
const
IndexType
&
type
,
const
long
&
size
,
Config
&
cfg
);
extern
IndexType
TransferToCpuIndexType
(
const
IndexType
&
type
);
}
}
}
cpp/unittest/db/db_tests.cpp
浏览文件 @
7bfa68fa
...
...
@@ -293,18 +293,15 @@ TEST_F(DBTest, PRELOADTABLE_TEST) {
ASSERT_STATS
(
stat
);
ASSERT_EQ
(
table_info_get
.
dimension_
,
TABLE_DIM
);
engine
::
IDNumbers
vector_ids
;
engine
::
IDNumbers
target_ids
;
int64_t
nb
=
100000
;
int64_t
nb
=
VECTOR_COUNT
;
std
::
vector
<
float
>
xb
;
BuildVectors
(
nb
,
xb
);
int
loop
=
5
;
for
(
auto
i
=
0
;
i
<
loop
;
++
i
)
{
db_
->
InsertVectors
(
TABLE_NAME
,
nb
,
xb
.
data
(),
target_ids
);
ASSERT_EQ
(
target_ids
.
size
(),
nb
);
engine
::
IDNumbers
vector_ids
;
db_
->
InsertVectors
(
TABLE_NAME
,
nb
,
xb
.
data
(),
vector_ids
);
ASSERT_EQ
(
vector_ids
.
size
(),
nb
);
}
engine
::
TableIndex
index
;
...
...
@@ -342,9 +339,6 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
ASSERT_STATS
(
stat
);
ASSERT_EQ
(
table_info_get
.
dimension_
,
TABLE_DIM
);
engine
::
IDNumbers
vector_ids
;
engine
::
IDNumbers
target_ids
;
uint64_t
size
;
db_
->
Size
(
size
);
...
...
@@ -354,6 +348,7 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
int
loop
=
INSERT_LOOP
;
for
(
auto
i
=
0
;
i
<
loop
;
++
i
)
{
engine
::
IDNumbers
vector_ids
;
db_
->
InsertVectors
(
TABLE_NAME
,
nb
,
xb
.
data
(),
vector_ids
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
microseconds
(
1
));
}
...
...
@@ -378,20 +373,17 @@ TEST_F(DBTest2, DELETE_TEST) {
db_
->
HasTable
(
TABLE_NAME
,
has_table
);
ASSERT_TRUE
(
has_table
);
engine
::
IDNumbers
vector_ids
;
uint64_t
size
;
db_
->
Size
(
size
);
int64_t
nb
=
INSERT_LOOP
;
int64_t
nb
=
VECTOR_COUNT
;
std
::
vector
<
float
>
xb
;
BuildVectors
(
nb
,
xb
);
int
loop
=
20
;
for
(
auto
i
=
0
;
i
<
loop
;
++
i
)
{
db_
->
InsertVectors
(
TABLE_NAME
,
nb
,
xb
.
data
(),
vector_ids
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
microseconds
(
1
));
}
engine
::
IDNumbers
vector_ids
;
stat
=
db_
->
InsertVectors
(
TABLE_NAME
,
nb
,
xb
.
data
(),
vector_ids
);
engine
::
TableIndex
index
;
stat
=
db_
->
CreateIndex
(
TABLE_NAME
,
index
);
std
::
vector
<
engine
::
meta
::
DateT
>
dates
;
stat
=
db_
->
DeleteTable
(
TABLE_NAME
,
dates
);
...
...
@@ -420,25 +412,31 @@ TEST_F(DBTest2, DELETE_BY_RANGE_TEST) {
db_
->
HasTable
(
TABLE_NAME
,
has_table
);
ASSERT_TRUE
(
has_table
);
engine
::
IDNumbers
vector_ids
;
uint64_t
size
;
db_
->
Size
(
size
);
ASSERT_EQ
(
size
,
0UL
);
int64_t
nb
=
INSERT_LOOP
;
int64_t
nb
=
VECTOR_COUNT
;
std
::
vector
<
float
>
xb
;
BuildVectors
(
nb
,
xb
);
int
loop
=
20
;
for
(
auto
i
=
0
;
i
<
loop
;
++
i
)
{
db_
->
InsertVectors
(
TABLE_NAME
,
nb
,
xb
.
data
(),
vector_ids
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
microseconds
(
1
));
}
engine
::
IDNumbers
vector_ids
;
stat
=
db_
->
InsertVectors
(
TABLE_NAME
,
nb
,
xb
.
data
(),
vector_ids
);
engine
::
TableIndex
index
;
stat
=
db_
->
CreateIndex
(
TABLE_NAME
,
index
);
db_
->
Size
(
size
);
ASSERT_NE
(
size
,
0UL
);
std
::
vector
<
engine
::
meta
::
DateT
>
dates
;
std
::
string
start_value
=
CurrentTmDate
(
-
3
);
std
::
string
end_value
=
CurrentTmDate
(
-
2
);
std
::
string
start_value
=
CurrentTmDate
();
std
::
string
end_value
=
CurrentTmDate
(
1
);
ConvertTimeRangeToDBDates
(
start_value
,
end_value
,
dates
);
db_
->
DeleteTable
(
TABLE_NAME
,
dates
);
stat
=
db_
->
DeleteTable
(
TABLE_NAME
,
dates
);
ASSERT_STATS
(
stat
);
uint64_t
row_count
=
0
;
db_
->
GetTableRowCount
(
TABLE_NAME
,
row_count
);
ASSERT_EQ
(
row_count
,
0UL
);
}
\ No newline at end of file
cpp/unittest/db/meta_tests.cpp
浏览文件 @
7bfa68fa
...
...
@@ -74,21 +74,21 @@ TEST_F(MetaTest, TABLE_FILE_TEST) {
ASSERT_EQ
(
table_file
.
file_type_
,
new_file_type
);
meta
::
DatesT
dates
;
dates
.
push_back
(
meta
::
Meta
::
GetDate
());
dates
.
push_back
(
utils
::
GetDate
());
status
=
impl_
->
DropPartitionsByDates
(
table_file
.
table_id_
,
dates
);
ASSERT_
FALS
E
(
status
.
ok
());
ASSERT_
TRU
E
(
status
.
ok
());
dates
.
clear
();
for
(
auto
i
=
2
;
i
<
10
;
++
i
)
{
dates
.
push_back
(
meta
::
Meta
::
GetDateWithDelta
(
-
1
*
i
));
dates
.
push_back
(
utils
::
GetDateWithDelta
(
-
1
*
i
));
}
status
=
impl_
->
DropPartitionsByDates
(
table_file
.
table_id_
,
dates
);
ASSERT_TRUE
(
status
.
ok
());
table_file
.
date_
=
meta
::
Meta
::
GetDateWithDelta
(
-
2
);
table_file
.
date_
=
utils
::
GetDateWithDelta
(
-
2
);
status
=
impl_
->
UpdateTableFile
(
table_file
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
table_file
.
date_
,
meta
::
Meta
::
GetDateWithDelta
(
-
2
));
ASSERT_EQ
(
table_file
.
date_
,
utils
::
GetDateWithDelta
(
-
2
));
ASSERT_FALSE
(
table_file
.
file_type_
==
meta
::
TableFileSchema
::
TO_DELETE
);
dates
.
clear
();
...
...
cpp/unittest/db/misc_test.cpp
浏览文件 @
7bfa68fa
...
...
@@ -105,7 +105,7 @@ TEST(DBMiscTest, META_TEST) {
time_t
tt
;
time
(
&
tt
);
int
delta
=
10
;
engine
::
meta
::
DateT
dt
=
impl
.
GetDate
(
tt
,
delta
);
engine
::
meta
::
DateT
dt
=
engine
::
utils
::
GetDate
(
tt
,
delta
);
ASSERT_GT
(
dt
,
0
);
}
...
...
cpp/unittest/db/mysql_meta_test.cpp
浏览文件 @
7bfa68fa
...
...
@@ -90,7 +90,7 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
ASSERT_EQ
(
table_file
.
file_type_
,
meta
::
TableFileSchema
::
NEW
);
meta
::
DatesT
dates
;
dates
.
push_back
(
meta
::
Meta
::
GetDate
());
dates
.
push_back
(
utils
::
GetDate
());
status
=
impl
.
DropPartitionsByDates
(
table_file
.
table_id_
,
dates
);
ASSERT_FALSE
(
status
.
ok
());
...
...
@@ -110,15 +110,15 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILE_TEST) {
dates
.
clear
();
for
(
auto
i
=
2
;
i
<
10
;
++
i
)
{
dates
.
push_back
(
meta
::
Meta
::
GetDateWithDelta
(
-
1
*
i
));
dates
.
push_back
(
utils
::
GetDateWithDelta
(
-
1
*
i
));
}
status
=
impl
.
DropPartitionsByDates
(
table_file
.
table_id_
,
dates
);
ASSERT_TRUE
(
status
.
ok
());
table_file
.
date_
=
meta
::
Meta
::
GetDateWithDelta
(
-
2
);
table_file
.
date_
=
utils
::
GetDateWithDelta
(
-
2
);
status
=
impl
.
UpdateTableFile
(
table_file
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
table_file
.
date_
,
meta
::
Meta
::
GetDateWithDelta
(
-
2
));
ASSERT_EQ
(
table_file
.
date_
,
utils
::
GetDateWithDelta
(
-
2
));
ASSERT_FALSE
(
table_file
.
file_type_
==
meta
::
TableFileSchema
::
TO_DELETE
);
dates
.
clear
();
...
...
cpp/unittest/knowhere/knowhere_test.cpp
浏览文件 @
7bfa68fa
...
...
@@ -8,23 +8,29 @@
#include <easylogging++.h>
#include <wrapper/knowhere/vec_index.h>
#include "knowhere/index/vector_index/gpu_ivf.h"
#include "utils.h"
INITIALIZE_EASYLOGGINGPP
using
namespace
zilliz
::
milvus
::
engine
;
using
namespace
zilliz
::
knowhere
;
//
using namespace zilliz::knowhere;
using
::
testing
::
TestWithParam
;
using
::
testing
::
Values
;
using
::
testing
::
Combine
;
constexpr
int64_t
DIM
=
512
;
constexpr
int64_t
NB
=
1000000
;
class
KnowhereWrapperTest
:
public
TestWithParam
<::
std
::
tuple
<
IndexType
,
std
::
string
,
int
,
int
,
int
,
int
,
Config
,
Config
>>
{
protected:
void
SetUp
()
override
{
zilliz
::
knowhere
::
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
0
);
zilliz
::
knowhere
::
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
1
);
std
::
string
generator_type
;
std
::
tie
(
index_type
,
generator_type
,
dim
,
nb
,
nq
,
k
,
train_cfg
,
search_cfg
)
=
GetParam
();
...
...
@@ -66,8 +72,8 @@ class KnowhereWrapperTest
Config
train_cfg
;
Config
search_cfg
;
int
dim
=
64
;
int
nb
=
10000
;
int
dim
=
DIM
;
int
nb
=
NB
;
int
nq
=
10
;
int
k
=
10
;
std
::
vector
<
float
>
xb
;
...
...
@@ -94,27 +100,27 @@ INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest,
// Config::object{{"nlist", 100}, {"dim", 64}},
// Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 40}}
//),
std
::
make_tuple
(
IndexType
::
FAISS_IVFFLAT_MIX
,
"Default"
,
64
,
100000
,
10
,
10
,
Config
::
object
{{
"nlist"
,
1000
},
{
"dim"
,
64
},
{
"metric_type"
,
"L2"
}},
Config
::
object
{{
"dim"
,
64
},
{
"k"
,
10
},
{
"nprobe"
,
5
}}
),
std
::
make_tuple
(
IndexType
::
FAISS_IDMAP
,
"Default"
,
64
,
100000
,
10
,
10
,
Config
::
object
{{
"dim"
,
64
},
{
"metric_type"
,
"L2"
}},
Config
::
object
{{
"dim"
,
64
},
{
"k"
,
10
}}
),
//
std::make_tuple(IndexType::FAISS_IVFFLAT_MIX, "Default",
//
64, 100000, 10, 10,
//
Config::object{{"nlist", 1000}, {"dim", 64}, {"metric_type", "L2"}},
//
Config::object{{"dim", 64}, {"k", 10}, {"nprobe", 5}}
//
),
//
std::make_tuple(IndexType::FAISS_IDMAP, "Default",
//
64, 100000, 10, 10,
//
Config::object{{"dim", 64}, {"metric_type", "L2"}},
//
Config::object{{"dim", 64}, {"k", 10}}
//
),
std
::
make_tuple
(
IndexType
::
FAISS_IVFSQ8_MIX
,
"Default"
,
64
,
100000
,
10
,
10
,
Config
::
object
{{
"dim"
,
64
},
{
"nlist"
,
1000
},
{
"nbits"
,
8
},
{
"metric_type"
,
"L2"
}},
Config
::
object
{{
"dim"
,
64
},
{
"k"
,
10
},
{
"nprobe"
,
5
}}
),
std
::
make_tuple
(
IndexType
::
NSG_MIX
,
"Default"
,
128
,
250000
,
10
,
10
,
Config
::
object
{{
"dim"
,
128
},
{
"nlist"
,
8192
},
{
"nprobe"
,
16
},
{
"metric_type"
,
"L2"
},
{
"knng"
,
200
},
{
"search_length"
,
40
},
{
"out_degree"
,
60
},
{
"candidate_pool_size"
,
200
}},
Config
::
object
{{
"k"
,
10
},
{
"search_length"
,
20
}}
DIM
,
NB
,
10
,
10
,
Config
::
object
{{
"dim"
,
DIM
},
{
"nlist"
,
1000
},
{
"nbits"
,
8
},
{
"metric_type"
,
"L2"
}},
Config
::
object
{{
"dim"
,
DIM
},
{
"k"
,
10
},
{
"nprobe"
,
5
}}
)
// std::make_tuple(IndexType::NSG_MIX, "Default",
// 128, 250000, 10, 10,
// Config::object{{"dim", 128}, {"nlist", 8192}, {"nprobe", 16}, {"metric_type", "L2"},
// {"knng", 200}, {"search_length", 40}, {"out_degree", 60}, {"candidate_pool_size", 200}},
// Config::object{{"k", 10}, {"search_length", 20}}
// )
//std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default",
// 64, 10000, 10, 10,
// Config::object{{"TPTNumber", 1}, {"dim", 64}},
...
...
@@ -135,6 +141,31 @@ TEST_P(KnowhereWrapperTest, base_test) {
AssertResult
(
res_ids
,
res_dis
);
}
TEST_P
(
KnowhereWrapperTest
,
to_gpu_test
)
{
EXPECT_EQ
(
index_
->
GetType
(),
index_type
);
auto
elems
=
nq
*
k
;
std
::
vector
<
int64_t
>
res_ids
(
elems
);
std
::
vector
<
float
>
res_dis
(
elems
);
index_
->
BuildAll
(
nb
,
xb
.
data
(),
ids
.
data
(),
train_cfg
);
index_
->
Search
(
nq
,
xq
.
data
(),
res_dis
.
data
(),
res_ids
.
data
(),
search_cfg
);
AssertResult
(
res_ids
,
res_dis
);
{
index_
->
CopyToGpu
(
1
);
}
std
::
string
file_location
=
"/tmp/whatever"
;
write_index
(
index_
,
file_location
);
auto
new_index
=
read_index
(
file_location
);
auto
dev_idx
=
new_index
->
CopyToGpu
(
1
);
for
(
int
i
=
0
;
i
<
10000
;
++
i
)
{
dev_idx
->
Search
(
nq
,
xq
.
data
(),
res_dis
.
data
(),
res_ids
.
data
(),
search_cfg
);
}
AssertResult
(
res_ids
,
res_dis
);
}
TEST_P
(
KnowhereWrapperTest
,
serialize
)
{
EXPECT_EQ
(
index_
->
GetType
(),
index_type
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录