Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
ab163356
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
ab163356
编写于
9月 09, 2019
作者:
S
starlord
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'source/branch-0.4.0' into branch-0.4.0
Former-commit-id: d1b9d0877742427d050cac21bbe4da88126f0a6c
上级
d5caefd3
c2cb745d
变更
16
显示空白变更内容
内联
并排
Showing
16 changed file
with
481 addition
and
363 deletion
+481
-363
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+7
-0
cpp/conf/server_config.template
cpp/conf/server_config.template
+5
-18
cpp/src/core/include/knowhere/index/vector_index/nsg/nsg.h
cpp/src/core/include/knowhere/index/vector_index/nsg/nsg.h
+2
-2
cpp/src/core/src/knowhere/index/vector_index/nsg/nsg.cpp
cpp/src/core/src/knowhere/index/vector_index/nsg/nsg.cpp
+24
-24
cpp/src/core/test/CMakeLists.txt
cpp/src/core/test/CMakeLists.txt
+1
-1
cpp/src/core/test/test_idmap.cpp
cpp/src/core/test/test_idmap.cpp
+1
-5
cpp/src/core/test/test_ivf.cpp
cpp/src/core/test/test_ivf.cpp
+16
-16
cpp/src/core/test/test_nsg/test_nsg.cpp
cpp/src/core/test/test_nsg/test_nsg.cpp
+8
-0
cpp/src/scheduler/SchedInst.cpp
cpp/src/scheduler/SchedInst.cpp
+2
-1
cpp/src/scheduler/event/Event.h
cpp/src/scheduler/event/Event.h
+2
-4
cpp/src/scheduler/task/SearchTask.cpp
cpp/src/scheduler/task/SearchTask.cpp
+67
-69
cpp/unittest/scheduler/event_test.cpp
cpp/unittest/scheduler/event_test.cpp
+54
-0
cpp/unittest/scheduler/normal_test.cpp
cpp/unittest/scheduler/normal_test.cpp
+1
-1
cpp/unittest/scheduler/resource_test.cpp
cpp/unittest/scheduler/resource_test.cpp
+40
-0
cpp/unittest/scheduler/scheduler_test.cpp
cpp/unittest/scheduler/scheduler_test.cpp
+228
-222
cpp/unittest/scheduler/task_test.cpp
cpp/unittest/scheduler/task_test.cpp
+23
-0
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
ab163356
...
@@ -101,6 +101,13 @@ Please mark all change in change log and use the ticket from JIRA.
...
@@ -101,6 +101,13 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-511 - Update resource_test in scheduler
-
MS-511 - Update resource_test in scheduler
-
MS-517 - Update resource_mgr_test in scheduler
-
MS-517 - Update resource_mgr_test in scheduler
-
MS-518 - Add schedinst_test in scheduler
-
MS-518 - Add schedinst_test in scheduler
-
MS-519 - Add event_test in scheduler
-
MS-520 - Update resource_test in scheduler
-
MS-524 - Add some unittest in event_test and resource_test
-
MS-525 - Disable parallel reduce in SearchTask
-
MS-527 - Update scheduler_test and enable it
-
MS-528 - Hide some config used future
-
MS-530 - Add unittest for SearchTask->Load
## New Feature
## New Feature
-
MS-343 - Implement ResourceMgr
-
MS-343 - Implement ResourceMgr
...
...
cpp/conf/server_config.template
浏览文件 @
ab163356
...
@@ -48,51 +48,38 @@ resource_config:
...
@@ -48,51 +48,38 @@ resource_config:
# example:
# example:
# resource_name: # resource name, just using in connections below
# resource_name: # resource name, just using in connections below
# type: DISK # resource type, optional: DISK/CPU/GPU
# type: DISK # resource type, optional: DISK/CPU/GPU
# memory: 256 # memory size, unit: GB
# device_id: 0
# device_id: 0
# enable_loader: true # if is enable loader, optional: true, false
# enable_executor: false # if is enable executor, optional: true, false
# enable_executor: false # if is enable executor, optional: true, false
resources:
resources:
ssda:
ssda:
type: DISK
type: DISK
memory: 2048
device_id: 0
device_id: 0
enable_loader: true
enable_executor: false
enable_executor: false
cpu:
cpu:
type: CPU
type: CPU
memory: 64
device_id: 0
device_id: 0
enable_loader: true
enable_executor: false
enable_executor: false
gpu0:
gpu0:
type: GPU
type: GPU
memory: 6
device_id: 0
device_id: 0
enable_loader: true
enable_executor: true
enable_executor: true
gpu_resource_num: 2
gpu_resource_num: 2
pinned_memory: 300
pinned_memory: 300
temp_memory: 300
temp_memory: 300
# gtx1660:
# type: GPU
# memory: 6
# device_id: 1
# enable_loader: true
# enable_executor: true
# connection list, length: 0~N
# connection list, length: 0~N
# format: -${resource_name}===${resource_name}
# example:
# connection_name:
# speed: 100 # unit: MS/s
# endpoint: ${resource_name}===${resource_name}
connections:
connections:
io:
io:
speed: 500
speed: 500
endpoint: ssda===cpu
endpoint: ssda===cpu
pcie:
pcie
0
:
speed: 11000
speed: 11000
endpoint: cpu===gpu0
endpoint: cpu===gpu0
# - cpu===gtx1660
cpp/src/core/include/knowhere/index/vector_index/nsg/nsg.h
浏览文件 @
ab163356
...
@@ -138,8 +138,8 @@ class NsgIndex {
...
@@ -138,8 +138,8 @@ class NsgIndex {
void
FindUnconnectedNode
(
boost
::
dynamic_bitset
<>
&
flags
,
int64_t
&
root
);
void
FindUnconnectedNode
(
boost
::
dynamic_bitset
<>
&
flags
,
int64_t
&
root
);
private:
//
private:
void
GetKnnGraphFromFile
();
//
void GetKnnGraphFromFile();
};
};
}
}
...
...
cpp/src/core/src/knowhere/index/vector_index/nsg/nsg.cpp
浏览文件 @
ab163356
...
@@ -722,30 +722,30 @@ void NsgIndex::SetKnnGraph(Graph &g) {
...
@@ -722,30 +722,30 @@ void NsgIndex::SetKnnGraph(Graph &g) {
knng
=
std
::
move
(
g
);
knng
=
std
::
move
(
g
);
}
}
void
NsgIndex
::
GetKnnGraphFromFile
()
{
//
void NsgIndex::GetKnnGraphFromFile() {
//std::string filename = "/home/zilliz/opt/workspace/wook/efanna_graph/tests/sift.1M.50NN.graph";
//
//std::string filename = "/home/zilliz/opt/workspace/wook/efanna_graph/tests/sift.1M.50NN.graph";
std
::
string
filename
=
"/home/zilliz/opt/workspace/wook/efanna_graph/tests/sift.50NN.graph"
;
//
std::string filename = "/home/zilliz/opt/workspace/wook/efanna_graph/tests/sift.50NN.graph";
//
std
::
ifstream
in
(
filename
,
std
::
ios
::
binary
);
//
std::ifstream in(filename, std::ios::binary);
unsigned
k
;
//
unsigned k;
in
.
read
((
char
*
)
&
k
,
sizeof
(
unsigned
));
//
in.read((char *) &k, sizeof(unsigned));
in
.
seekg
(
0
,
std
::
ios
::
end
);
//
in.seekg(0, std::ios::end);
std
::
ios
::
pos_type
ss
=
in
.
tellg
();
//
std::ios::pos_type ss = in.tellg();
size_t
fsize
=
(
size_t
)
ss
;
//
size_t fsize = (size_t) ss;
size_t
num
=
(
unsigned
)
(
fsize
/
(
k
+
1
)
/
4
);
//
size_t num = (unsigned) (fsize / (k + 1) / 4);
in
.
seekg
(
0
,
std
::
ios
::
beg
);
//
in.seekg(0, std::ios::beg);
//
knng
.
resize
(
num
);
//
knng.resize(num);
knng
.
reserve
(
num
);
//
knng.reserve(num);
unsigned
kk
=
(
k
+
3
)
/
4
*
4
;
//
unsigned kk = (k + 3) / 4 * 4;
for
(
size_t
i
=
0
;
i
<
num
;
i
++
)
{
//
for (size_t i = 0; i < num; i++) {
in
.
seekg
(
4
,
std
::
ios
::
cur
);
//
in.seekg(4, std::ios::cur);
knng
[
i
].
resize
(
k
);
//
knng[i].resize(k);
knng
[
i
].
reserve
(
kk
);
//
knng[i].reserve(kk);
in
.
read
((
char
*
)
knng
[
i
].
data
(),
k
*
sizeof
(
unsigned
));
//
in.read((char *) knng[i].data(), k * sizeof(unsigned));
}
//
}
in
.
close
();
//
in.close();
}
//
}
}
}
}
}
...
...
cpp/src/core/test/CMakeLists.txt
浏览文件 @
ab163356
...
@@ -78,6 +78,6 @@ install(TARGETS test_ivf DESTINATION unittest)
...
@@ -78,6 +78,6 @@ install(TARGETS test_ivf DESTINATION unittest)
install
(
TARGETS test_idmap DESTINATION unittest
)
install
(
TARGETS test_idmap DESTINATION unittest
)
install
(
TARGETS test_kdt DESTINATION unittest
)
install
(
TARGETS test_kdt DESTINATION unittest
)
add_subdirectory
(
faiss_ori
)
#
add_subdirectory(faiss_ori)
add_subdirectory
(
test_nsg
)
add_subdirectory
(
test_nsg
)
cpp/src/core/test/test_idmap.cpp
浏览文件 @
ab163356
...
@@ -148,10 +148,6 @@ TEST_F(IDMAPTest, copy_test) {
...
@@ -148,10 +148,6 @@ TEST_F(IDMAPTest, copy_test) {
{
{
// cpu to gpu
// cpu to gpu
static
int64_t
device_id
=
0
;
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
0
);
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
1
);
auto
clone_index
=
CopyCpuToGpu
(
index_
,
device_id
,
Config
());
auto
clone_index
=
CopyCpuToGpu
(
index_
,
device_id
,
Config
());
auto
clone_result
=
clone_index
->
Search
(
query_dataset
,
Config
::
object
{{
"k"
,
k
}});
auto
clone_result
=
clone_index
->
Search
(
query_dataset
,
Config
::
object
{{
"k"
,
k
}});
AssertAnns
(
clone_result
,
nq
,
k
);
AssertAnns
(
clone_result
,
nq
,
k
);
...
@@ -169,7 +165,7 @@ TEST_F(IDMAPTest, copy_test) {
...
@@ -169,7 +165,7 @@ TEST_F(IDMAPTest, copy_test) {
assert
(
std
::
static_pointer_cast
<
IDMAP
>
(
host_index
)
->
GetRawIds
()
!=
nullptr
);
assert
(
std
::
static_pointer_cast
<
IDMAP
>
(
host_index
)
->
GetRawIds
()
!=
nullptr
);
// gpu to gpu
// gpu to gpu
auto
device_index
=
CopyCpuToGpu
(
index_
,
1
,
Config
());
auto
device_index
=
CopyCpuToGpu
(
index_
,
device_id
,
Config
());
auto
device_result
=
device_index
->
Search
(
query_dataset
,
Config
::
object
{{
"k"
,
k
}});
auto
device_result
=
device_index
->
Search
(
query_dataset
,
Config
::
object
{{
"k"
,
k
}});
AssertAnns
(
device_result
,
nq
,
k
);
AssertAnns
(
device_result
,
nq
,
k
);
//assert(std::static_pointer_cast<GPUIDMAP>(device_index)->GetRawVectors() != nullptr);
//assert(std::static_pointer_cast<GPUIDMAP>(device_index)->GetRawVectors() != nullptr);
...
...
cpp/src/core/test/test_ivf.cpp
浏览文件 @
ab163356
...
@@ -52,9 +52,9 @@ class IVFTest
...
@@ -52,9 +52,9 @@ class IVFTest
void
SetUp
()
override
{
void
SetUp
()
override
{
std
::
tie
(
index_type
,
preprocess_cfg
,
train_cfg
,
add_cfg
,
search_cfg
)
=
GetParam
();
std
::
tie
(
index_type
,
preprocess_cfg
,
train_cfg
,
add_cfg
,
search_cfg
)
=
GetParam
();
//Init_with_default();
//Init_with_default();
Generate
(
128
,
1000000
/
5
,
10
);
Generate
(
128
,
1000000
/
100
,
10
);
index_
=
IndexFactory
(
index_type
);
index_
=
IndexFactory
(
index_type
);
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
device_id
,
1024
*
1024
*
200
,
1024
*
1024
*
3
00
,
2
);
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
device_id
,
1024
*
1024
*
200
,
1024
*
1024
*
6
00
,
2
);
}
}
void
TearDown
()
override
{
void
TearDown
()
override
{
FaissGpuResourceMgr
::
GetInstance
().
Free
();
FaissGpuResourceMgr
::
GetInstance
().
Free
();
...
@@ -77,21 +77,21 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
...
@@ -77,21 +77,21 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Config
::
object
{{
"nlist"
,
100
},
{
"metric_type"
,
"L2"
}},
Config
::
object
{{
"nlist"
,
100
},
{
"metric_type"
,
"L2"
}},
Config
(),
Config
(),
Config
::
object
{{
"k"
,
10
}}),
Config
::
object
{{
"k"
,
10
}}),
//
std::make_tuple("IVFPQ",
std
::
make_tuple
(
"IVFPQ"
,
//
Config(),
Config
(),
//
Config::object{{"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
Config
::
object
{{
"nlist"
,
100
},
{
"M"
,
8
},
{
"nbits"
,
8
},
{
"metric_type"
,
"L2"
}},
//
Config(),
Config
(),
//
Config::object{{"k", 10}}),
Config
::
object
{{
"k"
,
10
}}),
std
::
make_tuple
(
"GPUIVF"
,
std
::
make_tuple
(
"GPUIVF"
,
Config
(),
Config
(),
Config
::
object
{{
"nlist"
,
1638
},
{
"gpu_id"
,
device_id
},
{
"metric_type"
,
"L2"
}},
Config
::
object
{{
"nlist"
,
100
},
{
"gpu_id"
,
device_id
},
{
"metric_type"
,
"L2"
}},
Config
(),
Config
::
object
{{
"k"
,
10
}}),
std
::
make_tuple
(
"GPUIVFPQ"
,
Config
(),
Config
::
object
{{
"gpu_id"
,
device_id
},
{
"nlist"
,
100
},
{
"M"
,
8
},
{
"nbits"
,
8
},
{
"metric_type"
,
"L2"
}},
Config
(),
Config
(),
Config
::
object
{{
"k"
,
10
}}),
Config
::
object
{{
"k"
,
10
}}),
//std::make_tuple("GPUIVFPQ",
// Config(),
// Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
// Config(),
// Config::object{{"k", 10}}),
std
::
make_tuple
(
"IVFSQ"
,
std
::
make_tuple
(
"IVFSQ"
,
Config
(),
Config
(),
Config
::
object
{{
"nlist"
,
100
},
{
"nbits"
,
8
},
{
"metric_type"
,
"L2"
}},
Config
::
object
{{
"nlist"
,
100
},
{
"nbits"
,
8
},
{
"metric_type"
,
"L2"
}},
...
@@ -99,7 +99,7 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
...
@@ -99,7 +99,7 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Config
::
object
{{
"k"
,
10
}}),
Config
::
object
{{
"k"
,
10
}}),
std
::
make_tuple
(
"GPUIVFSQ"
,
std
::
make_tuple
(
"GPUIVFSQ"
,
Config
(),
Config
(),
Config
::
object
{{
"gpu_id"
,
device_id
},
{
"nlist"
,
1
638
},
{
"nbits"
,
8
},
{
"metric_type"
,
"L2"
}},
Config
::
object
{{
"gpu_id"
,
device_id
},
{
"nlist"
,
1
00
},
{
"nbits"
,
8
},
{
"metric_type"
,
"L2"
}},
Config
(),
Config
(),
Config
::
object
{{
"k"
,
10
}})
Config
::
object
{{
"k"
,
10
}})
)
)
...
@@ -386,8 +386,8 @@ class GPURESTEST
...
@@ -386,8 +386,8 @@ class GPURESTEST
int64_t
elems
=
0
;
int64_t
elems
=
0
;
};
};
const
int
search_count
=
10
0
;
const
int
search_count
=
10
;
const
int
load_count
=
3
0
;
const
int
load_count
=
3
;
TEST_F
(
GPURESTEST
,
gpu_ivf_resource_test
)
{
TEST_F
(
GPURESTEST
,
gpu_ivf_resource_test
)
{
assert
(
!
xb
.
empty
());
assert
(
!
xb
.
empty
());
...
...
cpp/src/core/test/test_nsg/test_nsg.cpp
浏览文件 @
ab163356
...
@@ -7,6 +7,7 @@
...
@@ -7,6 +7,7 @@
#include <gtest/gtest.h>
#include <gtest/gtest.h>
#include <memory>
#include <memory>
#include "knowhere/index/vector_index/gpu_ivf.h"
#include "knowhere/index/vector_index/nsg_index.h"
#include "knowhere/index/vector_index/nsg_index.h"
#include "knowhere/index/vector_index/nsg/nsg_io.h"
#include "knowhere/index/vector_index/nsg/nsg_io.h"
...
@@ -18,15 +19,22 @@ using ::testing::TestWithParam;
...
@@ -18,15 +19,22 @@ using ::testing::TestWithParam;
using
::
testing
::
Values
;
using
::
testing
::
Values
;
using
::
testing
::
Combine
;
using
::
testing
::
Combine
;
constexpr
int64_t
DEVICE_ID
=
0
;
class
NSGInterfaceTest
:
public
DataGen
,
public
TestWithParam
<::
std
::
tuple
<
Config
,
Config
>>
{
class
NSGInterfaceTest
:
public
DataGen
,
public
TestWithParam
<::
std
::
tuple
<
Config
,
Config
>>
{
protected:
protected:
void
SetUp
()
override
{
void
SetUp
()
override
{
//Init_with_default();
//Init_with_default();
FaissGpuResourceMgr
::
GetInstance
().
InitDevice
(
DEVICE_ID
,
1024
*
1024
*
200
,
1024
*
1024
*
600
,
2
);
Generate
(
256
,
1000000
,
1
);
Generate
(
256
,
1000000
,
1
);
index_
=
std
::
make_shared
<
NSG
>
();
index_
=
std
::
make_shared
<
NSG
>
();
std
::
tie
(
train_cfg
,
search_cfg
)
=
GetParam
();
std
::
tie
(
train_cfg
,
search_cfg
)
=
GetParam
();
}
}
void
TearDown
()
override
{
FaissGpuResourceMgr
::
GetInstance
().
Free
();
}
protected:
protected:
std
::
shared_ptr
<
NSG
>
index_
;
std
::
shared_ptr
<
NSG
>
index_
;
Config
train_cfg
;
Config
train_cfg
;
...
...
cpp/src/scheduler/SchedInst.cpp
浏览文件 @
ab163356
...
@@ -36,7 +36,8 @@ StartSchedulerService() {
...
@@ -36,7 +36,8 @@ StartSchedulerService() {
auto
type
=
resconf
.
GetValue
(
server
::
CONFIG_RESOURCE_TYPE
);
auto
type
=
resconf
.
GetValue
(
server
::
CONFIG_RESOURCE_TYPE
);
// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
auto
device_id
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_DEVICE_ID
);
auto
device_id
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_DEVICE_ID
);
auto
enable_loader
=
resconf
.
GetBoolValue
(
server
::
CONFIG_RESOURCE_ENABLE_LOADER
);
// auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
auto
enable_loader
=
true
;
auto
enable_executor
=
resconf
.
GetBoolValue
(
server
::
CONFIG_RESOURCE_ENABLE_EXECUTOR
);
auto
enable_executor
=
resconf
.
GetBoolValue
(
server
::
CONFIG_RESOURCE_ENABLE_EXECUTOR
);
auto
pinned_memory
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_PIN_MEMORY
);
auto
pinned_memory
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_PIN_MEMORY
);
auto
temp_memory
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_TEMP_MEMORY
);
auto
temp_memory
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_TEMP_MEMORY
);
...
...
cpp/src/scheduler/event/Event.h
浏览文件 @
ab163356
...
@@ -32,10 +32,8 @@ public:
...
@@ -32,10 +32,8 @@ public:
return
type_
;
return
type_
;
}
}
inline
virtual
std
::
string
virtual
std
::
string
Dump
()
const
{
Dump
()
const
=
0
;
return
"<Event>"
;
}
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
out
,
const
Event
&
event
);
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
out
,
const
Event
&
event
);
...
...
cpp/src/scheduler/task/SearchTask.cpp
浏览文件 @
ab163356
...
@@ -20,47 +20,47 @@ namespace engine {
...
@@ -20,47 +20,47 @@ namespace engine {
static
constexpr
size_t
PARALLEL_REDUCE_THRESHOLD
=
10000
;
static
constexpr
size_t
PARALLEL_REDUCE_THRESHOLD
=
10000
;
static
constexpr
size_t
PARALLEL_REDUCE_BATCH
=
1000
;
static
constexpr
size_t
PARALLEL_REDUCE_BATCH
=
1000
;
bool
//
bool
NeedParallelReduce
(
uint64_t
nq
,
uint64_t
topk
)
{
//
NeedParallelReduce(uint64_t nq, uint64_t topk) {
server
::
ServerConfig
&
config
=
server
::
ServerConfig
::
GetInstance
();
//
server::ServerConfig &config = server::ServerConfig::GetInstance();
server
::
ConfigNode
&
db_config
=
config
.
GetConfig
(
server
::
CONFIG_DB
);
//
server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
bool
need_parallel
=
db_config
.
GetBoolValue
(
server
::
CONFIG_DB_PARALLEL_REDUCE
,
false
);
//
bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
if
(
!
need_parallel
)
{
//
if (!need_parallel) {
return
false
;
//
return false;
}
//
}
//
return
nq
*
topk
>=
PARALLEL_REDUCE_THRESHOLD
;
//
return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
}
//
}
//
void
//
void
ParallelReduce
(
std
::
function
<
void
(
size_t
,
size_t
)
>
&
reduce_function
,
size_t
max_index
)
{
//
ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
size_t
reduce_batch
=
PARALLEL_REDUCE_BATCH
;
//
size_t reduce_batch = PARALLEL_REDUCE_BATCH;
//
auto
thread_count
=
std
::
thread
::
hardware_concurrency
()
-
1
;
//not all core do this work
//
auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
if
(
thread_count
>
0
)
{
//
if (thread_count > 0) {
reduce_batch
=
max_index
/
thread_count
+
1
;
//
reduce_batch = max_index / thread_count + 1;
}
//
}
ENGINE_LOG_DEBUG
<<
"use "
<<
thread_count
<<
//
ENGINE_LOG_DEBUG << "use " << thread_count <<
" thread parallelly do reduce, each thread process "
<<
reduce_batch
<<
" vectors"
;
//
" thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
//
std
::
vector
<
std
::
shared_ptr
<
std
::
thread
>
>
thread_array
;
//
std::vector<std::shared_ptr<std::thread> > thread_array;
size_t
from_index
=
0
;
//
size_t from_index = 0;
while
(
from_index
<
max_index
)
{
//
while (from_index < max_index) {
size_t
to_index
=
from_index
+
reduce_batch
;
//
size_t to_index = from_index + reduce_batch;
if
(
to_index
>
max_index
)
{
//
if (to_index > max_index) {
to_index
=
max_index
;
//
to_index = max_index;
}
//
}
//
auto
reduce_thread
=
std
::
make_shared
<
std
::
thread
>
(
reduce_function
,
from_index
,
to_index
);
//
auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
thread_array
.
push_back
(
reduce_thread
);
//
thread_array.push_back(reduce_thread);
//
from_index
=
to_index
;
//
from_index = to_index;
}
//
}
//
for
(
auto
&
thread_ptr
:
thread_array
)
{
//
for (auto &thread_ptr : thread_array) {
thread_ptr
->
join
();
//
thread_ptr->join();
}
//
}
}
//
}
void
void
CollectFileMetrics
(
int
file_type
,
size_t
file_size
)
{
CollectFileMetrics
(
int
file_type
,
size_t
file_size
)
{
...
@@ -96,33 +96,31 @@ XSearchTask::XSearchTask(TableFileSchemaPtr file)
...
@@ -96,33 +96,31 @@ XSearchTask::XSearchTask(TableFileSchemaPtr file)
void
void
XSearchTask
::
Load
(
LoadType
type
,
uint8_t
device_id
)
{
XSearchTask
::
Load
(
LoadType
type
,
uint8_t
device_id
)
{
server
::
TimeRecorder
rc
(
""
);
server
::
TimeRecorder
rc
(
""
);
Status
stat
=
Status
::
OK
();
std
::
string
error_msg
;
try
{
try
{
if
(
type
==
LoadType
::
DISK2CPU
)
{
if
(
type
==
LoadType
::
DISK2CPU
)
{
auto
stat
=
index_engine_
->
Load
();
stat
=
index_engine_
->
Load
();
if
(
!
stat
.
ok
())
{
//typical error: file not available
ENGINE_LOG_ERROR
<<
"Failed to load index file: file not available"
;
for
(
auto
&
context
:
search_contexts_
)
{
context
->
IndexSearchDone
(
file_
->
id_
);
//mark as done avoid dead lock, even failed
}
return
;
}
}
else
if
(
type
==
LoadType
::
CPU2GPU
)
{
}
else
if
(
type
==
LoadType
::
CPU2GPU
)
{
index_engine_
->
CopyToGpu
(
device_id
);
stat
=
index_engine_
->
CopyToGpu
(
device_id
);
}
else
if
(
type
==
LoadType
::
GPU2CPU
)
{
}
else
if
(
type
==
LoadType
::
GPU2CPU
)
{
index_engine_
->
CopyToCpu
();
stat
=
index_engine_
->
CopyToCpu
();
}
else
{
}
else
{
// TODO: exception
error_msg
=
"Wrong load type"
;
std
::
string
msg
=
"Wrong load type"
;
stat
=
Status
(
SERVER_UNEXPECTED_ERROR
,
error_msg
);
ENGINE_LOG_ERROR
<<
msg
;
}
}
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
//typical error: out of disk space or permition denied
//typical error: out of disk space or permition denied
std
::
string
msg
=
"Failed to load index file: "
+
std
::
string
(
ex
.
what
());
error_msg
=
"Failed to load index file: "
+
std
::
string
(
ex
.
what
());
ENGINE_LOG_ERROR
<<
msg
;
stat
=
Status
(
SERVER_UNEXPECTED_ERROR
,
error_msg
);
}
if
(
!
stat
.
ok
())
{
if
(
error_msg
.
empty
())
error_msg
=
std
::
string
(
"Failed to load index file: file not available"
);
//typical error: file not available
ENGINE_LOG_ERROR
<<
error_msg
;
for
(
auto
&
context
:
search_contexts_
)
{
for
(
auto
&
context
:
search_contexts_
)
{
context
->
IndexSearchDone
(
file_
->
id_
);
//mark as done avoid dead lock, even failed
context
->
IndexSearchDone
(
file_
->
id_
);
//mark as done avoid dead lock, even failed
...
@@ -238,11 +236,11 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
...
@@ -238,11 +236,11 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
}
}
};
};
if
(
NeedParallelReduce
(
nq
,
topk
))
{
//
if (NeedParallelReduce(nq, topk)) {
ParallelReduce
(
reduce_worker
,
nq
);
//
ParallelReduce(reduce_worker, nq);
}
else
{
//
} else {
reduce_worker
(
0
,
nq
);
reduce_worker
(
0
,
nq
);
}
//
}
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
@@ -343,11 +341,11 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
...
@@ -343,11 +341,11 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
}
}
};
};
if
(
NeedParallelReduce
(
result_src
.
size
(),
topk
))
{
//
if (NeedParallelReduce(result_src.size(), topk)) {
ParallelReduce
(
ReduceWorker
,
result_src
.
size
());
//
ParallelReduce(ReduceWorker, result_src.size());
}
else
{
//
} else {
ReduceWorker
(
0
,
result_src
.
size
());
ReduceWorker
(
0
,
result_src
.
size
());
}
//
}
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
...
cpp/unittest/scheduler/event_test.cpp
0 → 100644
浏览文件 @
ab163356
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <gtest/gtest.h>
#include "scheduler/resource/Resource.h"
#include "scheduler/event/Event.h"
#include "scheduler/event/StartUpEvent.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
TEST
(
EventTest
,
start_up_event
)
{
ResourceWPtr
res
(
ResourcePtr
(
nullptr
));
auto
event
=
std
::
make_shared
<
StartUpEvent
>
(
res
);
ASSERT_FALSE
(
event
->
Dump
().
empty
());
std
::
cout
<<
*
event
;
std
::
cout
<<
*
EventPtr
(
event
);
}
TEST
(
EventTest
,
load_completed_event
)
{
ResourceWPtr
res
(
ResourcePtr
(
nullptr
));
auto
event
=
std
::
make_shared
<
LoadCompletedEvent
>
(
res
,
nullptr
);
ASSERT_FALSE
(
event
->
Dump
().
empty
());
std
::
cout
<<
*
event
;
std
::
cout
<<
*
EventPtr
(
event
);
}
TEST
(
EventTest
,
finish_task_event
)
{
ResourceWPtr
res
(
ResourcePtr
(
nullptr
));
auto
event
=
std
::
make_shared
<
FinishTaskEvent
>
(
res
,
nullptr
);
ASSERT_FALSE
(
event
->
Dump
().
empty
());
std
::
cout
<<
*
event
;
std
::
cout
<<
*
EventPtr
(
event
);
}
TEST
(
EventTest
,
tasktable_updated_event
)
{
ResourceWPtr
res
(
ResourcePtr
(
nullptr
));
auto
event
=
std
::
make_shared
<
TaskTableUpdatedEvent
>
(
res
);
ASSERT_FALSE
(
event
->
Dump
().
empty
());
std
::
cout
<<
*
event
;
std
::
cout
<<
*
EventPtr
(
event
);
}
}
}
}
cpp/unittest/scheduler/normal_test.cpp
浏览文件 @
ab163356
...
@@ -11,7 +11,7 @@
...
@@ -11,7 +11,7 @@
using
namespace
zilliz
::
milvus
::
engine
;
using
namespace
zilliz
::
milvus
::
engine
;
TEST
(
normal_test
,
DISABLED_
inst_test
)
{
TEST
(
normal_test
,
inst_test
)
{
// ResourceMgr only compose resources, provide unified event
// ResourceMgr only compose resources, provide unified event
auto
res_mgr
=
ResMgrInst
::
GetInstance
();
auto
res_mgr
=
ResMgrInst
::
GetInstance
();
...
...
cpp/unittest/scheduler/resource_test.cpp
浏览文件 @
ab163356
...
@@ -81,6 +81,16 @@ TEST_F(ResourceBaseTest, has_executor) {
...
@@ -81,6 +81,16 @@ TEST_F(ResourceBaseTest, has_executor) {
ASSERT_FALSE
(
both_disable_
->
HasExecutor
());
ASSERT_FALSE
(
both_disable_
->
HasExecutor
());
}
}
TEST_F
(
ResourceBaseTest
,
dump
)
{
ASSERT_FALSE
(
only_loader_
->
Dump
().
empty
());
ASSERT_FALSE
(
only_executor_
->
Dump
().
empty
());
ASSERT_FALSE
(
both_enable_
->
Dump
().
empty
());
ASSERT_FALSE
(
both_disable_
->
Dump
().
empty
());
std
::
stringstream
ss
;
ss
<<
only_loader_
<<
only_executor_
<<
both_enable_
<<
both_disable_
;
ASSERT_FALSE
(
ss
.
str
().
empty
());
}
/************ ResourceAdvanceTest ************/
/************ ResourceAdvanceTest ************/
class
ResourceAdvanceTest
:
public
testing
::
Test
{
class
ResourceAdvanceTest
:
public
testing
::
Test
{
...
@@ -90,9 +100,11 @@ protected:
...
@@ -90,9 +100,11 @@ protected:
disk_resource_
=
ResourceFactory
::
Create
(
"ssd"
,
"DISK"
,
0
);
disk_resource_
=
ResourceFactory
::
Create
(
"ssd"
,
"DISK"
,
0
);
cpu_resource_
=
ResourceFactory
::
Create
(
"cpu"
,
"CPU"
,
0
);
cpu_resource_
=
ResourceFactory
::
Create
(
"cpu"
,
"CPU"
,
0
);
gpu_resource_
=
ResourceFactory
::
Create
(
"gpu"
,
"GPU"
,
0
);
gpu_resource_
=
ResourceFactory
::
Create
(
"gpu"
,
"GPU"
,
0
);
test_resource_
=
std
::
make_shared
<
TestResource
>
(
"test"
,
0
,
true
,
true
);
resources_
.
push_back
(
disk_resource_
);
resources_
.
push_back
(
disk_resource_
);
resources_
.
push_back
(
cpu_resource_
);
resources_
.
push_back
(
cpu_resource_
);
resources_
.
push_back
(
gpu_resource_
);
resources_
.
push_back
(
gpu_resource_
);
resources_
.
push_back
(
test_resource_
);
auto
subscriber
=
[
&
](
EventPtr
event
)
{
auto
subscriber
=
[
&
](
EventPtr
event
)
{
if
(
event
->
Type
()
==
EventType
::
LOAD_COMPLETED
)
{
if
(
event
->
Type
()
==
EventType
::
LOAD_COMPLETED
)
{
...
@@ -115,10 +127,12 @@ protected:
...
@@ -115,10 +127,12 @@ protected:
disk_resource_
->
RegisterSubscriber
(
subscriber
);
disk_resource_
->
RegisterSubscriber
(
subscriber
);
cpu_resource_
->
RegisterSubscriber
(
subscriber
);
cpu_resource_
->
RegisterSubscriber
(
subscriber
);
gpu_resource_
->
RegisterSubscriber
(
subscriber
);
gpu_resource_
->
RegisterSubscriber
(
subscriber
);
test_resource_
->
RegisterSubscriber
(
subscriber
);
disk_resource_
->
Start
();
disk_resource_
->
Start
();
cpu_resource_
->
Start
();
cpu_resource_
->
Start
();
gpu_resource_
->
Start
();
gpu_resource_
->
Start
();
test_resource_
->
Start
();
}
}
void
void
...
@@ -126,6 +140,7 @@ protected:
...
@@ -126,6 +140,7 @@ protected:
disk_resource_
->
Stop
();
disk_resource_
->
Stop
();
cpu_resource_
->
Stop
();
cpu_resource_
->
Stop
();
gpu_resource_
->
Stop
();
gpu_resource_
->
Stop
();
test_resource_
->
Stop
();
}
}
void
void
...
@@ -143,6 +158,7 @@ protected:
...
@@ -143,6 +158,7 @@ protected:
ResourcePtr
disk_resource_
;
ResourcePtr
disk_resource_
;
ResourcePtr
cpu_resource_
;
ResourcePtr
cpu_resource_
;
ResourcePtr
gpu_resource_
;
ResourcePtr
gpu_resource_
;
ResourcePtr
test_resource_
;
std
::
vector
<
ResourcePtr
>
resources_
;
std
::
vector
<
ResourcePtr
>
resources_
;
uint64_t
load_count_
=
0
;
uint64_t
load_count_
=
0
;
uint64_t
exec_count_
=
0
;
uint64_t
exec_count_
=
0
;
...
@@ -226,6 +242,30 @@ TEST_F(ResourceAdvanceTest, gpu_resource_test) {
...
@@ -226,6 +242,30 @@ TEST_F(ResourceAdvanceTest, gpu_resource_test) {
}
}
}
}
TEST_F
(
ResourceAdvanceTest
,
test_resource_test
)
{
const
uint64_t
NUM
=
100
;
std
::
vector
<
std
::
shared_ptr
<
TestTask
>>
tasks
;
TableFileSchemaPtr
dummy
=
nullptr
;
for
(
uint64_t
i
=
0
;
i
<
NUM
;
++
i
)
{
auto
task
=
std
::
make_shared
<
TestTask
>
(
dummy
);
tasks
.
push_back
(
task
);
test_resource_
->
task_table
().
Put
(
task
);
}
test_resource_
->
WakeupLoader
();
WaitLoader
(
NUM
);
for
(
uint64_t
i
=
0
;
i
<
NUM
;
++
i
)
{
ASSERT_EQ
(
tasks
[
i
]
->
load_count_
,
1
);
}
test_resource_
->
WakeupExecutor
();
WaitExecutor
(
NUM
);
for
(
uint64_t
i
=
0
;
i
<
NUM
;
++
i
)
{
ASSERT_EQ
(
tasks
[
i
]
->
exec_count_
,
1
);
}
}
}
}
}
}
...
...
cpp/unittest/scheduler/scheduler_test.cpp
浏览文件 @
ab163356
...
@@ -6,6 +6,7 @@
...
@@ -6,6 +6,7 @@
#include "scheduler/Scheduler.h"
#include "scheduler/Scheduler.h"
#include <gtest/gtest.h>
#include <gtest/gtest.h>
#include <src/scheduler/tasklabel/DefaultLabel.h>
#include <src/scheduler/tasklabel/DefaultLabel.h>
#include <src/server/ServerConfig.h>
#include "cache/DataObj.h"
#include "cache/DataObj.h"
#include "cache/GpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/task/TestTask.h"
...
@@ -15,233 +16,238 @@
...
@@ -15,233 +16,238 @@
#include "wrapper/knowhere/vec_index.h"
#include "wrapper/knowhere/vec_index.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace
zilliz
{
namespace
zilliz
{
namespace
milvus
{
namespace
milvus
{
namespace
engine
{
namespace
engine
{
//class MockVecIndex : public engine::VecIndex {
class
MockVecIndex
:
public
engine
::
VecIndex
{
//public:
public:
// virtual server::KnowhereError BuildAll(const long &nb,
virtual
ErrorCode
BuildAll
(
const
long
&
nb
,
// const float *xb,
const
float
*
xb
,
// const long *ids,
const
long
*
ids
,
// const engine::Config &cfg,
const
engine
::
Config
&
cfg
,
// const long &nt = 0,
const
long
&
nt
=
0
,
// const float *xt = nullptr) {
const
float
*
xt
=
nullptr
)
{
//
// }
}
//
// engine::VecIndexPtr Clone() override {
engine
::
VecIndexPtr
Clone
()
override
{
// return zilliz::milvus::engine::VecIndexPtr();
return
zilliz
::
milvus
::
engine
::
VecIndexPtr
();
// }
}
//
// int64_t GetDeviceId() override {
int64_t
GetDeviceId
()
override
{
// return 0;
return
0
;
// }
}
//
// engine::IndexType GetType() override {
engine
::
IndexType
GetType
()
override
{
// return engine::IndexType::INVALID;
return
engine
::
IndexType
::
INVALID
;
// }
}
//
// virtual server::KnowhereError Add(const long &nb,
virtual
ErrorCode
Add
(
const
long
&
nb
,
// const float *xb,
const
float
*
xb
,
// const long *ids,
const
long
*
ids
,
// const engine::Config &cfg = engine::Config()) {
const
engine
::
Config
&
cfg
=
engine
::
Config
())
{
//
// }
}
//
// virtual server::KnowhereError Search(const long &nq,
virtual
ErrorCode
Search
(
const
long
&
nq
,
// const float *xq,
const
float
*
xq
,
// float *dist,
float
*
dist
,
// long *ids,
long
*
ids
,
// const engine::Config &cfg = engine::Config()) {
const
engine
::
Config
&
cfg
=
engine
::
Config
())
{
//
// }
}
//
// engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
engine
::
VecIndexPtr
CopyToGpu
(
const
int64_t
&
device_id
,
const
engine
::
Config
&
cfg
)
override
{
//
// }
}
//
// engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
engine
::
VecIndexPtr
CopyToCpu
(
const
engine
::
Config
&
cfg
)
override
{
//
// }
}
//
// virtual int64_t Dimension() {
virtual
int64_t
Dimension
()
{
// return dimension_;
return
dimension_
;
// }
}
//
// virtual int64_t Count() {
virtual
int64_t
Count
()
{
// return ntotal_;
return
ntotal_
;
// }
}
//
// virtual zilliz::knowhere::BinarySet Serialize() {
virtual
zilliz
::
knowhere
::
BinarySet
Serialize
()
{
// zilliz::knowhere::BinarySet binset;
zilliz
::
knowhere
::
BinarySet
binset
;
// return binset;
return
binset
;
// }
}
//
// virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) {
virtual
ErrorCode
Load
(
const
zilliz
::
knowhere
::
BinarySet
&
index_binary
)
{
//
// }
}
//
//public:
public:
// int64_t dimension_ = 512;
int64_t
dimension_
=
512
;
// int64_t ntotal_ = 0;
int64_t
ntotal_
=
0
;
//};
};
//
//
//class SchedulerTest : public testing::Test {
class
SchedulerTest
:
public
testing
::
Test
{
//protected:
protected:
// void
void
// SetUp() override {
SetUp
()
override
{
// ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
server
::
ConfigNode
&
config
=
server
::
ServerConfig
::
GetInstance
().
GetConfig
(
server
::
CONFIG_CACHE
);
// ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
config
.
AddSequenceItem
(
server
::
CONFIG_GPU_IDS
,
"0"
);
// ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
config
.
AddSequenceItem
(
server
::
CONFIG_GPU_IDS
,
"1"
);
//
// res_mgr_ = std::make_shared<ResourceMgr>();
ResourcePtr
cpu
=
ResourceFactory
::
Create
(
"cpu"
,
"CPU"
,
0
,
true
,
false
);
// cpu_resource_ = res_mgr_->Add(std::move(cpu));
ResourcePtr
gpu_0
=
ResourceFactory
::
Create
(
"gpu0"
,
"GPU"
,
0
);
// gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
ResourcePtr
gpu_1
=
ResourceFactory
::
Create
(
"gpu1"
,
"GPU"
,
1
);
// gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
//
res_mgr_
=
std
::
make_shared
<
ResourceMgr
>
();
// auto PCIE = Connection("IO", 11000.0);
cpu_resource_
=
res_mgr_
->
Add
(
std
::
move
(
cpu
));
// res_mgr_->Connect("cpu", "gpu0", PCIE);
gpu_resource_0_
=
res_mgr_
->
Add
(
std
::
move
(
gpu_0
));
// res_mgr_->Connect("cpu", "gpu1", PCIE);
gpu_resource_1_
=
res_mgr_
->
Add
(
std
::
move
(
gpu_1
));
//
// scheduler_ = std::make_shared<Scheduler>(res_mgr_);
auto
PCIE
=
Connection
(
"IO"
,
11000.0
);
//
res_mgr_
->
Connect
(
"cpu"
,
"gpu0"
,
PCIE
);
// res_mgr_->Start();
res_mgr_
->
Connect
(
"cpu"
,
"gpu1"
,
PCIE
);
// scheduler_->Start();
// }
scheduler_
=
std
::
make_shared
<
Scheduler
>
(
res_mgr_
);
//
// void
res_mgr_
->
Start
();
// TearDown() override {
scheduler_
->
Start
();
// scheduler_->Stop();
}
// res_mgr_->Stop();
// }
void
//
TearDown
()
override
{
// ResourceWPtr cpu_resource_;
scheduler_
->
Stop
();
// ResourceWPtr gpu_resource_0_;
res_mgr_
->
Stop
();
// ResourceWPtr gpu_resource_1_;
}
//
// ResourceMgrPtr res_mgr_;
ResourceWPtr
cpu_resource_
;
// std::shared_ptr<Scheduler> scheduler_;
ResourceWPtr
gpu_resource_0_
;
//};
ResourceWPtr
gpu_resource_1_
;
//
//void
ResourceMgrPtr
res_mgr_
;
//insert_dummy_index_into_gpu_cache(uint64_t device_id) {
std
::
shared_ptr
<
Scheduler
>
scheduler_
;
// MockVecIndex* mock_index = new MockVecIndex();
};
// mock_index->ntotal_ = 1000;
// engine::VecIndexPtr index(mock_index);
void
//
insert_dummy_index_into_gpu_cache
(
uint64_t
device_id
)
{
// cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
MockVecIndex
*
mock_index
=
new
MockVecIndex
();
//
mock_index
->
ntotal_
=
1000
;
// cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj);
engine
::
VecIndexPtr
index
(
mock_index
);
//}
//
cache
::
DataObjPtr
obj
=
std
::
make_shared
<
cache
::
DataObj
>
(
index
);
//TEST_F(SchedulerTest, OnCopyCompleted) {
// const uint64_t NUM = 10;
cache
::
GpuCacheMgr
::
GetInstance
(
device_id
)
->
InsertItem
(
"location"
,
obj
);
// std::vector<std::shared_ptr<TestTask>> tasks;
}
// TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
// dummy->location_ = "location";
TEST_F
(
SchedulerTest
,
OnLoadCompleted
)
{
//
const
uint64_t
NUM
=
10
;
// insert_dummy_index_into_gpu_cache(1);
std
::
vector
<
std
::
shared_ptr
<
TestTask
>>
tasks
;
//
TableFileSchemaPtr
dummy
=
std
::
make_shared
<
meta
::
TableFileSchema
>
();
// for (uint64_t i = 0; i < NUM; ++i) {
dummy
->
location_
=
"location"
;
// auto task = std::make_shared<TestTask>(dummy);
// task->label() = std::make_shared<DefaultLabel>();
insert_dummy_index_into_gpu_cache
(
1
);
// tasks.push_back(task);
// cpu_resource_.lock()->task_table().Put(task);
for
(
uint64_t
i
=
0
;
i
<
NUM
;
++
i
)
{
// }
auto
task
=
std
::
make_shared
<
TestTask
>
(
dummy
);
//
task
->
label
()
=
std
::
make_shared
<
DefaultLabel
>
();
// sleep(3);
tasks
.
push_back
(
task
);
cpu_resource_
.
lock
()
->
task_table
().
Put
(
task
);
}
sleep
(
3
);
ASSERT_EQ
(
res_mgr_
->
GetResource
(
ResourceType
::
GPU
,
1
)
->
task_table
().
Size
(),
NUM
);
}
TEST_F
(
SchedulerTest
,
PushTaskToNeighbourRandomlyTest
)
{
const
uint64_t
NUM
=
10
;
std
::
vector
<
std
::
shared_ptr
<
TestTask
>>
tasks
;
TableFileSchemaPtr
dummy1
=
std
::
make_shared
<
meta
::
TableFileSchema
>
();
dummy1
->
location_
=
"location"
;
tasks
.
clear
();
for
(
uint64_t
i
=
0
;
i
<
NUM
;
++
i
)
{
auto
task
=
std
::
make_shared
<
TestTask
>
(
dummy1
);
task
->
label
()
=
std
::
make_shared
<
DefaultLabel
>
();
tasks
.
push_back
(
task
);
cpu_resource_
.
lock
()
->
task_table
().
Put
(
task
);
}
sleep
(
3
);
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
//
}
//}
//
class
SchedulerTest2
:
public
testing
::
Test
{
//TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) {
protected:
// const uint64_t NUM = 10;
void
// std::vector<std::shared_ptr<TestTask>> tasks;
SetUp
()
override
{
// TableFileSchemaPtr dummy1 = std::make_shared<meta::TableFileSchema>();
ResourcePtr
disk
=
ResourceFactory
::
Create
(
"disk"
,
"DISK"
,
0
,
true
,
false
);
// dummy1->location_ = "location";
ResourcePtr
cpu0
=
ResourceFactory
::
Create
(
"cpu0"
,
"CPU"
,
0
,
true
,
false
);
//
ResourcePtr
cpu1
=
ResourceFactory
::
Create
(
"cpu1"
,
"CPU"
,
1
,
true
,
false
);
// tasks.clear();
ResourcePtr
cpu2
=
ResourceFactory
::
Create
(
"cpu2"
,
"CPU"
,
2
,
true
,
false
);
//
ResourcePtr
gpu0
=
ResourceFactory
::
Create
(
"gpu0"
,
"GPU"
,
0
,
true
,
true
);
// for (uint64_t i = 0; i < NUM; ++i) {
ResourcePtr
gpu1
=
ResourceFactory
::
Create
(
"gpu1"
,
"GPU"
,
1
,
true
,
true
);
// auto task = std::make_shared<TestTask>(dummy1);
// task->label() = std::make_shared<DefaultLabel>();
res_mgr_
=
std
::
make_shared
<
ResourceMgr
>
();
// tasks.push_back(task);
disk_
=
res_mgr_
->
Add
(
std
::
move
(
disk
));
// cpu_resource_.lock()->task_table().Put(task);
cpu_0_
=
res_mgr_
->
Add
(
std
::
move
(
cpu0
));
// }
cpu_1_
=
res_mgr_
->
Add
(
std
::
move
(
cpu1
));
//
cpu_2_
=
res_mgr_
->
Add
(
std
::
move
(
cpu2
));
// sleep(3);
gpu_0_
=
res_mgr_
->
Add
(
std
::
move
(
gpu0
));
//// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
gpu_1_
=
res_mgr_
->
Add
(
std
::
move
(
gpu1
));
//}
auto
IO
=
Connection
(
"IO"
,
5.0
);
//
auto
PCIE1
=
Connection
(
"PCIE"
,
11.0
);
//class SchedulerTest2 : public testing::Test {
auto
PCIE2
=
Connection
(
"PCIE"
,
20.0
);
// protected:
res_mgr_
->
Connect
(
"disk"
,
"cpu0"
,
IO
);
// void
res_mgr_
->
Connect
(
"cpu0"
,
"cpu1"
,
IO
);
// SetUp() override {
res_mgr_
->
Connect
(
"cpu1"
,
"cpu2"
,
IO
);
// ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
res_mgr_
->
Connect
(
"cpu0"
,
"cpu2"
,
IO
);
// ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
res_mgr_
->
Connect
(
"cpu1"
,
"gpu0"
,
PCIE1
);
// ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
res_mgr_
->
Connect
(
"cpu2"
,
"gpu1"
,
PCIE2
);
// ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
// ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
scheduler_
=
std
::
make_shared
<
Scheduler
>
(
res_mgr_
);
// ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);
//
res_mgr_
->
Start
();
// res_mgr_ = std::make_shared<ResourceMgr>();
scheduler_
->
Start
();
// disk_ = res_mgr_->Add(std::move(disk));
}
// cpu_0_ = res_mgr_->Add(std::move(cpu0));
// cpu_1_ = res_mgr_->Add(std::move(cpu1));
void
// cpu_2_ = res_mgr_->Add(std::move(cpu2));
TearDown
()
override
{
// gpu_0_ = res_mgr_->Add(std::move(gpu0));
scheduler_
->
Stop
();
// gpu_1_ = res_mgr_->Add(std::move(gpu1));
res_mgr_
->
Stop
();
// auto IO = Connection("IO", 5.0);
}
// auto PCIE1 = Connection("PCIE", 11.0);
// auto PCIE2 = Connection("PCIE", 20.0);
ResourceWPtr
disk_
;
// res_mgr_->Connect("disk", "cpu0", IO);
ResourceWPtr
cpu_0_
;
// res_mgr_->Connect("cpu0", "cpu1", IO);
ResourceWPtr
cpu_1_
;
// res_mgr_->Connect("cpu1", "cpu2", IO);
ResourceWPtr
cpu_2_
;
// res_mgr_->Connect("cpu0", "cpu2", IO);
ResourceWPtr
gpu_0_
;
// res_mgr_->Connect("cpu1", "gpu0", PCIE1);
ResourceWPtr
gpu_1_
;
// res_mgr_->Connect("cpu2", "gpu1", PCIE2);
ResourceMgrPtr
res_mgr_
;
//
// scheduler_ = std::make_shared<Scheduler>(res_mgr_);
std
::
shared_ptr
<
Scheduler
>
scheduler_
;
//
};
// res_mgr_->Start();
// scheduler_->Start();
// }
TEST_F
(
SchedulerTest2
,
SpecifiedResourceTest
)
{
//
const
uint64_t
NUM
=
10
;
// void
std
::
vector
<
std
::
shared_ptr
<
TestTask
>>
tasks
;
// TearDown() override {
TableFileSchemaPtr
dummy
=
std
::
make_shared
<
meta
::
TableFileSchema
>
();
// scheduler_->Stop();
dummy
->
location_
=
"location"
;
// res_mgr_->Stop();
// }
for
(
uint64_t
i
=
0
;
i
<
NUM
;
++
i
)
{
//
std
::
shared_ptr
<
TestTask
>
task
=
std
::
make_shared
<
TestTask
>
(
dummy
);
// ResourceWPtr disk_;
task
->
label
()
=
std
::
make_shared
<
SpecResLabel
>
(
disk_
);
// ResourceWPtr cpu_0_;
tasks
.
push_back
(
task
);
// ResourceWPtr cpu_1_;
disk_
.
lock
()
->
task_table
().
Put
(
task
);
// ResourceWPtr cpu_2_;
}
// ResourceWPtr gpu_0_;
// ResourceWPtr gpu_1_;
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
// ResourceMgrPtr res_mgr_;
}
//
// std::shared_ptr<Scheduler> scheduler_;
//};
//
//
//TEST_F(SchedulerTest2, SpecifiedResourceTest) {
// const uint64_t NUM = 10;
// std::vector<std::shared_ptr<TestTask>> tasks;
// TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
// dummy->location_ = "location";
//
// for (uint64_t i = 0; i < NUM; ++i) {
// std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
// task->label() = std::make_shared<SpecResLabel>(disk_);
// tasks.push_back(task);
// disk_.lock()->task_table().Put(task);
// }
//
//// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
//}
}
}
}
}
...
...
cpp/unittest/scheduler/task_test.cpp
0 → 100644
浏览文件 @
ab163356
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "scheduler/task/SearchTask.h"
#include <gtest/gtest.h>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
TEST
(
TaskTest
,
invalid_index
)
{
auto
search_task
=
std
::
make_shared
<
XSearchTask
>
(
nullptr
);
search_task
->
Load
(
LoadType
::
TEST
,
10
);
}
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录