Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
597456b4
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
597456b4
编写于
10月 13, 2019
作者:
Y
Yu Kun
浏览文件
操作
浏览文件
下载
差异文件
fix conflicts
Former-commit-id: 9e513a70425a88870cb8f1249c7f0643cb76bb7b
上级
a711db52
074bdbc5
变更
33
隐藏空白更改
内联
并排
Showing
33 changed file
with
737 addition
and
252 deletion
+737
-252
ci/jenkinsfile/dev_test.groovy
ci/jenkinsfile/dev_test.groovy
+1
-2
ci/jenkinsfile/dev_test_all.groovy
ci/jenkinsfile/dev_test_all.groovy
+1
-1
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+1
-0
cpp/cmake/ThirdPartyPackages.cmake
cpp/cmake/ThirdPartyPackages.cmake
+1
-1
cpp/src/core/cmake/ThirdPartyPackagesCore.cmake
cpp/src/core/cmake/ThirdPartyPackagesCore.cmake
+2
-2
cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.cpp
...re/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.cpp
+9
-0
cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.h
...core/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.h
+4
-0
cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVF.cpp
...rc/core/knowhere/knowhere/index/vector_index/IndexIVF.cpp
+13
-0
cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp
...knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp
+39
-20
cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h
...e/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h
+5
-1
cpp/src/core/unittest/test_ivf.cpp
cpp/src/core/unittest/test_ivf.cpp
+9
-6
cpp/src/core/unittest/test_kdt.cpp
cpp/src/core/unittest/test_kdt.cpp
+1
-0
cpp/src/core/unittest/test_nsg/test_nsg.cpp
cpp/src/core/unittest/test_nsg/test_nsg.cpp
+7
-7
cpp/src/db/engine/ExecutionEngine.h
cpp/src/db/engine/ExecutionEngine.h
+3
-2
cpp/src/db/engine/ExecutionEngineImpl.cpp
cpp/src/db/engine/ExecutionEngineImpl.cpp
+34
-5
cpp/src/db/engine/ExecutionEngineImpl.h
cpp/src/db/engine/ExecutionEngineImpl.h
+3
-2
cpp/src/scheduler/JobMgr.cpp
cpp/src/scheduler/JobMgr.cpp
+25
-1
cpp/src/scheduler/JobMgr.h
cpp/src/scheduler/JobMgr.h
+4
-1
cpp/src/scheduler/SchedInst.h
cpp/src/scheduler/SchedInst.h
+3
-2
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
+37
-34
cpp/src/scheduler/optimizer/LargeSQ8HPass.cpp
cpp/src/scheduler/optimizer/LargeSQ8HPass.cpp
+73
-0
cpp/src/scheduler/optimizer/LargeSQ8HPass.h
cpp/src/scheduler/optimizer/LargeSQ8HPass.h
+47
-0
cpp/src/scheduler/resource/Resource.cpp
cpp/src/scheduler/resource/Resource.cpp
+8
-7
cpp/src/scheduler/task/SearchTask.cpp
cpp/src/scheduler/task/SearchTask.cpp
+100
-29
cpp/src/scheduler/task/SearchTask.h
cpp/src/scheduler/task/SearchTask.h
+8
-5
cpp/src/wrapper/VecImpl.cpp
cpp/src/wrapper/VecImpl.cpp
+22
-6
cpp/src/wrapper/VecImpl.h
cpp/src/wrapper/VecImpl.h
+3
-1
cpp/src/wrapper/VecIndex.h
cpp/src/wrapper/VecIndex.h
+8
-2
cpp/unittest/db/test_db.cpp
cpp/unittest/db/test_db.cpp
+2
-3
cpp/unittest/db/test_search.cpp
cpp/unittest/db/test_search.cpp
+223
-77
cpp/unittest/scheduler/test_resource_mgr.cpp
cpp/unittest/scheduler/test_resource_mgr.cpp
+3
-0
cpp/unittest/scheduler/test_scheduler.cpp
cpp/unittest/scheduler/test_scheduler.cpp
+37
-34
cpp/unittest/wrapper/test_wrapper.cpp
cpp/unittest/wrapper/test_wrapper.cpp
+1
-1
未找到文件。
ci/jenkinsfile/dev_test.groovy
浏览文件 @
597456b4
...
...
@@ -2,10 +2,9 @@ timeout(time: 30, unit: 'MINUTES') {
try
{
dir
(
"${PROJECT_NAME}_test"
)
{
checkout
([
$class
:
'GitSCM'
,
branches:
[[
name:
"${SEMVER}"
]],
doGenerateSubmoduleConfigurations:
false
,
extensions:
[],
submoduleCfg:
[],
userRemoteConfigs:
[[
credentialsId:
"${params.GIT_USER}"
,
url:
"git@192.168.1.105:Test/milvus_test.git"
,
name:
'origin'
,
refspec:
"+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"
]]])
sh
'python3 -m pip install -r requirements.txt'
sh
'python3 -m pip install -r requirements.txt
-i http://pypi.douban.com/simple --trusted-host pypi.douban.com
'
sh
"pytest . --alluredir=\"test_out/dev/single/sqlite\" --level=1 --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local"
}
// mysql database backend test
load
"${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
...
...
ci/jenkinsfile/dev_test_all.groovy
浏览文件 @
597456b4
...
...
@@ -2,7 +2,7 @@ timeout(time: 60, unit: 'MINUTES') {
try
{
dir
(
"${PROJECT_NAME}_test"
)
{
checkout
([
$class
:
'GitSCM'
,
branches:
[[
name:
"${SEMVER}"
]],
doGenerateSubmoduleConfigurations:
false
,
extensions:
[],
submoduleCfg:
[],
userRemoteConfigs:
[[
credentialsId:
"${params.GIT_USER}"
,
url:
"git@192.168.1.105:Test/milvus_test.git"
,
name:
'origin'
,
refspec:
"+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"
]]])
sh
'python3 -m pip install -r requirements.txt'
sh
'python3 -m pip install -r requirements.txt
-i http://pypi.douban.com/simple --trusted-host pypi.douban.com
'
sh
"pytest . --alluredir=\"test_out/dev/single/sqlite\" --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local"
}
...
...
cpp/CHANGELOG.md
浏览文件 @
597456b4
...
...
@@ -37,6 +37,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-619 - Add optimizer class in scheduler
-
MS-614 - Preload table at startup
-
MS-626 - Refactor DataObj to support cache any type data
-
MS-648 - Improve unittest
## New Feature
-
MS-627 - Integrate new index: IVFSQHybrid
...
...
cpp/cmake/ThirdPartyPackages.cmake
浏览文件 @
597456b4
...
...
@@ -143,7 +143,7 @@ if(USE_JFROG_CACHE STREQUAL "ON")
if
(
NOT DEFINED JFROG_ARTFACTORY_URL
)
message
(
FATAL_ERROR
"JFROG_ARTFACTORY_URL is not set"
)
endif
()
set
(
JFROG_ARTFACTORY_CACHE_URL
"
${
JFROG_ARTFACTORY_URL
}
/
generic-local/
milvus/thirdparty/cache/
${
CMAKE_OS_NAME
}
/
${
MILVUS_BUILD_ARCH
}
/
${
BUILD_TYPE
}
"
)
set
(
JFROG_ARTFACTORY_CACHE_URL
"
${
JFROG_ARTFACTORY_URL
}
/milvus/thirdparty/cache/
${
CMAKE_OS_NAME
}
/
${
MILVUS_BUILD_ARCH
}
/
${
BUILD_TYPE
}
"
)
if
(
DEFINED ENV{JFROG_USER_NAME}
)
set
(
JFROG_USER_NAME
"$ENV{JFROG_USER_NAME}"
)
endif
()
...
...
cpp/src/core/cmake/ThirdPartyPackagesCore.cmake
浏览文件 @
597456b4
...
...
@@ -123,7 +123,7 @@ if(NOT DEFINED USE_JFROG_CACHE)
set
(
USE_JFROG_CACHE
"OFF"
)
endif
()
if
(
USE_JFROG_CACHE STREQUAL
"ON"
)
set
(
JFROG_ARTFACTORY_CACHE_URL
"
${
JFROG_ARTFACTORY_URL
}
/
generic-local/
milvus/thirdparty/cache/
${
CMAKE_OS_NAME
}
/
${
KNOWHERE_BUILD_ARCH
}
/
${
BUILD_TYPE
}
"
)
set
(
JFROG_ARTFACTORY_CACHE_URL
"
${
JFROG_ARTFACTORY_URL
}
/milvus/thirdparty/cache/
${
CMAKE_OS_NAME
}
/
${
KNOWHERE_BUILD_ARCH
}
/
${
BUILD_TYPE
}
"
)
set
(
THIRDPARTY_PACKAGE_CACHE
"
${
THIRDPARTY_DIR
}
/cache"
)
endif
()
...
...
@@ -234,7 +234,7 @@ if(CUSTOMIZATION)
# set(FAISS_MD5 "57da9c4f599cc8fa4260488b1c96e1cc") # commit-id 6dbdf75987c34a2c853bd172ea0d384feea8358c branch-0.2.0
# set(FAISS_MD5 "21deb1c708490ca40ecb899122c01403") # commit-id 643e48f479637fd947e7b93fa4ca72b38ecc9a39 branch-0.2.0
# set(FAISS_MD5 "072db398351cca6e88f52d743bbb9fa0") # commit-id 3a2344d04744166af41ef1a74449d68a315bfe17 branch-0.2.1
set
(
FAISS_MD5
"
94988b7bdac4eb82a9575c702a3f2df3"
)
# commit-id 1407526b31cad26f98ceca8dddaface8f18c4c19
branch-0.2.1
set
(
FAISS_MD5
"
c89ea8e655f5cdf58f42486f13614714"
)
# commit-id 9c28a1cbb88f41fa03b03d7204106201ad33276b
branch-0.2.1
execute_process
(
COMMAND wget -q --method HEAD
${
FAISS_SOURCE_URL
}
RESULT_VARIABLE return_code
)
message
(
STATUS
"Check the remote cache file
${
FAISS_SOURCE_URL
}
. return code =
${
return_code
}
"
)
...
...
cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.cpp
浏览文件 @
597456b4
...
...
@@ -71,4 +71,13 @@ GPUIVFSQ::CopyGpuToCpu(const Config& config) {
return
std
::
make_shared
<
IVFSQ
>
(
new_index
);
}
void
GPUIVFSQ
::
search_impl
(
int64_t
n
,
const
float
*
data
,
int64_t
k
,
float
*
distances
,
int64_t
*
labels
,
const
Config
&
cfg
)
{
#ifdef CUSTOMIZATION
GPUIVF
::
search_impl
(
n
,
data
,
k
,
distances
,
labels
,
cfg
);
#else
IVF
::
search_impl
(
n
,
data
,
k
,
distances
,
labels
,
cfg
);
#endif
}
}
// namespace knowhere
cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVFSQ.h
浏览文件 @
597456b4
...
...
@@ -38,6 +38,10 @@ class GPUIVFSQ : public GPUIVF {
VectorIndexPtr
CopyGpuToCpu
(
const
Config
&
config
)
override
;
protected:
void
search_impl
(
int64_t
n
,
const
float
*
data
,
int64_t
k
,
float
*
distances
,
int64_t
*
labels
,
const
Config
&
cfg
)
override
;
};
}
// namespace knowhere
cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVF.cpp
浏览文件 @
597456b4
...
...
@@ -115,6 +115,19 @@ IVF::Search(const DatasetPtr& dataset, const Config& config) {
search_impl
(
rows
,
(
float
*
)
p_data
,
search_cfg
->
k
,
res_dis
,
res_ids
,
config
);
// std::stringstream ss_res_id, ss_res_dist;
// for (int i = 0; i < 10; ++i) {
// printf("%llu", res_ids[i]);
// printf("\n");
// printf("%.6f", res_dis[i]);
// printf("\n");
// ss_res_id << res_ids[i] << " ";
// ss_res_dist << res_dis[i] << " ";
// }
// std::cout << std::endl << "after search: " << std::endl;
// std::cout << ss_res_id.str() << std::endl;
// std::cout << ss_res_dist.str() << std::endl << std::endl;
auto
id_buf
=
MakeMutableBufferSmart
((
uint8_t
*
)
res_ids
,
sizeof
(
int64_t
)
*
elems
);
auto
dist_buf
=
MakeMutableBufferSmart
((
uint8_t
*
)
res_dis
,
sizeof
(
float
)
*
elems
);
...
...
cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp
浏览文件 @
597456b4
...
...
@@ -17,6 +17,7 @@
// under the License.
#include "knowhere/index/vector_index/IndexIVFSQHybrid.h"
#include <utility>
#include "faiss/AutoTune.h"
#include "faiss/gpu/GpuAutoTune.h"
#include "faiss/gpu/GpuIndexIVF.h"
...
...
@@ -79,20 +80,8 @@ IVFSQHybrid::CopyGpuToCpu(const Config& config) {
VectorIndexPtr
IVFSQHybrid
::
CopyCpuToGpu
(
const
int64_t
&
device_id
,
const
Config
&
config
)
{
if
(
auto
res
=
FaissGpuResourceMgr
::
GetInstance
().
GetRes
(
device_id
))
{
ResScope
rs
(
res
,
device_id
,
false
);
faiss
::
gpu
::
GpuClonerOptions
option
;
option
.
allInGpu
=
true
;
faiss
::
IndexComposition
index_composition
;
index_composition
.
index
=
index_
.
get
();
index_composition
.
quantizer
=
nullptr
;
index_composition
.
mode
=
0
;
// copy all
auto
gpu_index
=
faiss
::
gpu
::
index_cpu_to_gpu
(
res
->
faiss_res
.
get
(),
device_id
,
&
index_composition
,
&
option
);
std
::
shared_ptr
<
faiss
::
Index
>
device_index
;
device_index
.
reset
(
gpu_index
);
return
std
::
make_shared
<
IVFSQHybrid
>
(
device_index
,
device_id
,
res
);
auto
p
=
CopyCpuToGpuWithQuantizer
(
device_id
,
config
);
return
p
.
first
;
}
else
{
KNOWHERE_THROW_MSG
(
"CopyCpuToGpu Error, can't get gpu_resource"
);
}
...
...
@@ -180,7 +169,7 @@ IVFSQHybrid::UnsetQuantizer() {
ivf_index
->
quantizer
=
nullptr
;
}
void
VectorIndexPtr
IVFSQHybrid
::
LoadData
(
const
knowhere
::
QuantizerPtr
&
q
,
const
Config
&
conf
)
{
auto
quantizer_conf
=
std
::
dynamic_pointer_cast
<
QuantizerCfg
>
(
conf
);
if
(
quantizer_conf
!=
nullptr
)
{
...
...
@@ -188,9 +177,10 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
KNOWHERE_THROW_MSG
(
"mode only support 2 in this func"
);
}
}
if
(
quantizer_conf
->
gpu_id
!=
gpu_id_
)
{
KNOWHERE_THROW_MSG
(
"quantizer and data must on the same gpu card"
);
}
// if (quantizer_conf->gpu_id != gpu_id_) {
// KNOWHERE_THROW_MSG("quantizer and data must on the same gpu card");
// }
gpu_id_
=
quantizer_conf
->
gpu_id
;
if
(
auto
res
=
FaissGpuResourceMgr
::
GetInstance
().
GetRes
(
gpu_id_
))
{
ResScope
rs
(
res
,
gpu_id_
,
false
);
...
...
@@ -207,8 +197,37 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) {
index_composition
->
mode
=
quantizer_conf
->
mode
;
// only 2
auto
gpu_index
=
faiss
::
gpu
::
index_cpu_to_gpu
(
res
->
faiss_res
.
get
(),
gpu_id_
,
index_composition
,
&
option
);
index_
.
reset
(
gpu_index
);
gpu_mode
=
2
;
// all in gpu
std
::
shared_ptr
<
faiss
::
Index
>
new_idx
;
new_idx
.
reset
(
gpu_index
);
auto
sq_idx
=
std
::
make_shared
<
IVFSQHybrid
>
(
new_idx
,
gpu_id_
,
res
);
return
sq_idx
;
}
else
{
KNOWHERE_THROW_MSG
(
"CopyCpuToGpu Error, can't get gpu_resource"
);
}
}
std
::
pair
<
VectorIndexPtr
,
QuantizerPtr
>
IVFSQHybrid
::
CopyCpuToGpuWithQuantizer
(
const
int64_t
&
device_id
,
const
Config
&
config
)
{
if
(
auto
res
=
FaissGpuResourceMgr
::
GetInstance
().
GetRes
(
device_id
))
{
ResScope
rs
(
res
,
device_id
,
false
);
faiss
::
gpu
::
GpuClonerOptions
option
;
option
.
allInGpu
=
true
;
faiss
::
IndexComposition
index_composition
;
index_composition
.
index
=
index_
.
get
();
index_composition
.
quantizer
=
nullptr
;
index_composition
.
mode
=
0
;
// copy all
auto
gpu_index
=
faiss
::
gpu
::
index_cpu_to_gpu
(
res
->
faiss_res
.
get
(),
device_id
,
&
index_composition
,
&
option
);
std
::
shared_ptr
<
faiss
::
Index
>
device_index
;
device_index
.
reset
(
gpu_index
);
auto
new_idx
=
std
::
make_shared
<
IVFSQHybrid
>
(
device_index
,
device_id
,
res
);
auto
q
=
std
::
make_shared
<
FaissIVFQuantizer
>
();
q
->
quantizer
=
index_composition
.
quantizer
;
q
->
size
=
index_composition
.
quantizer
->
d
*
index_composition
.
quantizer
->
getNumVecs
()
*
sizeof
(
float
);
return
std
::
make_pair
(
new_idx
,
q
);
}
else
{
KNOWHERE_THROW_MSG
(
"CopyCpuToGpu Error, can't get gpu_resource"
);
}
...
...
cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h
浏览文件 @
597456b4
...
...
@@ -19,6 +19,7 @@
#include <faiss/index_io.h>
#include <memory>
#include <utility>
#include "IndexGPUIVFSQ.h"
#include "Quantizer.h"
...
...
@@ -60,9 +61,12 @@ class IVFSQHybrid : public GPUIVFSQ {
void
UnsetQuantizer
();
void
VectorIndexPtr
LoadData
(
const
knowhere
::
QuantizerPtr
&
q
,
const
Config
&
conf
);
std
::
pair
<
VectorIndexPtr
,
QuantizerPtr
>
CopyCpuToGpuWithQuantizer
(
const
int64_t
&
device_id
,
const
Config
&
config
);
IndexModelPtr
Train
(
const
DatasetPtr
&
dataset
,
const
Config
&
config
)
override
;
...
...
cpp/src/core/unittest/test_ivf.cpp
浏览文件 @
597456b4
...
...
@@ -154,8 +154,8 @@ class IVFTest : public DataGen, public TestWithParam<::std::tuple<std::string, P
INSTANTIATE_TEST_CASE_P
(
IVFParameters
,
IVFTest
,
Values
(
std
::
make_tuple
(
"IVF"
,
ParameterType
::
ivf
),
std
::
make_tuple
(
"GPUIVF"
,
ParameterType
::
ivf
),
//
std::make_tuple("IVFPQ", ParameterType::ivfpq),
//
std::make_tuple("GPUIVFPQ", ParameterType::ivfpq),
std
::
make_tuple
(
"IVFPQ"
,
ParameterType
::
ivfpq
),
std
::
make_tuple
(
"GPUIVFPQ"
,
ParameterType
::
ivfpq
),
std
::
make_tuple
(
"IVFSQ"
,
ParameterType
::
ivfsq
),
#ifdef CUSTOMIZATION
std
::
make_tuple
(
"IVFSQHybrid"
,
ParameterType
::
ivfsq
),
...
...
@@ -240,6 +240,7 @@ TEST_P(IVFTest, hybrid) {
auto
result
=
hybrid_1_idx
->
Search
(
query_dataset
,
conf
);
AssertAnns
(
result
,
nq
,
conf
->
k
);
PrintResult
(
result
,
nq
,
k
);
hybrid_1_idx
->
UnsetQuantizer
();
}
{
...
...
@@ -253,9 +254,9 @@ TEST_P(IVFTest, hybrid) {
quantizer_conf
->
gpu_id
=
device_id
;
auto
q
=
hybrid_2_idx
->
LoadQuantizer
(
quantizer_conf
);
quantizer_conf
->
mode
=
2
;
hybrid_2_idx
->
LoadData
(
q
,
quantizer_conf
);
auto
gpu_idx
=
hybrid_2_idx
->
LoadData
(
q
,
quantizer_conf
);
auto
result
=
hybrid_2
_idx
->
Search
(
query_dataset
,
conf
);
auto
result
=
gpu
_idx
->
Search
(
query_dataset
,
conf
);
AssertAnns
(
result
,
nq
,
conf
->
k
);
PrintResult
(
result
,
nq
,
k
);
}
...
...
@@ -438,6 +439,7 @@ TEST_P(IVFTest, clone_test) {
}
}
#ifdef CUSTOMIZATION
TEST_P
(
IVFTest
,
seal_test
)
{
// FaissGpuResourceMgr::GetInstance().InitDevice(device_id);
...
...
@@ -472,6 +474,7 @@ TEST_P(IVFTest, seal_test) {
auto
with_seal
=
tc
.
RecordSection
(
"With seal"
);
ASSERT_GE
(
without_seal
,
with_seal
);
}
#endif
class
GPURESTEST
:
public
DataGen
,
public
::
testing
::
Test
{
protected:
...
...
@@ -637,7 +640,7 @@ TEST_F(GPURESTEST, copyandsearch) {
// search and copy at the same time
printf
(
"==================
\n
"
);
index_type
=
"GPUIVF
SQ
"
;
index_type
=
"GPUIVF"
;
index_
=
IndexFactory
(
index_type
);
auto
conf
=
std
::
make_shared
<
knowhere
::
IVFSQCfg
>
();
...
...
@@ -699,7 +702,7 @@ TEST_F(GPURESTEST, copyandsearch) {
}
TEST_F
(
GPURESTEST
,
TrainAndSearch
)
{
index_type
=
"GPUIVF
SQ
"
;
index_type
=
"GPUIVF"
;
index_
=
IndexFactory
(
index_type
);
auto
conf
=
std
::
make_shared
<
knowhere
::
IVFSQCfg
>
();
...
...
cpp/src/core/unittest/test_kdt.cpp
浏览文件 @
597456b4
...
...
@@ -36,6 +36,7 @@ class KDTTest : public DataGen, public ::testing::Test {
protected:
void
SetUp
()
override
{
Generate
(
96
,
1000
,
10
);
index_
=
std
::
make_shared
<
knowhere
::
CPUKDTRNG
>
();
auto
tempconf
=
std
::
make_shared
<
knowhere
::
KDTCfg
>
();
...
...
cpp/src/core/unittest/test_nsg/test_nsg.cpp
浏览文件 @
597456b4
...
...
@@ -38,17 +38,17 @@ class NSGInterfaceTest : public DataGen, public ::testing::Test {
SetUp
()
override
{
// Init_with_default();
knowhere
::
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
DEVICE_ID
,
1024
*
1024
*
200
,
1024
*
1024
*
600
,
2
);
Generate
(
256
,
1000000
,
1
);
Generate
(
256
,
1000000
/
100
,
1
);
index_
=
std
::
make_shared
<
knowhere
::
NSG
>
();
auto
tmp_conf
=
std
::
make_shared
<
knowhere
::
NSGCfg
>
();
tmp_conf
->
gpu_id
=
DEVICE_ID
;
tmp_conf
->
knng
=
10
0
;
tmp_conf
->
nprobe
=
32
;
tmp_conf
->
nlist
=
163
84
;
tmp_conf
->
search_length
=
6
0
;
tmp_conf
->
out_degree
=
7
0
;
tmp_conf
->
candidate_pool_size
=
5
00
;
tmp_conf
->
knng
=
2
0
;
tmp_conf
->
nprobe
=
8
;
tmp_conf
->
nlist
=
163
;
tmp_conf
->
search_length
=
4
0
;
tmp_conf
->
out_degree
=
3
0
;
tmp_conf
->
candidate_pool_size
=
1
00
;
tmp_conf
->
metric_type
=
knowhere
::
METRICTYPE
::
L2
;
train_conf
=
tmp_conf
;
...
...
cpp/src/db/engine/ExecutionEngine.h
浏览文件 @
597456b4
...
...
@@ -65,7 +65,7 @@ class ExecutionEngine {
Load
(
bool
to_cache
=
true
)
=
0
;
virtual
Status
CopyToGpu
(
uint64_t
device_id
)
=
0
;
CopyToGpu
(
uint64_t
device_id
,
bool
hybrid
)
=
0
;
virtual
Status
CopyToIndexFileToGpu
(
uint64_t
device_id
)
=
0
;
...
...
@@ -80,7 +80,8 @@ class ExecutionEngine {
Merge
(
const
std
::
string
&
location
)
=
0
;
virtual
Status
Search
(
int64_t
n
,
const
float
*
data
,
int64_t
k
,
int64_t
nprobe
,
float
*
distances
,
int64_t
*
labels
)
const
=
0
;
Search
(
int64_t
n
,
const
float
*
data
,
int64_t
k
,
int64_t
nprobe
,
float
*
distances
,
int64_t
*
labels
,
bool
hybrid
)
const
=
0
;
virtual
std
::
shared_ptr
<
ExecutionEngine
>
BuildIndex
(
const
std
::
string
&
location
,
EngineType
engine_type
)
=
0
;
...
...
cpp/src/db/engine/ExecutionEngineImpl.cpp
浏览文件 @
597456b4
...
...
@@ -31,6 +31,7 @@
#include "wrapper/ConfAdapter.h"
#include "wrapper/ConfAdapterMgr.h"
#include <src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h>
#include <src/scheduler/Utils.h>
#include <stdexcept>
#include <utility>
...
...
@@ -245,7 +246,31 @@ ExecutionEngineImpl::Load(bool to_cache) {
}
Status
ExecutionEngineImpl
::
CopyToGpu
(
uint64_t
device_id
)
{
ExecutionEngineImpl
::
CopyToGpu
(
uint64_t
device_id
,
bool
hybrid
)
{
if
(
hybrid
)
{
auto
key
=
location_
+
".quantizer"
;
auto
quantizer
=
std
::
static_pointer_cast
<
CachedQuantizer
>
(
cache
::
GpuCacheMgr
::
GetInstance
(
device_id
)
->
GetIndex
(
key
));
auto
conf
=
std
::
make_shared
<
knowhere
::
QuantizerCfg
>
();
conf
->
gpu_id
=
device_id
;
if
(
quantizer
)
{
// cache hit
conf
->
mode
=
2
;
auto
new_index
=
index_
->
LoadData
(
quantizer
->
Data
(),
conf
);
index_
=
new_index
;
}
else
{
auto
pair
=
index_
->
CopyToGpuWithQuantizer
(
device_id
);
index_
=
pair
.
first
;
// cache
auto
cached_quantizer
=
std
::
make_shared
<
CachedQuantizer
>
(
pair
.
second
);
cache
::
GpuCacheMgr
::
GetInstance
(
device_id
)
->
InsertItem
(
key
,
cached_quantizer
);
}
return
Status
::
OK
();
}
auto
index
=
std
::
static_pointer_cast
<
VecIndex
>
(
cache
::
GpuCacheMgr
::
GetInstance
(
device_id
)
->
GetIndex
(
location_
));
bool
already_in_cache
=
(
index
!=
nullptr
);
if
(
already_in_cache
)
{
...
...
@@ -389,8 +414,8 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t
}
Status
ExecutionEngineImpl
::
Search
(
int64_t
n
,
const
float
*
data
,
int64_t
k
,
int64_t
nprobe
,
float
*
distances
,
int64_t
*
labels
)
const
{
ExecutionEngineImpl
::
Search
(
int64_t
n
,
const
float
*
data
,
int64_t
k
,
int64_t
nprobe
,
float
*
distances
,
int64_t
*
labels
,
bool
hybrid
)
const
{
if
(
index_
==
nullptr
)
{
ENGINE_LOG_ERROR
<<
"ExecutionEngineImpl: index is null, failed to search"
;
return
Status
(
DB_ERROR
,
"index is null"
);
...
...
@@ -406,11 +431,15 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr
auto
adapter
=
AdapterMgr
::
GetInstance
().
GetAdapter
(
index_
->
GetType
());
auto
conf
=
adapter
->
MatchSearch
(
temp_conf
,
index_
->
GetType
());
HybridLoad
();
if
(
hybrid
)
{
HybridLoad
();
}
auto
status
=
index_
->
Search
(
n
,
data
,
distances
,
labels
,
conf
);
HybridUnset
();
if
(
hybrid
)
{
HybridUnset
();
}
if
(
!
status
.
ok
())
{
ENGINE_LOG_ERROR
<<
"Search error"
;
...
...
cpp/src/db/engine/ExecutionEngineImpl.h
浏览文件 @
597456b4
...
...
@@ -56,7 +56,7 @@ class ExecutionEngineImpl : public ExecutionEngine {
Load
(
bool
to_cache
)
override
;
Status
CopyToGpu
(
uint64_t
device_id
)
override
;
CopyToGpu
(
uint64_t
device_id
,
bool
hybrid
=
false
)
override
;
Status
CopyToIndexFileToGpu
(
uint64_t
device_id
)
override
;
...
...
@@ -71,7 +71,8 @@ class ExecutionEngineImpl : public ExecutionEngine {
Merge
(
const
std
::
string
&
location
)
override
;
Status
Search
(
int64_t
n
,
const
float
*
data
,
int64_t
k
,
int64_t
nprobe
,
float
*
distances
,
int64_t
*
labels
)
const
override
;
Search
(
int64_t
n
,
const
float
*
data
,
int64_t
k
,
int64_t
nprobe
,
float
*
distances
,
int64_t
*
labels
,
bool
hybrid
=
false
)
const
override
;
ExecutionEnginePtr
BuildIndex
(
const
std
::
string
&
location
,
EngineType
engine_type
)
override
;
...
...
cpp/src/scheduler/JobMgr.cpp
浏览文件 @
597456b4
...
...
@@ -19,9 +19,11 @@
#include "SchedInst.h"
#include "TaskCreator.h"
#include "optimizer/Optimizer.h"
#include "scheduler/Algorithm.h"
#include "scheduler/optimizer/Optimizer.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "task/Task.h"
#include <src/scheduler/optimizer/Optimizer.h>
#include <utility>
namespace
milvus
{
...
...
@@ -73,6 +75,10 @@ JobMgr::worker_function() {
OptimizerInst
::
GetInstance
()
->
Run
(
task
);
}
for
(
auto
&
task
:
tasks
)
{
calculate_path
(
task
);
}
// disk resources NEVER be empty.
if
(
auto
disk
=
res_mgr_
->
GetDiskResources
()[
0
].
lock
())
{
for
(
auto
&
task
:
tasks
)
{
...
...
@@ -87,5 +93,23 @@ JobMgr::build_task(const JobPtr& job) {
return
TaskCreator
::
Create
(
job
);
}
void
JobMgr
::
calculate_path
(
const
TaskPtr
&
task
)
{
if
(
task
->
type_
!=
TaskType
::
SearchTask
)
{
return
;
}
if
(
task
->
label
()
->
Type
()
!=
TaskLabelType
::
SPECIFIED_RESOURCE
)
{
return
;
}
std
::
vector
<
std
::
string
>
path
;
auto
spec_label
=
std
::
static_pointer_cast
<
SpecResLabel
>
(
task
->
label
());
auto
src
=
res_mgr_
->
GetDiskResources
()[
0
];
auto
dest
=
spec_label
->
resource
();
ShortestPath
(
src
.
lock
(),
dest
.
lock
(),
res_mgr_
,
path
);
task
->
path
()
=
Path
(
path
,
path
.
size
()
-
1
);
}
}
// namespace scheduler
}
// namespace milvus
cpp/src/scheduler/JobMgr.h
浏览文件 @
597456b4
...
...
@@ -52,9 +52,12 @@ class JobMgr {
void
worker_function
();
std
::
vector
<
TaskPtr
>
st
atic
st
d
::
vector
<
TaskPtr
>
build_task
(
const
JobPtr
&
job
);
void
calculate_path
(
const
TaskPtr
&
task
);
private:
bool
running_
=
false
;
std
::
queue
<
JobPtr
>
queue_
;
...
...
cpp/src/scheduler/SchedInst.h
浏览文件 @
597456b4
...
...
@@ -22,6 +22,7 @@
#include "ResourceMgr.h"
#include "Scheduler.h"
#include "optimizer/HybridPass.h"
#include "optimizer/LargeSQ8HPass.h"
#include "optimizer/Optimizer.h"
#include <memory>
...
...
@@ -92,9 +93,9 @@ class OptimizerInst {
if
(
instance
==
nullptr
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
if
(
instance
==
nullptr
)
{
HybridPassPtr
pass_ptr
=
std
::
make_shared
<
HybridPass
>
();
std
::
vector
<
PassPtr
>
pass_list
;
pass_list
.
push_back
(
pass_ptr
);
pass_list
.
push_back
(
std
::
make_shared
<
LargeSQ8HPass
>
());
pass_list
.
push_back
(
std
::
make_shared
<
HybridPass
>
());
instance
=
std
::
make_shared
<
Optimizer
>
(
pass_list
);
}
}
...
...
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
浏览文件 @
597456b4
...
...
@@ -145,37 +145,39 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
transport_costs
.
push_back
(
transport_cost
);
paths
.
emplace_back
(
path
);
}
if
(
task
->
job_
.
lock
()
->
type
()
==
JobType
::
SEARCH
)
{
auto
label
=
task
->
label
();
auto
spec_label
=
std
::
static_pointer_cast
<
SpecResLabel
>
(
label
);
if
(
spec_label
->
resource
().
lock
()
->
type
()
==
ResourceType
::
CPU
)
{
std
::
vector
<
std
::
string
>
spec_path
;
spec_path
.
push_back
(
spec_label
->
resource
().
lock
()
->
name
());
spec_path
.
push_back
(
resource
->
name
());
task
->
path
()
=
Path
(
spec_path
,
spec_path
.
size
()
-
1
);
}
else
{
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t
min_cost
=
std
::
numeric_limits
<
uint64_t
>::
max
();
uint64_t
min_cost_idx
=
0
;
for
(
uint64_t
i
=
0
;
i
<
compute_resources
.
size
();
++
i
)
{
if
(
compute_resources
[
i
]
->
TotalTasks
()
==
0
)
{
min_cost_idx
=
i
;
break
;
}
uint64_t
cost
=
compute_resources
[
i
]
->
TaskAvgCost
()
*
compute_resources
[
i
]
->
NumOfTaskToExec
()
+
transport_costs
[
i
];
if
(
min_cost
>
cost
)
{
min_cost
=
cost
;
min_cost_idx
=
i
;
}
}
// step 3: set path in task
Path
task_path
(
paths
[
min_cost_idx
],
paths
[
min_cost_idx
].
size
()
-
1
);
task
->
path
()
=
task_path
;
}
}
else
if
(
task
->
job_
.
lock
()
->
type
()
==
JobType
::
BUILD
)
{
// if (task->job_.lock()->type() == JobType::SEARCH) {
// auto label = task->label();
// auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
// if (spec_label->resource().lock()->type() == ResourceType::CPU) {
// std::vector<std::string> spec_path;
// spec_path.push_back(spec_label->resource().lock()->name());
// spec_path.push_back(resource->name());
// task->path() = Path(spec_path, spec_path.size() - 1);
// } else {
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
// uint64_t min_cost_idx = 0;
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->TotalTasks() == 0) {
// min_cost_idx = i;
// break;
// }
// uint64_t cost = compute_resources[i]->TaskAvgCost() *
// compute_resources[i]->NumOfTaskToExec() +
// transport_costs[i];
// if (min_cost > cost) {
// min_cost = cost;
// min_cost_idx = i;
// }
// }
//
// // step 3: set path in task
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
// task->path() = task_path;
// }
//
// } else
if
(
task
->
job_
.
lock
()
->
type
()
==
JobType
::
BUILD
)
{
// step2: Read device id in config
// get build index gpu resource
server
::
Config
&
config
=
server
::
Config
::
GetInstance
();
...
...
@@ -201,12 +203,13 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
}
if
(
resource
->
name
()
==
task
->
path
().
Last
())
{
resource
->
Wakeup
Loade
r
();
resource
->
Wakeup
Executo
r
();
}
else
{
auto
next_res_name
=
task
->
path
().
Next
();
auto
next_res
=
res_mgr
.
lock
()
->
GetResource
(
next_res_name
);
event
->
task_table_item_
->
Move
();
next_res
->
task_table
().
Put
(
task
);
if
(
event
->
task_table_item_
->
Move
())
{
next_res
->
task_table
().
Put
(
task
);
}
}
}
...
...
cpp/src/scheduler/optimizer/LargeSQ8HPass.cpp
0 → 100644
浏览文件 @
597456b4
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduler/optimizer/LargeSQ8HPass.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/Utils.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "utils/Log.h"
namespace
milvus
{
namespace
scheduler
{
bool
LargeSQ8HPass
::
Run
(
const
TaskPtr
&
task
)
{
if
(
task
->
Type
()
!=
TaskType
::
SearchTask
)
{
return
false
;
}
auto
search_task
=
std
::
static_pointer_cast
<
XSearchTask
>
(
task
);
if
(
search_task
->
file_
->
engine_type_
!=
(
int
)
engine
::
EngineType
::
FAISS_IVFSQ8H
)
{
return
false
;
}
auto
search_job
=
std
::
static_pointer_cast
<
SearchJob
>
(
search_task
->
job_
.
lock
());
// TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu
if
(
search_job
->
nq
()
<
100
)
{
return
false
;
}
std
::
vector
<
uint64_t
>
gpus
=
scheduler
::
get_gpu_pool
();
std
::
vector
<
int64_t
>
all_free_mem
;
for
(
auto
&
gpu
:
gpus
)
{
auto
cache
=
cache
::
GpuCacheMgr
::
GetInstance
(
gpu
);
auto
free_mem
=
cache
->
CacheCapacity
()
-
cache
->
CacheUsage
();
all_free_mem
.
push_back
(
free_mem
);
}
auto
max_e
=
std
::
max_element
(
all_free_mem
.
begin
(),
all_free_mem
.
end
());
auto
best_index
=
std
::
distance
(
all_free_mem
.
begin
(),
max_e
);
auto
best_device_id
=
gpus
[
best_index
];
ResourcePtr
res_ptr
=
ResMgrInst
::
GetInstance
()
->
GetResource
(
ResourceType
::
GPU
,
best_device_id
);
if
(
not
res_ptr
)
{
SERVER_LOG_ERROR
<<
"GpuResource "
<<
best_device_id
<<
" invalid."
;
// TODO: throw critical error and exit
return
false
;
}
auto
label
=
std
::
make_shared
<
SpecResLabel
>
(
std
::
weak_ptr
<
Resource
>
(
res_ptr
));
task
->
label
()
=
label
;
return
true
;
}
}
// namespace scheduler
}
// namespace milvus
cpp/src/scheduler/optimizer/LargeSQ8HPass.h
0 → 100644
浏览文件 @
597456b4
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "Pass.h"
namespace
milvus
{
namespace
scheduler
{
class
LargeSQ8HPass
:
public
Pass
{
public:
LargeSQ8HPass
()
=
default
;
public:
bool
Run
(
const
TaskPtr
&
task
)
override
;
};
using
LargeSQ8HPassPtr
=
std
::
shared_ptr
<
LargeSQ8HPass
>
;
}
// namespace scheduler
}
// namespace milvus
cpp/src/scheduler/resource/Resource.cpp
浏览文件 @
597456b4
...
...
@@ -20,6 +20,7 @@
#include "scheduler/Utils.h"
#include <iostream>
#include <limits>
#include <utility>
namespace
milvus
{
...
...
@@ -112,18 +113,18 @@ Resource::pick_task_load() {
TaskTableItemPtr
Resource
::
pick_task_execute
()
{
// auto indexes = task_table_.PickToExecute(3);
auto
indexes
=
task_table_
.
PickToExecute
(
std
::
numeric_limits
<
uint64_t
>::
max
());
for
(
auto
index
:
indexes
)
{
// try to set one task executing, then return
// if (task_table_.Execute(index))
// return task_table_.Get(index);
if
(
task_table_
.
Get
(
index
)
->
task
->
path
().
Current
()
==
task_table_
.
Get
(
index
)
->
task
->
path
().
Last
())
{
if
(
task_table_
.
Execute
(
index
))
{
return
task_table_
.
Get
(
index
);
if
(
task_table_
[
index
]
->
task
->
label
()
->
Type
()
==
TaskLabelType
::
SPECIFIED_RESOURCE
)
{
if
(
task_table_
[
index
]
->
task
->
path
().
Last
()
!=
name
())
{
continue
;
}
}
if
(
task_table_
.
Execute
(
index
))
{
return
task_table_
.
Get
(
index
);
}
// else try next
}
return
nullptr
;
...
...
cpp/src/scheduler/task/SearchTask.cpp
浏览文件 @
597456b4
...
...
@@ -22,6 +22,7 @@
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <src/scheduler/SchedInst.h>
#include <algorithm>
#include <string>
#include <thread>
...
...
@@ -33,8 +34,6 @@ namespace scheduler {
static
constexpr
size_t
PARALLEL_REDUCE_THRESHOLD
=
10000
;
static
constexpr
size_t
PARALLEL_REDUCE_BATCH
=
1000
;
std
::
mutex
XSearchTask
::
merge_mutex_
;
// TODO(wxyu): remove unused code
// bool
// NeedParallelReduce(uint64_t nq, uint64_t topk) {
...
...
@@ -121,7 +120,11 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
stat
=
index_engine_
->
Load
();
type_str
=
"DISK2CPU"
;
}
else
if
(
type
==
LoadType
::
CPU2GPU
)
{
stat
=
index_engine_
->
CopyToGpu
(
device_id
);
bool
hybrid
=
false
;
if
(
index_engine_
->
IndexEngineType
()
==
engine
::
EngineType
::
FAISS_IVFSQ8H
)
{
hybrid
=
true
;
}
stat
=
index_engine_
->
CopyToGpu
(
device_id
,
hybrid
);
type_str
=
"CPU2GPU"
;
}
else
if
(
type
==
LoadType
::
GPU2CPU
)
{
stat
=
index_engine_
->
CopyToCpu
();
...
...
@@ -204,14 +207,20 @@ XSearchTask::Execute() {
try
{
// step 2: search
index_engine_
->
Search
(
nq
,
vectors
,
topk
,
nprobe
,
output_distance
.
data
(),
output_ids
.
data
());
bool
hybrid
=
false
;
if
(
index_engine_
->
IndexEngineType
()
==
engine
::
EngineType
::
FAISS_IVFSQ8H
&&
ResMgrInst
::
GetInstance
()
->
GetResource
(
path
().
Last
())
->
type
()
==
ResourceType
::
CPU
)
{
hybrid
=
true
;
}
index_engine_
->
Search
(
nq
,
vectors
,
topk
,
nprobe
,
output_distance
.
data
(),
output_ids
.
data
(),
hybrid
);
double
span
=
rc
.
RecordSection
(
hdr
+
", do search"
);
// search_job->AccumSearchCost(span);
// step 3: pick up topk result
auto
spec_k
=
index_engine_
->
Count
()
<
topk
?
index_engine_
->
Count
()
:
topk
;
XSearchTask
::
TopkResult
(
output_ids
,
output_distance
,
spec_k
,
nq
,
topk
,
metric_l2
,
search_job
->
GetResult
());
XSearchTask
::
MergeTopkToResultSet
(
output_ids
,
output_distance
,
spec_k
,
nq
,
topk
,
metric_l2
,
search_job
->
GetResult
());
span
=
rc
.
RecordSection
(
hdr
+
", reduce topk"
);
// search_job->AccumReduceCost(span);
...
...
@@ -220,7 +229,7 @@ XSearchTask::Execute() {
// search_job->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
}
// step
5
: notify to send result to client
// step
4
: notify to send result to client
search_job
->
SearchDone
(
index_id_
);
}
...
...
@@ -230,36 +239,37 @@ XSearchTask::Execute() {
index_engine_
=
nullptr
;
}
Status
XSearchTask
::
TopkResult
(
const
std
::
vector
<
int64_t
>&
input_ids
,
const
std
::
vector
<
float
>&
input_distance
,
uint64_t
input_k
,
uint64_t
nq
,
uint64_t
topk
,
bool
ascending
,
scheduler
::
ResultSet
&
result
)
{
scheduler
::
ResultSet
result_buf
;
void
XSearchTask
::
MergeTopkToResultSet
(
const
std
::
vector
<
int64_t
>&
input_ids
,
const
std
::
vector
<
float
>&
input_distance
,
uint64_t
input_k
,
uint64_t
nq
,
uint64_t
topk
,
bool
ascending
,
scheduler
::
ResultSet
&
result
)
{
if
(
result
.
empty
())
{
result_buf
.
resize
(
nq
,
scheduler
::
Id2DistVec
(
input_k
,
scheduler
::
IdDistPair
(
-
1
,
0.0
)));
for
(
auto
i
=
0
;
i
<
nq
;
++
i
)
{
auto
&
result_buf_i
=
result_buf
[
i
];
result
.
resize
(
nq
);
}
for
(
uint64_t
i
=
0
;
i
<
nq
;
i
++
)
{
scheduler
::
Id2DistVec
result_buf
;
auto
&
result_i
=
result
[
i
];
if
(
result
[
i
].
empty
())
{
result_buf
.
resize
(
input_k
,
scheduler
::
IdDistPair
(
-
1
,
0.0
));
uint64_t
input_k_multi_i
=
input_k
*
i
;
for
(
auto
k
=
0
;
k
<
input_k
;
++
k
)
{
uint64_t
idx
=
input_k_multi_i
+
k
;
auto
&
result_buf_item
=
result_buf
_i
[
k
];
auto
&
result_buf_item
=
result_buf
[
k
];
result_buf_item
.
first
=
input_ids
[
idx
];
result_buf_item
.
second
=
input_distance
[
idx
];
}
}
}
else
{
size_t
tar_size
=
result
[
0
].
size
();
uint64_t
output_k
=
std
::
min
(
topk
,
input_k
+
tar_size
);
result_buf
.
resize
(
nq
,
scheduler
::
Id2DistVec
(
output_k
,
scheduler
::
IdDistPair
(
-
1
,
0.0
)));
for
(
auto
i
=
0
;
i
<
nq
;
++
i
)
{
}
else
{
size_t
tar_size
=
result_i
.
size
();
uint64_t
output_k
=
std
::
min
(
topk
,
input_k
+
tar_size
);
result_buf
.
resize
(
output_k
,
scheduler
::
IdDistPair
(
-
1
,
0.0
));
size_t
buf_k
=
0
,
src_k
=
0
,
tar_k
=
0
;
uint64_t
src_idx
;
auto
&
result_i
=
result
[
i
];
auto
&
result_buf_i
=
result_buf
[
i
];
uint64_t
input_k_multi_i
=
input_k
*
i
;
while
(
buf_k
<
output_k
&&
src_k
<
input_k
&&
tar_k
<
tar_size
)
{
src_idx
=
input_k_multi_i
+
src_k
;
auto
&
result_buf_item
=
result_buf
_i
[
buf_k
];
auto
&
result_buf_item
=
result_buf
[
buf_k
];
auto
&
result_item
=
result_i
[
tar_k
];
if
((
ascending
&&
input_distance
[
src_idx
]
<
result_item
.
second
)
||
(
!
ascending
&&
input_distance
[
src_idx
]
>
result_item
.
second
))
{
...
...
@@ -273,11 +283,11 @@ XSearchTask::TopkResult(const std::vector<int64_t>& input_ids, const std::vector
buf_k
++
;
}
if
(
buf_k
<
top
k
)
{
if
(
buf_k
<
output_
k
)
{
if
(
src_k
<
input_k
)
{
while
(
buf_k
<
output_k
&&
src_k
<
input_k
)
{
src_idx
=
input_k_multi_i
+
src_k
;
auto
&
result_buf_item
=
result_buf
_i
[
buf_k
];
auto
&
result_buf_item
=
result_buf
[
buf_k
];
result_buf_item
.
first
=
input_ids
[
src_idx
];
result_buf_item
.
second
=
input_distance
[
src_idx
];
src_k
++
;
...
...
@@ -285,18 +295,79 @@ XSearchTask::TopkResult(const std::vector<int64_t>& input_ids, const std::vector
}
}
else
{
while
(
buf_k
<
output_k
&&
tar_k
<
tar_size
)
{
result_buf
_i
[
buf_k
]
=
result_i
[
tar_k
];
result_buf
[
buf_k
]
=
result_i
[
tar_k
];
tar_k
++
;
buf_k
++
;
}
}
}
}
result_i
.
swap
(
result_buf
);
}
}
void
XSearchTask
::
MergeTopkArray
(
std
::
vector
<
int64_t
>&
tar_ids
,
std
::
vector
<
float
>&
tar_distance
,
uint64_t
&
tar_input_k
,
const
std
::
vector
<
int64_t
>&
src_ids
,
const
std
::
vector
<
float
>&
src_distance
,
uint64_t
src_input_k
,
uint64_t
nq
,
uint64_t
topk
,
bool
ascending
)
{
if
(
src_ids
.
empty
()
||
src_distance
.
empty
())
{
return
;
}
result
.
swap
(
result_buf
);
std
::
vector
<
int64_t
>
id_buf
(
nq
*
topk
,
-
1
);
std
::
vector
<
float
>
dist_buf
(
nq
*
topk
,
0.0
);
uint64_t
output_k
=
std
::
min
(
topk
,
tar_input_k
+
src_input_k
);
uint64_t
buf_k
,
src_k
,
tar_k
;
uint64_t
src_idx
,
tar_idx
,
buf_idx
;
uint64_t
src_input_k_multi_i
,
tar_input_k_multi_i
,
buf_k_multi_i
;
for
(
uint64_t
i
=
0
;
i
<
nq
;
i
++
)
{
src_input_k_multi_i
=
src_input_k
*
i
;
tar_input_k_multi_i
=
tar_input_k
*
i
;
buf_k_multi_i
=
output_k
*
i
;
buf_k
=
src_k
=
tar_k
=
0
;
while
(
buf_k
<
output_k
&&
src_k
<
src_input_k
&&
tar_k
<
tar_input_k
)
{
src_idx
=
src_input_k_multi_i
+
src_k
;
tar_idx
=
tar_input_k_multi_i
+
tar_k
;
buf_idx
=
buf_k_multi_i
+
buf_k
;
if
((
ascending
&&
src_distance
[
src_idx
]
<
tar_distance
[
tar_idx
])
||
(
!
ascending
&&
src_distance
[
src_idx
]
>
tar_distance
[
tar_idx
]))
{
id_buf
[
buf_idx
]
=
src_ids
[
src_idx
];
dist_buf
[
buf_idx
]
=
src_distance
[
src_idx
];
src_k
++
;
}
else
{
id_buf
[
buf_idx
]
=
tar_ids
[
tar_idx
];
dist_buf
[
buf_idx
]
=
tar_distance
[
tar_idx
];
tar_k
++
;
}
buf_k
++
;
}
if
(
buf_k
<
output_k
)
{
if
(
src_k
<
src_input_k
)
{
while
(
buf_k
<
output_k
&&
src_k
<
src_input_k
)
{
src_idx
=
src_input_k_multi_i
+
src_k
;
id_buf
[
buf_idx
]
=
src_ids
[
src_idx
];
dist_buf
[
buf_idx
]
=
src_distance
[
src_idx
];
src_k
++
;
buf_k
++
;
}
}
else
{
while
(
buf_k
<
output_k
&&
tar_k
<
tar_input_k
)
{
id_buf
[
buf_idx
]
=
tar_ids
[
tar_idx
];
dist_buf
[
buf_idx
]
=
tar_distance
[
tar_idx
];
tar_k
++
;
buf_k
++
;
}
}
}
}
return
Status
::
OK
();
tar_ids
.
swap
(
id_buf
);
tar_distance
.
swap
(
dist_buf
);
tar_input_k
=
output_k
;
}
}
// namespace scheduler
...
...
cpp/src/scheduler/task/SearchTask.h
浏览文件 @
597456b4
...
...
@@ -38,9 +38,14 @@ class XSearchTask : public Task {
Execute
()
override
;
public:
static
Status
TopkResult
(
const
std
::
vector
<
int64_t
>&
input_ids
,
const
std
::
vector
<
float
>&
input_distance
,
uint64_t
input_k
,
uint64_t
nq
,
uint64_t
topk
,
bool
ascending
,
scheduler
::
ResultSet
&
result
);
static
void
MergeTopkToResultSet
(
const
std
::
vector
<
int64_t
>&
input_ids
,
const
std
::
vector
<
float
>&
input_distance
,
uint64_t
input_k
,
uint64_t
nq
,
uint64_t
topk
,
bool
ascending
,
scheduler
::
ResultSet
&
result
);
static
void
MergeTopkArray
(
std
::
vector
<
int64_t
>&
tar_ids
,
std
::
vector
<
float
>&
tar_distance
,
uint64_t
&
tar_input_k
,
const
std
::
vector
<
int64_t
>&
src_ids
,
const
std
::
vector
<
float
>&
src_distance
,
uint64_t
src_input_k
,
uint64_t
nq
,
uint64_t
topk
,
bool
ascending
);
public:
TableFileSchemaPtr
file_
;
...
...
@@ -49,8 +54,6 @@ class XSearchTask : public Task {
int
index_type_
=
0
;
ExecutionEnginePtr
index_engine_
=
nullptr
;
bool
metric_l2
=
true
;
static
std
::
mutex
merge_mutex_
;
};
}
// namespace scheduler
...
...
cpp/src/wrapper/VecImpl.cpp
浏览文件 @
597456b4
...
...
@@ -315,24 +315,40 @@ IVFHybridIndex::UnsetQuantizer() {
return
Status
::
OK
();
}
Status
VecIndexPtr
IVFHybridIndex
::
LoadData
(
const
knowhere
::
QuantizerPtr
&
q
,
const
Config
&
conf
)
{
try
{
// TODO(linxj): Hardcode here
if
(
auto
new_idx
=
std
::
dynamic_pointer_cast
<
knowhere
::
IVFSQHybrid
>
(
index_
))
{
new_idx
->
LoadData
(
q
,
conf
);
return
std
::
make_shared
<
IVFHybridIndex
>
(
new_idx
->
LoadData
(
q
,
conf
),
type
);
}
else
{
WRAPPER_LOG_ERROR
<<
"Hybrid mode not support for index type: "
<<
int
(
type
);
return
Status
(
KNOWHERE_ERROR
,
"not support"
);
}
}
catch
(
knowhere
::
KnowhereException
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
return
Status
(
KNOWHERE_UNEXPECTED_ERROR
,
e
.
what
());
}
catch
(
std
::
exception
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
return
Status
(
KNOWHERE_ERROR
,
e
.
what
());
}
return
Status
::
OK
();
return
nullptr
;
}
std
::
pair
<
VecIndexPtr
,
knowhere
::
QuantizerPtr
>
IVFHybridIndex
::
CopyToGpuWithQuantizer
(
const
int64_t
&
device_id
,
const
Config
&
cfg
)
{
try
{
// TODO(linxj): Hardcode here
if
(
auto
hybrid_idx
=
std
::
dynamic_pointer_cast
<
knowhere
::
IVFSQHybrid
>
(
index_
))
{
auto
pair
=
hybrid_idx
->
CopyCpuToGpuWithQuantizer
(
device_id
,
cfg
);
auto
new_idx
=
std
::
make_shared
<
IVFHybridIndex
>
(
pair
.
first
,
type
);
return
std
::
make_pair
(
new_idx
,
pair
.
second
);
}
else
{
WRAPPER_LOG_ERROR
<<
"Hybrid mode not support for index type: "
<<
int
(
type
);
}
}
catch
(
knowhere
::
KnowhereException
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
}
catch
(
std
::
exception
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
}
return
std
::
make_pair
(
nullptr
,
nullptr
);
}
}
// namespace engine
...
...
cpp/src/wrapper/VecImpl.h
浏览文件 @
597456b4
...
...
@@ -105,8 +105,10 @@ class IVFHybridIndex : public IVFMixIndex {
Status
UnsetQuantizer
()
override
;
std
::
pair
<
VecIndexPtr
,
knowhere
::
QuantizerPtr
>
CopyToGpuWithQuantizer
(
const
int64_t
&
device_id
,
const
Config
&
cfg
)
override
;
Status
VecIndexPtr
LoadData
(
const
knowhere
::
QuantizerPtr
&
q
,
const
Config
&
conf
)
override
;
};
...
...
cpp/src/wrapper/VecIndex.h
浏览文件 @
597456b4
...
...
@@ -19,6 +19,7 @@
#include <memory>
#include <string>
#include <utility>
#include "cache/DataObj.h"
#include "knowhere/common/BinarySet.h"
...
...
@@ -103,9 +104,9 @@ class VecIndex : public cache::DataObj {
return
nullptr
;
}
virtual
Status
virtual
VecIndexPtr
LoadData
(
const
knowhere
::
QuantizerPtr
&
q
,
const
Config
&
conf
)
{
return
Status
::
OK
()
;
return
nullptr
;
}
virtual
Status
...
...
@@ -117,6 +118,11 @@ class VecIndex : public cache::DataObj {
UnsetQuantizer
()
{
return
Status
::
OK
();
}
virtual
std
::
pair
<
VecIndexPtr
,
knowhere
::
QuantizerPtr
>
CopyToGpuWithQuantizer
(
const
int64_t
&
device_id
,
const
Config
&
cfg
=
Config
())
{
return
std
::
make_pair
(
nullptr
,
nullptr
);
}
////////////////
private:
int64_t
size_
=
0
;
...
...
cpp/unittest/db/test_db.cpp
浏览文件 @
597456b4
...
...
@@ -297,6 +297,7 @@ TEST_F(DBTest, SEARCH_TEST) {
ASSERT_TRUE
(
stat
.
ok
());
}
#ifdef CUSTOMIZATION
//test FAISS_IVFSQ8H optimizer
index
.
engine_type_
=
(
int
)
milvus
::
engine
::
EngineType
::
FAISS_IVFSQ8H
;
db_
->
CreateIndex
(
TABLE_NAME
,
index
);
// wait until build index finish
...
...
@@ -314,9 +315,7 @@ TEST_F(DBTest, SEARCH_TEST) {
stat
=
db_
->
Query
(
TABLE_NAME
,
file_ids
,
k
,
nq
,
10
,
xq
.
data
(),
dates
,
results
);
ASSERT_TRUE
(
stat
.
ok
());
}
// TODO(lxj): add groundTruth assert
#endif
}
TEST_F
(
DBTest
,
PRELOADTABLE_TEST
)
{
...
...
cpp/unittest/db/test_search.cpp
浏览文件 @
597456b4
...
...
@@ -21,26 +21,51 @@
#include "scheduler/task/SearchTask.h"
#include "utils/TimeRecorder.h"
#include "utils/ThreadPool.h"
namespace
{
namespace
ms
=
milvus
::
scheduler
;
void
BuildResult
(
uint64_t
nq
,
BuildResult
(
std
::
vector
<
int64_t
>&
output_ids
,
std
::
vector
<
float
>&
output_distance
,
uint64_t
topk
,
bool
ascending
,
std
::
vector
<
int64_t
>&
output_ids
,
std
::
vector
<
float
>&
output_distence
)
{
uint64_t
nq
,
bool
ascending
)
{
output_ids
.
clear
();
output_ids
.
resize
(
nq
*
topk
);
output_dist
e
nce
.
clear
();
output_dist
e
nce
.
resize
(
nq
*
topk
);
output_dist
a
nce
.
clear
();
output_dist
a
nce
.
resize
(
nq
*
topk
);
for
(
uint64_t
i
=
0
;
i
<
nq
;
i
++
)
{
for
(
uint64_t
j
=
0
;
j
<
topk
;
j
++
)
{
output_ids
[
i
*
topk
+
j
]
=
(
int64_t
)(
drand48
()
*
100000
);
output_distence
[
i
*
topk
+
j
]
=
ascending
?
(
j
+
drand48
())
:
((
topk
-
j
)
+
drand48
());
output_distance
[
i
*
topk
+
j
]
=
ascending
?
(
j
+
drand48
())
:
((
topk
-
j
)
+
drand48
());
}
}
}
void
CopyResult
(
std
::
vector
<
int64_t
>&
output_ids
,
std
::
vector
<
float
>&
output_distance
,
uint64_t
output_topk
,
std
::
vector
<
int64_t
>&
input_ids
,
std
::
vector
<
float
>&
input_distance
,
uint64_t
input_topk
,
uint64_t
nq
)
{
ASSERT_TRUE
(
input_ids
.
size
()
>=
nq
*
input_topk
);
ASSERT_TRUE
(
input_distance
.
size
()
>=
nq
*
input_topk
);
ASSERT_TRUE
(
output_topk
<=
input_topk
);
output_ids
.
clear
();
output_ids
.
resize
(
nq
*
output_topk
);
output_distance
.
clear
();
output_distance
.
resize
(
nq
*
output_topk
);
for
(
uint64_t
i
=
0
;
i
<
nq
;
i
++
)
{
for
(
uint64_t
j
=
0
;
j
<
output_topk
;
j
++
)
{
output_ids
[
i
*
output_topk
+
j
]
=
input_ids
[
i
*
input_topk
+
j
];
output_distance
[
i
*
output_topk
+
j
]
=
input_distance
[
i
*
input_topk
+
j
];
}
}
}
...
...
@@ -50,8 +75,8 @@ CheckTopkResult(const std::vector<int64_t>& input_ids_1,
const
std
::
vector
<
float
>&
input_distance_1
,
const
std
::
vector
<
int64_t
>&
input_ids_2
,
const
std
::
vector
<
float
>&
input_distance_2
,
uint64_t
nq
,
uint64_t
topk
,
uint64_t
nq
,
bool
ascending
,
const
milvus
::
scheduler
::
ResultSet
&
result
)
{
ASSERT_EQ
(
result
.
size
(),
nq
);
...
...
@@ -91,43 +116,36 @@ TEST(DBSearchTest, TOPK_TEST) {
bool
ascending
;
std
::
vector
<
int64_t
>
ids1
,
ids2
;
std
::
vector
<
float
>
dist1
,
dist2
;
milvus
::
scheduler
::
ResultSet
result
;
milvus
::
Status
status
;
ms
::
ResultSet
result
;
/* test1, id1/dist1 valid, id2/dist2 empty */
ascending
=
true
;
BuildResult
(
NQ
,
TOP_K
,
ascending
,
ids1
,
dist1
);
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids1
,
dist1
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
NQ
,
TOP_K
,
ascending
,
result
);
BuildResult
(
ids1
,
dist1
,
TOP_K
,
NQ
,
ascending
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids1
,
dist1
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
,
result
);
/* test2, id1/dist1 valid, id2/dist2 valid */
BuildResult
(
NQ
,
TOP_K
,
ascending
,
ids2
,
dist2
);
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids2
,
dist2
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
NQ
,
TOP_K
,
ascending
,
result
);
BuildResult
(
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids2
,
dist2
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
,
result
);
/* test3, id1/dist1 small topk */
ids1
.
clear
();
dist1
.
clear
();
result
.
clear
();
BuildResult
(
NQ
,
TOP_K
/
2
,
ascending
,
ids1
,
dist1
);
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids2
,
dist2
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
NQ
,
TOP_K
,
ascending
,
result
);
BuildResult
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
ascending
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
TOP_K
,
ascending
,
result
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids2
,
dist2
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
,
result
);
/* test4, id1/dist1 small topk, id2/dist2 small topk */
ids2
.
clear
();
dist2
.
clear
();
result
.
clear
();
BuildResult
(
NQ
,
TOP_K
/
3
,
ascending
,
ids2
,
dist2
);
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids2
,
dist2
,
TOP_K
/
3
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
NQ
,
TOP_K
,
ascending
,
result
);
BuildResult
(
ids2
,
dist2
,
TOP_K
/
3
,
NQ
,
ascending
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
TOP_K
,
ascending
,
result
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids2
,
dist2
,
TOP_K
/
3
,
NQ
,
TOP_K
,
ascending
,
result
);
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
,
result
);
/////////////////////////////////////////////////////////////////////////////////////////
ascending
=
false
;
...
...
@@ -138,71 +156,199 @@ TEST(DBSearchTest, TOPK_TEST) {
result
.
clear
();
/* test1, id1/dist1 valid, id2/dist2 empty */
BuildResult
(
NQ
,
TOP_K
,
ascending
,
ids1
,
dist1
);
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids1
,
dist1
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
NQ
,
TOP_K
,
ascending
,
result
);
BuildResult
(
ids1
,
dist1
,
TOP_K
,
NQ
,
ascending
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids1
,
dist1
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
,
result
);
/* test2, id1/dist1 valid, id2/dist2 valid */
BuildResult
(
NQ
,
TOP_K
,
ascending
,
ids2
,
dist2
);
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids2
,
dist2
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
NQ
,
TOP_K
,
ascending
,
result
);
BuildResult
(
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids2
,
dist2
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
,
result
);
/* test3, id1/dist1 small topk */
ids1
.
clear
();
dist1
.
clear
();
result
.
clear
();
BuildResult
(
NQ
,
TOP_K
/
2
,
ascending
,
ids1
,
dist1
);
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids2
,
dist2
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
NQ
,
TOP_K
,
ascending
,
result
);
BuildResult
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
ascending
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
TOP_K
,
ascending
,
result
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids2
,
dist2
,
TOP_K
,
NQ
,
TOP_K
,
ascending
,
result
);
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
,
result
);
/* test4, id1/dist1 small topk, id2/dist2 small topk */
ids2
.
clear
();
dist2
.
clear
();
result
.
clear
();
BuildResult
(
NQ
,
TOP_K
/
3
,
ascending
,
ids2
,
dist2
);
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
ids2
,
dist2
,
TOP_K
/
3
,
NQ
,
TOP_K
,
ascending
,
result
);
ASSERT_TRUE
(
status
.
ok
());
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
NQ
,
TOP_K
,
ascending
,
result
);
BuildResult
(
ids2
,
dist2
,
TOP_K
/
3
,
NQ
,
ascending
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids1
,
dist1
,
TOP_K
/
2
,
NQ
,
TOP_K
,
ascending
,
result
);
ms
::
XSearchTask
::
MergeTopkToResultSet
(
ids2
,
dist2
,
TOP_K
/
3
,
NQ
,
TOP_K
,
ascending
,
result
);
CheckTopkResult
(
ids1
,
dist1
,
ids2
,
dist2
,
TOP_K
,
NQ
,
ascending
,
result
);
}
TEST
(
DBSearchTest
,
REDUCE_PERF_TEST
)
{
int32_t
nq
=
100
;
int32_t
top_k
=
1000
;
int32_t
index_file_num
=
478
;
/* sift1B dataset, index files num */
bool
ascending
=
true
;
std
::
vector
<
int32_t
>
thread_vec
=
{
4
,
8
,
11
};
std
::
vector
<
int32_t
>
nq_vec
=
{
1
,
10
,
100
,
1000
};
std
::
vector
<
int32_t
>
topk_vec
=
{
1
,
4
,
16
,
64
,
256
,
1024
};
int32_t
NQ
=
nq_vec
[
nq_vec
.
size
()
-
1
];
int32_t
TOPK
=
topk_vec
[
topk_vec
.
size
()
-
1
];
std
::
vector
<
std
::
vector
<
int64_t
>>
id_vec
;
std
::
vector
<
std
::
vector
<
float
>>
dist_vec
;
std
::
vector
<
int64_t
>
input_ids
;
std
::
vector
<
float
>
input_distance
;
milvus
::
scheduler
::
ResultSet
final_result
;
milvus
::
Status
status
;
double
span
,
reduce_cost
=
0.0
;
milvus
::
TimeRecorder
rc
(
""
);
for
(
int32_t
i
=
0
;
i
<
index_file_num
;
i
++
)
{
BuildResult
(
nq
,
top_k
,
ascending
,
input_ids
,
input_distance
);
rc
.
RecordSection
(
"do search for context: "
+
std
::
to_string
(
i
));
// pick up topk result
status
=
milvus
::
scheduler
::
XSearchTask
::
TopkResult
(
input_ids
,
input_distance
,
top_k
,
nq
,
top_k
,
ascending
,
final_result
);
ASSERT_TRUE
(
status
.
ok
());
ASSERT_EQ
(
final_result
.
size
(),
nq
);
span
=
rc
.
RecordSection
(
"reduce topk for context: "
+
std
::
to_string
(
i
));
reduce_cost
+=
span
;
int32_t
i
,
k
,
step
;
/* generate testing data */
for
(
i
=
0
;
i
<
index_file_num
;
i
++
)
{
BuildResult
(
input_ids
,
input_distance
,
TOPK
,
NQ
,
ascending
);
id_vec
.
push_back
(
input_ids
);
dist_vec
.
push_back
(
input_distance
);
}
for
(
int32_t
max_thread_num
:
thread_vec
)
{
milvus
::
ThreadPool
threadPool
(
max_thread_num
);
std
::
list
<
std
::
future
<
void
>>
threads_list
;
for
(
int32_t
nq
:
nq_vec
)
{
for
(
int32_t
top_k
:
topk_vec
)
{
ms
::
ResultSet
final_result
,
final_result_2
,
final_result_3
;
std
::
vector
<
std
::
vector
<
int64_t
>>
id_vec_1
(
index_file_num
);
std
::
vector
<
std
::
vector
<
float
>>
dist_vec_1
(
index_file_num
);
for
(
i
=
0
;
i
<
index_file_num
;
i
++
)
{
CopyResult
(
id_vec_1
[
i
],
dist_vec_1
[
i
],
top_k
,
id_vec
[
i
],
dist_vec
[
i
],
TOPK
,
nq
);
}
std
::
string
str1
=
"Method-1 "
+
std
::
to_string
(
max_thread_num
)
+
" "
+
std
::
to_string
(
nq
)
+
" "
+
std
::
to_string
(
top_k
);
milvus
::
TimeRecorder
rc1
(
str1
);
///////////////////////////////////////////////////////////////////////////////////////
/* method-1 */
for
(
i
=
0
;
i
<
index_file_num
;
i
++
)
{
ms
::
XSearchTask
::
MergeTopkToResultSet
(
id_vec_1
[
i
],
dist_vec_1
[
i
],
top_k
,
nq
,
top_k
,
ascending
,
final_result
);
ASSERT_EQ
(
final_result
.
size
(),
nq
);
}
rc1
.
RecordSection
(
"reduce done"
);
///////////////////////////////////////////////////////////////////////////////////////
/* method-2 */
std
::
vector
<
std
::
vector
<
int64_t
>>
id_vec_2
(
index_file_num
);
std
::
vector
<
std
::
vector
<
float
>>
dist_vec_2
(
index_file_num
);
std
::
vector
<
uint64_t
>
k_vec_2
(
index_file_num
);
for
(
i
=
0
;
i
<
index_file_num
;
i
++
)
{
CopyResult
(
id_vec_2
[
i
],
dist_vec_2
[
i
],
top_k
,
id_vec
[
i
],
dist_vec
[
i
],
TOPK
,
nq
);
k_vec_2
[
i
]
=
top_k
;
}
std
::
string
str2
=
"Method-2 "
+
std
::
to_string
(
max_thread_num
)
+
" "
+
std
::
to_string
(
nq
)
+
" "
+
std
::
to_string
(
top_k
);
milvus
::
TimeRecorder
rc2
(
str2
);
for
(
step
=
1
;
step
<
index_file_num
;
step
*=
2
)
{
for
(
i
=
0
;
i
+
step
<
index_file_num
;
i
+=
step
*
2
)
{
ms
::
XSearchTask
::
MergeTopkArray
(
id_vec_2
[
i
],
dist_vec_2
[
i
],
k_vec_2
[
i
],
id_vec_2
[
i
+
step
],
dist_vec_2
[
i
+
step
],
k_vec_2
[
i
+
step
],
nq
,
top_k
,
ascending
);
}
}
ms
::
XSearchTask
::
MergeTopkToResultSet
(
id_vec_2
[
0
],
dist_vec_2
[
0
],
k_vec_2
[
0
],
nq
,
top_k
,
ascending
,
final_result_2
);
ASSERT_EQ
(
final_result_2
.
size
(),
nq
);
rc2
.
RecordSection
(
"reduce done"
);
for
(
i
=
0
;
i
<
nq
;
i
++
)
{
ASSERT_EQ
(
final_result
[
i
].
size
(),
final_result_2
[
i
].
size
());
for
(
k
=
0
;
k
<
final_result
[
i
].
size
();
k
++
)
{
if
(
final_result
[
i
][
k
].
first
!=
final_result_2
[
i
][
k
].
first
)
{
std
::
cout
<<
i
<<
" "
<<
k
<<
std
::
endl
;
}
ASSERT_EQ
(
final_result
[
i
][
k
].
first
,
final_result_2
[
i
][
k
].
first
);
ASSERT_EQ
(
final_result
[
i
][
k
].
second
,
final_result_2
[
i
][
k
].
second
);
}
}
///////////////////////////////////////////////////////////////////////////////////////
/* method-3 parallel */
std
::
vector
<
std
::
vector
<
int64_t
>>
id_vec_3
(
index_file_num
);
std
::
vector
<
std
::
vector
<
float
>>
dist_vec_3
(
index_file_num
);
std
::
vector
<
uint64_t
>
k_vec_3
(
index_file_num
);
for
(
i
=
0
;
i
<
index_file_num
;
i
++
)
{
CopyResult
(
id_vec_3
[
i
],
dist_vec_3
[
i
],
top_k
,
id_vec
[
i
],
dist_vec
[
i
],
TOPK
,
nq
);
k_vec_3
[
i
]
=
top_k
;
}
std
::
string
str3
=
"Method-3 "
+
std
::
to_string
(
max_thread_num
)
+
" "
+
std
::
to_string
(
nq
)
+
" "
+
std
::
to_string
(
top_k
);
milvus
::
TimeRecorder
rc3
(
str3
);
for
(
step
=
1
;
step
<
index_file_num
;
step
*=
2
)
{
for
(
i
=
0
;
i
+
step
<
index_file_num
;
i
+=
step
*
2
)
{
threads_list
.
push_back
(
threadPool
.
enqueue
(
ms
::
XSearchTask
::
MergeTopkArray
,
std
::
ref
(
id_vec_3
[
i
]),
std
::
ref
(
dist_vec_3
[
i
]),
std
::
ref
(
k_vec_3
[
i
]),
std
::
ref
(
id_vec_3
[
i
+
step
]),
std
::
ref
(
dist_vec_3
[
i
+
step
]),
std
::
ref
(
k_vec_3
[
i
+
step
]),
nq
,
top_k
,
ascending
));
}
while
(
threads_list
.
size
()
>
0
)
{
int
nready
=
0
;
for
(
auto
it
=
threads_list
.
begin
();
it
!=
threads_list
.
end
();
it
=
it
)
{
auto
&
p
=
*
it
;
std
::
chrono
::
milliseconds
span
(
0
);
if
(
p
.
wait_for
(
span
)
==
std
::
future_status
::
ready
)
{
threads_list
.
erase
(
it
++
);
++
nready
;
}
else
{
++
it
;
}
}
if
(
nready
==
0
)
{
std
::
this_thread
::
yield
();
}
}
}
ms
::
XSearchTask
::
MergeTopkToResultSet
(
id_vec_3
[
0
],
dist_vec_3
[
0
],
k_vec_3
[
0
],
nq
,
top_k
,
ascending
,
final_result_3
);
ASSERT_EQ
(
final_result_3
.
size
(),
nq
);
rc3
.
RecordSection
(
"reduce done"
);
for
(
i
=
0
;
i
<
nq
;
i
++
)
{
ASSERT_EQ
(
final_result
[
i
].
size
(),
final_result_3
[
i
].
size
());
for
(
k
=
0
;
k
<
final_result
[
i
].
size
();
k
++
)
{
ASSERT_EQ
(
final_result
[
i
][
k
].
first
,
final_result_3
[
i
][
k
].
first
);
ASSERT_EQ
(
final_result
[
i
][
k
].
second
,
final_result_3
[
i
][
k
].
second
);
}
}
}
}
}
std
::
cout
<<
"total reduce time: "
<<
reduce_cost
/
1000
<<
" ms"
<<
std
::
endl
;
}
cpp/unittest/scheduler/test_resource_mgr.cpp
浏览文件 @
597456b4
...
...
@@ -165,7 +165,9 @@ class ResourceMgrAdvanceTest : public testing::Test {
SetUp
()
override
{
mgr1_
=
std
::
make_shared
<
ResourceMgr
>
();
disk_res
=
std
::
make_shared
<
DiskResource
>
(
"disk"
,
0
,
true
,
false
);
cpu_res
=
std
::
make_shared
<
CpuResource
>
(
"cpu"
,
0
,
true
,
true
);
mgr1_
->
Add
(
ResourcePtr
(
disk_res
));
mgr1_
->
Add
(
ResourcePtr
(
cpu_res
));
mgr1_
->
Start
();
}
...
...
@@ -176,6 +178,7 @@ class ResourceMgrAdvanceTest : public testing::Test {
ResourceMgrPtr
mgr1_
;
ResourcePtr
disk_res
;
ResourcePtr
cpu_res
;
};
TEST_F
(
ResourceMgrAdvanceTest
,
REGISTER_SUBSCRIBER
)
{
...
...
cpp/unittest/scheduler/test_scheduler.cpp
浏览文件 @
597456b4
...
...
@@ -28,18 +28,17 @@
#include "utils/Error.h"
#include "wrapper/VecIndex.h"
namespace
milvus
{
namespace
scheduler
{
class
MockVecIndex
:
public
engine
::
VecIndex
{
public:
virtual
Status
BuildAll
(
const
int64_t
&
nb
,
const
float
*
xb
,
const
int64_t
*
ids
,
const
engine
::
Config
&
cfg
,
const
int64_t
&
nt
=
0
,
const
float
*
xt
=
nullptr
)
{
virtual
Status
BuildAll
(
const
int64_t
&
nb
,
const
float
*
xb
,
const
int64_t
*
ids
,
const
engine
::
Config
&
cfg
,
const
int64_t
&
nt
=
0
,
const
float
*
xt
=
nullptr
)
{
}
engine
::
VecIndexPtr
Clone
()
override
{
...
...
@@ -54,23 +53,23 @@ class MockVecIndex : public engine::VecIndex {
return
engine
::
IndexType
::
INVALID
;
}
virtual
Status
Add
(
const
int64_t
&
nb
,
const
float
*
xb
,
const
int64_t
*
ids
,
const
engine
::
Config
&
cfg
=
engine
::
Config
())
{
virtual
Status
Add
(
const
int64_t
&
nb
,
const
float
*
xb
,
const
int64_t
*
ids
,
const
engine
::
Config
&
cfg
=
engine
::
Config
())
{
}
virtual
Status
Search
(
const
int64_t
&
nq
,
const
float
*
xq
,
float
*
dist
,
int64_t
*
ids
,
const
engine
::
Config
&
cfg
=
engine
::
Config
())
{
virtual
Status
Search
(
const
int64_t
&
nq
,
const
float
*
xq
,
float
*
dist
,
int64_t
*
ids
,
const
engine
::
Config
&
cfg
=
engine
::
Config
())
{
}
engine
::
VecIndexPtr
CopyToGpu
(
const
int64_t
&
device_id
,
const
engine
::
Config
&
cfg
)
override
{
engine
::
VecIndexPtr
CopyToGpu
(
const
int64_t
&
device_id
,
const
engine
::
Config
&
cfg
)
override
{
}
engine
::
VecIndexPtr
CopyToCpu
(
const
engine
::
Config
&
cfg
)
override
{
engine
::
VecIndexPtr
CopyToCpu
(
const
engine
::
Config
&
cfg
)
override
{
}
virtual
int64_t
Dimension
()
{
...
...
@@ -86,7 +85,7 @@ class MockVecIndex : public engine::VecIndex {
return
binset
;
}
virtual
Status
Load
(
const
knowhere
::
BinarySet
&
index_binary
)
{
virtual
Status
Load
(
const
knowhere
::
BinarySet
&
index_binary
)
{
}
public:
...
...
@@ -102,11 +101,13 @@ class SchedulerTest : public testing::Test {
cache
::
GpuCacheMgr
::
GetInstance
(
0
)
->
SetCapacity
(
cache_cap
);
cache
::
GpuCacheMgr
::
GetInstance
(
1
)
->
SetCapacity
(
cache_cap
);
ResourcePtr
disk
=
ResourceFactory
::
Create
(
"disk"
,
"DISK"
,
0
,
true
,
false
);
ResourcePtr
cpu
=
ResourceFactory
::
Create
(
"cpu"
,
"CPU"
,
0
,
true
,
false
);
ResourcePtr
gpu_0
=
ResourceFactory
::
Create
(
"gpu0"
,
"GPU"
,
0
);
ResourcePtr
gpu_1
=
ResourceFactory
::
Create
(
"gpu1"
,
"GPU"
,
1
);
res_mgr_
=
std
::
make_shared
<
ResourceMgr
>
();
disk_resource_
=
res_mgr_
->
Add
(
std
::
move
(
disk
));
cpu_resource_
=
res_mgr_
->
Add
(
std
::
move
(
cpu
));
gpu_resource_0_
=
res_mgr_
->
Add
(
std
::
move
(
gpu_0
));
gpu_resource_1_
=
res_mgr_
->
Add
(
std
::
move
(
gpu_1
));
...
...
@@ -127,6 +128,7 @@ class SchedulerTest : public testing::Test {
res_mgr_
->
Stop
();
}
ResourceWPtr
disk_resource_
;
ResourceWPtr
cpu_resource_
;
ResourceWPtr
gpu_resource_0_
;
ResourceWPtr
gpu_resource_1_
;
...
...
@@ -137,7 +139,7 @@ class SchedulerTest : public testing::Test {
void
insert_dummy_index_into_gpu_cache
(
uint64_t
device_id
)
{
MockVecIndex
*
mock_index
=
new
MockVecIndex
();
MockVecIndex
*
mock_index
=
new
MockVecIndex
();
mock_index
->
ntotal_
=
1000
;
engine
::
VecIndexPtr
index
(
mock_index
);
...
...
@@ -224,6 +226,7 @@ class SchedulerTest2 : public testing::Test {
TearDown
()
override
{
scheduler_
->
Stop
();
res_mgr_
->
Stop
();
res_mgr_
->
Clear
();
}
ResourceWPtr
disk_
;
...
...
@@ -237,22 +240,22 @@ class SchedulerTest2 : public testing::Test {
std
::
shared_ptr
<
Scheduler
>
scheduler_
;
};
TEST_F
(
SchedulerTest2
,
SPECIFIED_RESOURCE_TEST
)
{
const
uint64_t
NUM
=
10
;
std
::
vector
<
std
::
shared_ptr
<
TestTask
>>
tasks
;
TableFileSchemaPtr
dummy
=
std
::
make_shared
<
TableFileSchema
>
();
dummy
->
location_
=
"location"
;
for
(
uint64_t
i
=
0
;
i
<
NUM
;
++
i
)
{
auto
label
=
std
::
make_shared
<
DefaultLabel
>
();
std
::
shared_ptr
<
TestTask
>
task
=
std
::
make_shared
<
TestTask
>
(
dummy
,
label
);
task
->
label
()
=
std
::
make_shared
<
SpecResLabel
>
(
disk_
);
tasks
.
push_back
(
task
);
disk_
.
lock
()
->
task_table
().
Put
(
task
);
}
//
TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) {
// const uint64_t NUM = 2
;
//
std::vector<std::shared_ptr<TestTask>> tasks;
//
TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
//
dummy->location_ = "location";
//
//
for (uint64_t i = 0; i < NUM; ++i) {
//
auto label = std::make_shared<DefaultLabel>();
//
std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy, label);
//
task->label() = std::make_shared<SpecResLabel>(disk_);
//
tasks.push_back(task);
//
disk_.lock()->task_table().Put(task);
//
}
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
//
}
}
// namespace scheduler
}
// namespace milvus
...
...
cpp/unittest/wrapper/test_wrapper.cpp
浏览文件 @
597456b4
...
...
@@ -188,7 +188,7 @@ INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest,
10
,
10
),
std
::
make_tuple
(
milvus
::
engine
::
IndexType
::
FAISS_IVFSQ8_CPU
,
"Default"
,
DIM
,
NB
,
10
,
10
),
std
::
make_tuple
(
milvus
::
engine
::
IndexType
::
FAISS_IVFSQ8_GPU
,
"Default"
,
DIM
,
NB
,
10
,
10
),
//
std::make_tuple(milvus::engine::IndexType::FAISS_IVFSQ8_GPU, "Default", DIM, NB, 10, 10),
std
::
make_tuple
(
milvus
::
engine
::
IndexType
::
FAISS_IVFSQ8_MIX
,
"Default"
,
DIM
,
NB
,
10
,
10
),
// std::make_tuple(IndexType::NSG_MIX, "Default", 128, 250000, 10, 10),
// std::make_tuple(IndexType::SPTAG_KDT_RNT_CPU, "Default", 128, 250000, 10, 10),
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录