Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
3eb79168
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
3eb79168
编写于
10月 10, 2019
作者:
S
starlord
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'source/branch-0.5.0' into branch-0.5.0
Former-commit-id: 17efb1fd89f4391d198817dd668843d7377ff61d
上级
e4889de8
844d8024
变更
15
显示空白变更内容
内联
并排
Showing
15 changed file
with
601 addition
and
98 deletion
+601
-98
cpp/src/db/DBImpl.cpp
cpp/src/db/DBImpl.cpp
+78
-61
cpp/src/db/engine/ExecutionEngine.h
cpp/src/db/engine/ExecutionEngine.h
+3
-0
cpp/src/db/engine/ExecutionEngineImpl.cpp
cpp/src/db/engine/ExecutionEngineImpl.cpp
+11
-0
cpp/src/db/engine/ExecutionEngineImpl.h
cpp/src/db/engine/ExecutionEngineImpl.h
+3
-0
cpp/src/scheduler/TaskCreator.cpp
cpp/src/scheduler/TaskCreator.cpp
+24
-3
cpp/src/scheduler/TaskCreator.h
cpp/src/scheduler/TaskCreator.h
+4
-0
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
+51
-26
cpp/src/scheduler/job/BuildIndexJob.cpp
cpp/src/scheduler/job/BuildIndexJob.cpp
+56
-0
cpp/src/scheduler/job/BuildIndexJob.h
cpp/src/scheduler/job/BuildIndexJob.h
+91
-0
cpp/src/scheduler/task/BuildIndexTask.cpp
cpp/src/scheduler/task/BuildIndexTask.cpp
+222
-0
cpp/src/scheduler/task/BuildIndexTask.h
cpp/src/scheduler/task/BuildIndexTask.h
+47
-0
cpp/src/scheduler/task/Task.h
cpp/src/scheduler/task/Task.h
+1
-0
cpp/src/scheduler/tasklabel/SpecResLabel.h
cpp/src/scheduler/tasklabel/SpecResLabel.h
+4
-3
cpp/unittest/metrics/utils.cpp
cpp/unittest/metrics/utils.cpp
+1
-0
cpp/unittest/server/test_rpc.cpp
cpp/unittest/server/test_rpc.cpp
+5
-5
未找到文件。
cpp/src/db/DBImpl.cpp
浏览文件 @
3eb79168
...
@@ -28,6 +28,7 @@
...
@@ -28,6 +28,7 @@
#include "scheduler/SchedInst.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
#include "scheduler/job/SearchJob.h"
#include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/TimeRecorder.h"
...
@@ -50,7 +51,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
...
@@ -50,7 +51,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
}
// namespace
}
// namespace
DBImpl
::
DBImpl
(
const
DBOptions
&
options
)
DBImpl
::
DBImpl
(
const
DBOptions
&
options
)
:
options_
(
options
),
shutting_down_
(
true
),
compact_thread_pool_
(
1
,
1
),
index_thread_pool_
(
1
,
1
)
{
:
options_
(
options
),
shutting_down_
(
true
),
compact_thread_pool_
(
1
,
1
),
index_thread_pool_
(
1
,
1
)
{
meta_ptr_
=
MetaFactory
::
Build
(
options
.
meta_
,
options
.
mode_
);
meta_ptr_
=
MetaFactory
::
Build
(
options
.
meta_
,
options
.
mode_
);
mem_mgr_
=
MemManagerFactory
::
Build
(
meta_ptr_
,
options_
);
mem_mgr_
=
MemManagerFactory
::
Build
(
meta_ptr_
,
options_
);
...
@@ -110,7 +111,7 @@ DBImpl::DropAll() {
...
@@ -110,7 +111,7 @@ DBImpl::DropAll() {
}
}
Status
Status
DBImpl
::
CreateTable
(
meta
::
TableSchema
&
table_schema
)
{
DBImpl
::
CreateTable
(
meta
::
TableSchema
&
table_schema
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -121,7 +122,7 @@ DBImpl::CreateTable(meta::TableSchema& table_schema) {
...
@@ -121,7 +122,7 @@ DBImpl::CreateTable(meta::TableSchema& table_schema) {
}
}
Status
Status
DBImpl
::
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
{
DBImpl
::
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -146,7 +147,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
...
@@ -146,7 +147,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
}
}
Status
Status
DBImpl
::
DescribeTable
(
meta
::
TableSchema
&
table_schema
)
{
DBImpl
::
DescribeTable
(
meta
::
TableSchema
&
table_schema
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -157,7 +158,7 @@ DBImpl::DescribeTable(meta::TableSchema& table_schema) {
...
@@ -157,7 +158,7 @@ DBImpl::DescribeTable(meta::TableSchema& table_schema) {
}
}
Status
Status
DBImpl
::
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
{
DBImpl
::
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -166,7 +167,7 @@ DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
...
@@ -166,7 +167,7 @@ DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
}
}
Status
Status
DBImpl
::
AllTables
(
std
::
vector
<
meta
::
TableSchema
>
&
table_schema_array
)
{
DBImpl
::
AllTables
(
std
::
vector
<
meta
::
TableSchema
>
&
table_schema_array
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -175,7 +176,7 @@ DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
...
@@ -175,7 +176,7 @@ DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
}
}
Status
Status
DBImpl
::
PreloadTable
(
const
std
::
string
&
table_id
)
{
DBImpl
::
PreloadTable
(
const
std
::
string
&
table_id
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -194,11 +195,11 @@ DBImpl::PreloadTable(const std::string& table_id) {
...
@@ -194,11 +195,11 @@ DBImpl::PreloadTable(const std::string& table_id) {
int64_t
cache_usage
=
cache
::
CpuCacheMgr
::
GetInstance
()
->
CacheUsage
();
int64_t
cache_usage
=
cache
::
CpuCacheMgr
::
GetInstance
()
->
CacheUsage
();
int64_t
available_size
=
cache_total
-
cache_usage
;
int64_t
available_size
=
cache_total
-
cache_usage
;
for
(
auto
&
day_files
:
files
)
{
for
(
auto
&
day_files
:
files
)
{
for
(
auto
&
file
:
day_files
.
second
)
{
for
(
auto
&
file
:
day_files
.
second
)
{
ExecutionEnginePtr
engine
=
ExecutionEnginePtr
engine
=
EngineFactory
::
Build
(
file
.
dimension_
,
file
.
location_
,
(
EngineType
)
file
.
engine_type_
,
EngineFactory
::
Build
(
file
.
dimension_
,
file
.
location_
,
(
EngineType
)
file
.
engine_type_
,
(
MetricType
)
file
.
metric_type_
,
file
.
nlist_
);
(
MetricType
)
file
.
metric_type_
,
file
.
nlist_
);
if
(
engine
==
nullptr
)
{
if
(
engine
==
nullptr
)
{
ENGINE_LOG_ERROR
<<
"Invalid engine type"
;
ENGINE_LOG_ERROR
<<
"Invalid engine type"
;
return
Status
(
DB_ERROR
,
"Invalid engine type"
);
return
Status
(
DB_ERROR
,
"Invalid engine type"
);
...
@@ -211,7 +212,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
...
@@ -211,7 +212,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
try
{
try
{
// step 1: load index
// step 1: load index
engine
->
Load
(
true
);
engine
->
Load
(
true
);
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
std
::
string
msg
=
"Pre-load table encounter exception: "
+
std
::
string
(
ex
.
what
());
std
::
string
msg
=
"Pre-load table encounter exception: "
+
std
::
string
(
ex
.
what
());
ENGINE_LOG_ERROR
<<
msg
;
ENGINE_LOG_ERROR
<<
msg
;
return
Status
(
DB_ERROR
,
msg
);
return
Status
(
DB_ERROR
,
msg
);
...
@@ -223,7 +224,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
...
@@ -223,7 +224,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
}
}
Status
Status
DBImpl
::
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
{
DBImpl
::
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -232,7 +233,7 @@ DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
...
@@ -232,7 +233,7 @@ DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
}
}
Status
Status
DBImpl
::
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
{
DBImpl
::
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -260,7 +261,7 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect
...
@@ -260,7 +261,7 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect
}
}
Status
Status
DBImpl
::
CreateIndex
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
{
DBImpl
::
CreateIndex
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
{
{
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
build_index_mutex_
);
std
::
unique_lock
<
std
::
mutex
>
lock
(
build_index_mutex_
);
...
@@ -315,7 +316,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
...
@@ -315,7 +316,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
while
(
!
file_ids
.
empty
())
{
while
(
!
file_ids
.
empty
())
{
ENGINE_LOG_DEBUG
<<
"Non index files detected! Will build index "
<<
times
;
ENGINE_LOG_DEBUG
<<
"Non index files detected! Will build index "
<<
times
;
if
(
index
.
engine_type_
!=
(
int
)
EngineType
::
FAISS_IDMAP
)
{
if
(
index
.
engine_type_
!=
(
int
)
EngineType
::
FAISS_IDMAP
)
{
status
=
meta_ptr_
->
UpdateTableFilesToIndex
(
table_id
);
status
=
meta_ptr_
->
UpdateTableFilesToIndex
(
table_id
);
}
}
...
@@ -328,19 +329,19 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
...
@@ -328,19 +329,19 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
}
}
Status
Status
DBImpl
::
DescribeIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
{
DBImpl
::
DescribeIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
{
return
meta_ptr_
->
DescribeTableIndex
(
table_id
,
index
);
return
meta_ptr_
->
DescribeTableIndex
(
table_id
,
index
);
}
}
Status
Status
DBImpl
::
DropIndex
(
const
std
::
string
&
table_id
)
{
DBImpl
::
DropIndex
(
const
std
::
string
&
table_id
)
{
ENGINE_LOG_DEBUG
<<
"Drop index for table: "
<<
table_id
;
ENGINE_LOG_DEBUG
<<
"Drop index for table: "
<<
table_id
;
return
meta_ptr_
->
DropTableIndex
(
table_id
);
return
meta_ptr_
->
DropTableIndex
(
table_id
);
}
}
Status
Status
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
QueryResults
&
results
)
{
QueryResults
&
results
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -352,8 +353,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
...
@@ -352,8 +353,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
}
}
Status
Status
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -369,8 +370,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
...
@@ -369,8 +370,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
}
}
meta
::
TableFilesSchema
file_id_array
;
meta
::
TableFilesSchema
file_id_array
;
for
(
auto
&
day_files
:
files
)
{
for
(
auto
&
day_files
:
files
)
{
for
(
auto
&
file
:
day_files
.
second
)
{
for
(
auto
&
file
:
day_files
.
second
)
{
file_id_array
.
push_back
(
file
);
file_id_array
.
push_back
(
file
);
}
}
}
}
...
@@ -382,8 +383,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
...
@@ -382,8 +383,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
}
}
Status
Status
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
std
::
string
>&
file_ids
,
uint64_t
k
,
uint64_t
nq
,
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
std
::
string
>
&
file_ids
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -392,7 +393,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
...
@@ -392,7 +393,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
// get specified files
// get specified files
std
::
vector
<
size_t
>
ids
;
std
::
vector
<
size_t
>
ids
;
for
(
auto
&
id
:
file_ids
)
{
for
(
auto
&
id
:
file_ids
)
{
meta
::
TableFileSchema
table_file
;
meta
::
TableFileSchema
table_file
;
table_file
.
table_id_
=
table_id
;
table_file
.
table_id_
=
table_id
;
std
::
string
::
size_type
sz
;
std
::
string
::
size_type
sz
;
...
@@ -406,8 +407,8 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
...
@@ -406,8 +407,8 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
}
}
meta
::
TableFilesSchema
file_id_array
;
meta
::
TableFilesSchema
file_id_array
;
for
(
auto
&
day_files
:
files_array
)
{
for
(
auto
&
day_files
:
files_array
)
{
for
(
auto
&
file
:
day_files
.
second
)
{
for
(
auto
&
file
:
day_files
.
second
)
{
file_id_array
.
push_back
(
file
);
file_id_array
.
push_back
(
file
);
}
}
}
}
...
@@ -423,7 +424,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
...
@@ -423,7 +424,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
}
}
Status
Status
DBImpl
::
Size
(
uint64_t
&
result
)
{
DBImpl
::
Size
(
uint64_t
&
result
)
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
return
Status
(
DB_ERROR
,
"Milsvus server is shutdown!"
);
}
}
...
@@ -435,8 +436,8 @@ DBImpl::Size(uint64_t& result) {
...
@@ -435,8 +436,8 @@ DBImpl::Size(uint64_t& result) {
// internal methods
// internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status
Status
DBImpl
::
QueryAsync
(
const
std
::
string
&
table_id
,
const
meta
::
TableFilesSchema
&
files
,
uint64_t
k
,
uint64_t
nq
,
DBImpl
::
QueryAsync
(
const
std
::
string
&
table_id
,
const
meta
::
TableFilesSchema
&
files
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
server
::
CollectQueryMetrics
metrics
(
nq
);
server
::
CollectQueryMetrics
metrics
(
nq
);
TimeRecorder
rc
(
""
);
TimeRecorder
rc
(
""
);
...
@@ -445,7 +446,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
...
@@ -445,7 +446,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
ENGINE_LOG_DEBUG
<<
"Engine query begin, index file count: "
<<
files
.
size
()
ENGINE_LOG_DEBUG
<<
"Engine query begin, index file count: "
<<
files
.
size
()
<<
" date range count: "
<<
dates
.
size
();
<<
" date range count: "
<<
dates
.
size
();
scheduler
::
SearchJobPtr
job
=
std
::
make_shared
<
scheduler
::
SearchJob
>
(
0
,
k
,
nq
,
nprobe
,
vectors
);
scheduler
::
SearchJobPtr
job
=
std
::
make_shared
<
scheduler
::
SearchJob
>
(
0
,
k
,
nq
,
nprobe
,
vectors
);
for
(
auto
&
file
:
files
)
{
for
(
auto
&
file
:
files
)
{
scheduler
::
TableFileSchemaPtr
file_ptr
=
std
::
make_shared
<
meta
::
TableFileSchema
>
(
file
);
scheduler
::
TableFileSchemaPtr
file_ptr
=
std
::
make_shared
<
meta
::
TableFileSchema
>
(
file
);
job
->
AddIndexFile
(
file_ptr
);
job
->
AddIndexFile
(
file_ptr
);
}
}
...
@@ -513,7 +514,7 @@ DBImpl::BackgroundTimerTask() {
...
@@ -513,7 +514,7 @@ DBImpl::BackgroundTimerTask() {
void
void
DBImpl
::
WaitMergeFileFinish
()
{
DBImpl
::
WaitMergeFileFinish
()
{
std
::
lock_guard
<
std
::
mutex
>
lck
(
compact_result_mutex_
);
std
::
lock_guard
<
std
::
mutex
>
lck
(
compact_result_mutex_
);
for
(
auto
&
iter
:
compact_thread_results_
)
{
for
(
auto
&
iter
:
compact_thread_results_
)
{
iter
.
wait
();
iter
.
wait
();
}
}
}
}
...
@@ -521,7 +522,7 @@ DBImpl::WaitMergeFileFinish() {
...
@@ -521,7 +522,7 @@ DBImpl::WaitMergeFileFinish() {
void
void
DBImpl
::
WaitBuildIndexFinish
()
{
DBImpl
::
WaitBuildIndexFinish
()
{
std
::
lock_guard
<
std
::
mutex
>
lck
(
index_result_mutex_
);
std
::
lock_guard
<
std
::
mutex
>
lck
(
index_result_mutex_
);
for
(
auto
&
iter
:
index_thread_results_
)
{
for
(
auto
&
iter
:
index_thread_results_
)
{
iter
.
wait
();
iter
.
wait
();
}
}
}
}
...
@@ -562,7 +563,7 @@ DBImpl::MemSerialize() {
...
@@ -562,7 +563,7 @@ DBImpl::MemSerialize() {
std
::
lock_guard
<
std
::
mutex
>
lck
(
mem_serialize_mutex_
);
std
::
lock_guard
<
std
::
mutex
>
lck
(
mem_serialize_mutex_
);
std
::
set
<
std
::
string
>
temp_table_ids
;
std
::
set
<
std
::
string
>
temp_table_ids
;
mem_mgr_
->
Serialize
(
temp_table_ids
);
mem_mgr_
->
Serialize
(
temp_table_ids
);
for
(
auto
&
id
:
temp_table_ids
)
{
for
(
auto
&
id
:
temp_table_ids
)
{
compact_table_ids_
.
insert
(
id
);
compact_table_ids_
.
insert
(
id
);
}
}
...
@@ -607,7 +608,7 @@ DBImpl::StartCompactionTask() {
...
@@ -607,7 +608,7 @@ DBImpl::StartCompactionTask() {
}
}
Status
Status
DBImpl
::
MergeFiles
(
const
std
::
string
&
table_id
,
const
meta
::
DateT
&
date
,
const
meta
::
TableFilesSchema
&
files
)
{
DBImpl
::
MergeFiles
(
const
std
::
string
&
table_id
,
const
meta
::
DateT
&
date
,
const
meta
::
TableFilesSchema
&
files
)
{
ENGINE_LOG_DEBUG
<<
"Merge files for table: "
<<
table_id
;
ENGINE_LOG_DEBUG
<<
"Merge files for table: "
<<
table_id
;
// step 1: create table file
// step 1: create table file
...
@@ -624,13 +625,13 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
...
@@ -624,13 +625,13 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
// step 2: merge files
// step 2: merge files
ExecutionEnginePtr
index
=
ExecutionEnginePtr
index
=
EngineFactory
::
Build
(
table_file
.
dimension_
,
table_file
.
location_
,
(
EngineType
)
table_file
.
engine_type_
,
EngineFactory
::
Build
(
table_file
.
dimension_
,
table_file
.
location_
,
(
EngineType
)
table_file
.
engine_type_
,
(
MetricType
)
table_file
.
metric_type_
,
table_file
.
nlist_
);
(
MetricType
)
table_file
.
metric_type_
,
table_file
.
nlist_
);
meta
::
TableFilesSchema
updated
;
meta
::
TableFilesSchema
updated
;
int64_t
index_size
=
0
;
int64_t
index_size
=
0
;
for
(
auto
&
file
:
files
)
{
for
(
auto
&
file
:
files
)
{
server
::
CollectMergeFilesMetrics
metrics
;
server
::
CollectMergeFilesMetrics
metrics
;
index
->
Merge
(
file
.
location_
);
index
->
Merge
(
file
.
location_
);
...
@@ -648,7 +649,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
...
@@ -648,7 +649,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
// step 3: serialize to disk
// step 3: serialize to disk
try
{
try
{
index
->
Serialize
();
index
->
Serialize
();
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
// typical error: out of disk space or permition denied
// typical error: out of disk space or permition denied
std
::
string
msg
=
"Serialize merged index encounter exception: "
+
std
::
string
(
ex
.
what
());
std
::
string
msg
=
"Serialize merged index encounter exception: "
+
std
::
string
(
ex
.
what
());
ENGINE_LOG_ERROR
<<
msg
;
ENGINE_LOG_ERROR
<<
msg
;
...
@@ -666,7 +667,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
...
@@ -666,7 +667,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
// step 4: update table files state
// step 4: update table files state
// if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size
// if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size
// else set file type to RAW, no need to build index
// else set file type to RAW, no need to build index
if
(
table_file
.
engine_type_
!=
(
int
)
EngineType
::
FAISS_IDMAP
)
{
if
(
table_file
.
engine_type_
!=
(
int
)
EngineType
::
FAISS_IDMAP
)
{
table_file
.
file_type_
=
(
index
->
PhysicalSize
()
>=
table_file
.
index_file_size_
)
?
meta
::
TableFileSchema
::
TO_INDEX
table_file
.
file_type_
=
(
index
->
PhysicalSize
()
>=
table_file
.
index_file_size_
)
?
meta
::
TableFileSchema
::
TO_INDEX
:
meta
::
TableFileSchema
::
RAW
;
:
meta
::
TableFileSchema
::
RAW
;
}
else
{
}
else
{
...
@@ -686,7 +687,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
...
@@ -686,7 +687,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
}
}
Status
Status
DBImpl
::
BackgroundMergeFiles
(
const
std
::
string
&
table_id
)
{
DBImpl
::
BackgroundMergeFiles
(
const
std
::
string
&
table_id
)
{
meta
::
DatePartionedTableFilesSchema
raw_files
;
meta
::
DatePartionedTableFilesSchema
raw_files
;
auto
status
=
meta_ptr_
->
FilesToMerge
(
table_id
,
raw_files
);
auto
status
=
meta_ptr_
->
FilesToMerge
(
table_id
,
raw_files
);
if
(
!
status
.
ok
())
{
if
(
!
status
.
ok
())
{
...
@@ -695,7 +696,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
...
@@ -695,7 +696,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
}
bool
has_merge
=
false
;
bool
has_merge
=
false
;
for
(
auto
&
kv
:
raw_files
)
{
for
(
auto
&
kv
:
raw_files
)
{
auto
files
=
kv
.
second
;
auto
files
=
kv
.
second
;
if
(
files
.
size
()
<
options_
.
merge_trigger_number_
)
{
if
(
files
.
size
()
<
options_
.
merge_trigger_number_
)
{
ENGINE_LOG_DEBUG
<<
"Files number not greater equal than merge trigger number, skip merge action"
;
ENGINE_LOG_DEBUG
<<
"Files number not greater equal than merge trigger number, skip merge action"
;
...
@@ -718,7 +719,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
...
@@ -718,7 +719,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_TRACE
<<
" Background compaction thread start"
;
ENGINE_LOG_TRACE
<<
" Background compaction thread start"
;
Status
status
;
Status
status
;
for
(
auto
&
table_id
:
table_ids
)
{
for
(
auto
&
table_id
:
table_ids
)
{
status
=
BackgroundMergeFiles
(
table_id
);
status
=
BackgroundMergeFiles
(
table_id
);
if
(
!
status
.
ok
())
{
if
(
!
status
.
ok
())
{
ENGINE_LOG_ERROR
<<
"Merge files for table "
<<
table_id
<<
" failed: "
<<
status
.
ToString
();
ENGINE_LOG_ERROR
<<
"Merge files for table "
<<
table_id
<<
" failed: "
<<
status
.
ToString
();
...
@@ -770,9 +771,9 @@ DBImpl::StartBuildIndexTask(bool force) {
...
@@ -770,9 +771,9 @@ DBImpl::StartBuildIndexTask(bool force) {
}
}
Status
Status
DBImpl
::
BuildIndex
(
const
meta
::
TableFileSchema
&
file
)
{
DBImpl
::
BuildIndex
(
const
meta
::
TableFileSchema
&
file
)
{
ExecutionEnginePtr
to_index
=
EngineFactory
::
Build
(
file
.
dimension_
,
file
.
location_
,
(
EngineType
)
file
.
engine_type_
,
ExecutionEnginePtr
to_index
=
EngineFactory
::
Build
(
file
.
dimension_
,
file
.
location_
,
(
EngineType
)
file
.
engine_type_
,
(
MetricType
)
file
.
metric_type_
,
file
.
nlist_
);
(
MetricType
)
file
.
metric_type_
,
file
.
nlist_
);
if
(
to_index
==
nullptr
)
{
if
(
to_index
==
nullptr
)
{
ENGINE_LOG_ERROR
<<
"Invalid engine type"
;
ENGINE_LOG_ERROR
<<
"Invalid engine type"
;
return
Status
(
DB_ERROR
,
"Invalid engine type"
);
return
Status
(
DB_ERROR
,
"Invalid engine type"
);
...
@@ -803,7 +804,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
...
@@ -803,7 +804,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
try
{
try
{
server
::
CollectBuildIndexMetrics
metrics
;
server
::
CollectBuildIndexMetrics
metrics
;
index
=
to_index
->
BuildIndex
(
table_file
.
location_
,
(
EngineType
)
table_file
.
engine_type_
);
index
=
to_index
->
BuildIndex
(
table_file
.
location_
,
(
EngineType
)
table_file
.
engine_type_
);
if
(
index
==
nullptr
)
{
if
(
index
==
nullptr
)
{
table_file
.
file_type_
=
meta
::
TableFileSchema
::
TO_DELETE
;
table_file
.
file_type_
=
meta
::
TableFileSchema
::
TO_DELETE
;
status
=
meta_ptr_
->
UpdateTableFile
(
table_file
);
status
=
meta_ptr_
->
UpdateTableFile
(
table_file
);
...
@@ -812,7 +813,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
...
@@ -812,7 +813,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
return
status
;
return
status
;
}
}
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
// typical error: out of gpu memory
// typical error: out of gpu memory
std
::
string
msg
=
"BuildIndex encounter exception: "
+
std
::
string
(
ex
.
what
());
std
::
string
msg
=
"BuildIndex encounter exception: "
+
std
::
string
(
ex
.
what
());
ENGINE_LOG_ERROR
<<
msg
;
ENGINE_LOG_ERROR
<<
msg
;
...
@@ -838,7 +839,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
...
@@ -838,7 +839,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
// step 5: save index file
// step 5: save index file
try
{
try
{
index
->
Serialize
();
index
->
Serialize
();
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
// typical error: out of disk space or permition denied
// typical error: out of disk space or permition denied
std
::
string
msg
=
"Serialize index encounter exception: "
+
std
::
string
(
ex
.
what
());
std
::
string
msg
=
"Serialize index encounter exception: "
+
std
::
string
(
ex
.
what
());
ENGINE_LOG_ERROR
<<
msg
;
ENGINE_LOG_ERROR
<<
msg
;
...
@@ -881,7 +882,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
...
@@ -881,7 +882,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
status
=
meta_ptr_
->
UpdateTableFile
(
table_file
);
status
=
meta_ptr_
->
UpdateTableFile
(
table_file
);
ENGINE_LOG_DEBUG
<<
"Failed to update file to index, mark file: "
<<
table_file
.
file_id_
<<
" to to_delete"
;
ENGINE_LOG_DEBUG
<<
"Failed to update file to index, mark file: "
<<
table_file
.
file_id_
<<
" to to_delete"
;
}
}
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
std
::
string
msg
=
"Build index encounter exception: "
+
std
::
string
(
ex
.
what
());
std
::
string
msg
=
"Build index encounter exception: "
+
std
::
string
(
ex
.
what
());
ENGINE_LOG_ERROR
<<
msg
;
ENGINE_LOG_ERROR
<<
msg
;
return
Status
(
DB_ERROR
,
msg
);
return
Status
(
DB_ERROR
,
msg
);
...
@@ -898,18 +899,34 @@ DBImpl::BackgroundBuildIndex() {
...
@@ -898,18 +899,34 @@ DBImpl::BackgroundBuildIndex() {
meta
::
TableFilesSchema
to_index_files
;
meta
::
TableFilesSchema
to_index_files
;
meta_ptr_
->
FilesToIndex
(
to_index_files
);
meta_ptr_
->
FilesToIndex
(
to_index_files
);
Status
status
;
Status
status
;
for
(
auto
&
file
:
to_index_files
)
{
status
=
BuildIndex
(
file
);
if
(
!
status
.
ok
())
{
ENGINE_LOG_ERROR
<<
"Building index for "
<<
file
.
id_
<<
" failed: "
<<
status
.
ToString
();
}
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
))
{
scheduler
::
BuildIndexJobPtr
ENGINE_LOG_DEBUG
<<
"Server will shutdown, skip build index action"
;
job
=
std
::
make_shared
<
scheduler
::
BuildIndexJob
>
(
0
,
meta_ptr_
,
options_
);
break
;
// step 2: put build index task to scheduler
for
(
auto
&
file
:
to_index_files
)
{
scheduler
::
TableFileSchemaPtr
file_ptr
=
std
::
make_shared
<
meta
::
TableFileSchema
>
(
file
);
job
->
AddToIndexFiles
(
file_ptr
);
}
}
scheduler
::
JobMgrInst
::
GetInstance
()
->
Put
(
job
);
job
->
WaitBuildIndexFinish
();
if
(
!
job
->
GetStatus
().
ok
())
{
Status
status
=
job
->
GetStatus
();
ENGINE_LOG_ERROR
<<
"Building index failed: "
<<
status
.
ToString
();
}
}
// for (auto &file : to_index_files) {
// status = BuildIndex(file);
// if (!status.ok()) {
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
// }
//
// if (shutting_down_.load(std::memory_order_acquire)) {
// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
// break;
// }
// }
ENGINE_LOG_TRACE
<<
"Background build index thread exit"
;
ENGINE_LOG_TRACE
<<
"Background build index thread exit"
;
}
}
...
...
cpp/src/db/engine/ExecutionEngine.h
浏览文件 @
3eb79168
...
@@ -66,6 +66,9 @@ class ExecutionEngine {
...
@@ -66,6 +66,9 @@ class ExecutionEngine {
virtual
Status
virtual
Status
CopyToGpu
(
uint64_t
device_id
)
=
0
;
CopyToGpu
(
uint64_t
device_id
)
=
0
;
virtual
Status
CopyToIndexFileToGpu
(
uint64_t
device_id
)
=
0
;
virtual
Status
virtual
Status
CopyToCpu
()
=
0
;
CopyToCpu
()
=
0
;
...
...
cpp/src/db/engine/ExecutionEngineImpl.cpp
浏览文件 @
3eb79168
...
@@ -187,6 +187,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
...
@@ -187,6 +187,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
return
Status
::
OK
();
return
Status
::
OK
();
}
}
Status
ExecutionEngineImpl
::
CopyToIndexFileToGpu
(
uint64_t
device_id
)
{
auto
index
=
cache
::
GpuCacheMgr
::
GetInstance
(
device_id
)
->
GetIndex
(
location_
);
bool
already_in_cache
=
(
index
!=
nullptr
);
if
(
!
already_in_cache
)
{
cache
::
DataObjPtr
obj
=
std
::
make_shared
<
cache
::
DataObj
>
(
nullptr
,
PhysicalSize
());
milvus
::
cache
::
GpuCacheMgr
::
GetInstance
(
device_id
)
->
InsertItem
(
location_
,
obj
);
}
return
Status
::
OK
();
}
Status
Status
ExecutionEngineImpl
::
CopyToCpu
()
{
ExecutionEngineImpl
::
CopyToCpu
()
{
auto
index
=
cache
::
CpuCacheMgr
::
GetInstance
()
->
GetIndex
(
location_
);
auto
index
=
cache
::
CpuCacheMgr
::
GetInstance
()
->
GetIndex
(
location_
);
...
...
cpp/src/db/engine/ExecutionEngineImpl.h
浏览文件 @
3eb79168
...
@@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine {
...
@@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine {
Status
Status
CopyToGpu
(
uint64_t
device_id
)
override
;
CopyToGpu
(
uint64_t
device_id
)
override
;
Status
CopyToIndexFileToGpu
(
uint64_t
device_id
)
override
;
Status
Status
CopyToCpu
()
override
;
CopyToCpu
()
override
;
...
...
cpp/src/scheduler/TaskCreator.cpp
浏览文件 @
3eb79168
...
@@ -15,15 +15,18 @@
...
@@ -15,15 +15,18 @@
// specific language governing permissions and limitations
// specific language governing permissions and limitations
// under the License.
// under the License.
#include <src/scheduler/tasklabel/SpecResLabel.h>
#include "scheduler/TaskCreator.h"
#include "scheduler/TaskCreator.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
#include "tasklabel/DefaultLabel.h"
#include "SchedInst.h"
namespace
milvus
{
namespace
milvus
{
namespace
scheduler
{
namespace
scheduler
{
std
::
vector
<
TaskPtr
>
std
::
vector
<
TaskPtr
>
TaskCreator
::
Create
(
const
JobPtr
&
job
)
{
TaskCreator
::
Create
(
const
JobPtr
&
job
)
{
switch
(
job
->
type
())
{
switch
(
job
->
type
())
{
case
JobType
::
SEARCH
:
{
case
JobType
::
SEARCH
:
{
return
Create
(
std
::
static_pointer_cast
<
SearchJob
>
(
job
));
return
Create
(
std
::
static_pointer_cast
<
SearchJob
>
(
job
));
...
@@ -31,6 +34,9 @@ TaskCreator::Create(const JobPtr& job) {
...
@@ -31,6 +34,9 @@ TaskCreator::Create(const JobPtr& job) {
case
JobType
::
DELETE
:
{
case
JobType
::
DELETE
:
{
return
Create
(
std
::
static_pointer_cast
<
DeleteJob
>
(
job
));
return
Create
(
std
::
static_pointer_cast
<
DeleteJob
>
(
job
));
}
}
case
JobType
::
BUILD
:
{
return
Create
(
std
::
static_pointer_cast
<
BuildIndexJob
>
(
job
));
}
default:
{
default:
{
// TODO(wxyu): error
// TODO(wxyu): error
return
std
::
vector
<
TaskPtr
>
();
return
std
::
vector
<
TaskPtr
>
();
...
@@ -39,7 +45,7 @@ TaskCreator::Create(const JobPtr& job) {
...
@@ -39,7 +45,7 @@ TaskCreator::Create(const JobPtr& job) {
}
}
std
::
vector
<
TaskPtr
>
std
::
vector
<
TaskPtr
>
TaskCreator
::
Create
(
const
SearchJobPtr
&
job
)
{
TaskCreator
::
Create
(
const
SearchJobPtr
&
job
)
{
std
::
vector
<
TaskPtr
>
tasks
;
std
::
vector
<
TaskPtr
>
tasks
;
for
(
auto
&
index_file
:
job
->
index_files
())
{
for
(
auto
&
index_file
:
job
->
index_files
())
{
auto
label
=
std
::
make_shared
<
DefaultLabel
>
();
auto
label
=
std
::
make_shared
<
DefaultLabel
>
();
...
@@ -52,7 +58,7 @@ TaskCreator::Create(const SearchJobPtr& job) {
...
@@ -52,7 +58,7 @@ TaskCreator::Create(const SearchJobPtr& job) {
}
}
std
::
vector
<
TaskPtr
>
std
::
vector
<
TaskPtr
>
TaskCreator
::
Create
(
const
DeleteJobPtr
&
job
)
{
TaskCreator
::
Create
(
const
DeleteJobPtr
&
job
)
{
std
::
vector
<
TaskPtr
>
tasks
;
std
::
vector
<
TaskPtr
>
tasks
;
auto
label
=
std
::
make_shared
<
BroadcastLabel
>
();
auto
label
=
std
::
make_shared
<
BroadcastLabel
>
();
auto
task
=
std
::
make_shared
<
XDeleteTask
>
(
job
,
label
);
auto
task
=
std
::
make_shared
<
XDeleteTask
>
(
job
,
label
);
...
@@ -62,5 +68,20 @@ TaskCreator::Create(const DeleteJobPtr& job) {
...
@@ -62,5 +68,20 @@ TaskCreator::Create(const DeleteJobPtr& job) {
return
tasks
;
return
tasks
;
}
}
// Create one XBuildIndexTask per to-index file of the given build-index job.
// Every task is labelled with the "disk" resource, so the scheduler starts it
// on the disk resource and moves it along the computed resource path.
std::vector<TaskPtr>
TaskCreator::Create(const BuildIndexJobPtr& job) {
    std::vector<TaskPtr> tasks;
    // TODO(yukun): remove "disk" hardcode here
    ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");

    // to_index_files() maps file id -> TableFileSchemaPtr; the task only
    // needs the schema (`second`).
    for (auto& to_index_file : job->to_index_files()) {
        auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
        auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
        task->job_ = job;  // back-pointer so the task can report completion/status
        tasks.emplace_back(task);
    }
    return tasks;
}
}
// namespace scheduler
}
// namespace scheduler
}
// namespace milvus
}
// namespace milvus
cpp/src/scheduler/TaskCreator.h
浏览文件 @
3eb79168
...
@@ -32,6 +32,7 @@
...
@@ -32,6 +32,7 @@
#include "job/SearchJob.h"
#include "job/SearchJob.h"
#include "task/DeleteTask.h"
#include "task/DeleteTask.h"
#include "task/SearchTask.h"
#include "task/SearchTask.h"
#include "task/BuildIndexTask.h"
#include "task/Task.h"
#include "task/Task.h"
namespace
milvus
{
namespace
milvus
{
...
@@ -48,6 +49,9 @@ class TaskCreator {
...
@@ -48,6 +49,9 @@ class TaskCreator {
static
std
::
vector
<
TaskPtr
>
static
std
::
vector
<
TaskPtr
>
Create
(
const
DeleteJobPtr
&
job
);
Create
(
const
DeleteJobPtr
&
job
);
static
std
::
vector
<
TaskPtr
>
Create
(
const
BuildIndexJobPtr
&
job
);
};
};
}
// namespace scheduler
}
// namespace scheduler
...
...
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
浏览文件 @
3eb79168
...
@@ -20,14 +20,16 @@
...
@@ -20,14 +20,16 @@
#include "../Algorithm.h"
#include "../Algorithm.h"
#include "Action.h"
#include "Action.h"
#include "src/cache/GpuCacheMgr.h"
#include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h"
namespace
milvus
{
namespace
milvus
{
namespace
scheduler
{
namespace
scheduler
{
std
::
vector
<
ResourcePtr
>
std
::
vector
<
ResourcePtr
>
get_neighbours
(
const
ResourcePtr
&
self
)
{
get_neighbours
(
const
ResourcePtr
&
self
)
{
std
::
vector
<
ResourcePtr
>
neighbours
;
std
::
vector
<
ResourcePtr
>
neighbours
;
for
(
auto
&
neighbour_node
:
self
->
GetNeighbours
())
{
for
(
auto
&
neighbour_node
:
self
->
GetNeighbours
())
{
auto
node
=
neighbour_node
.
neighbour_node
.
lock
();
auto
node
=
neighbour_node
.
neighbour_node
.
lock
();
if
(
not
node
)
if
(
not
node
)
continue
;
continue
;
...
@@ -41,9 +43,9 @@ get_neighbours(const ResourcePtr& self) {
...
@@ -41,9 +43,9 @@ get_neighbours(const ResourcePtr& self) {
}
}
std
::
vector
<
std
::
pair
<
ResourcePtr
,
Connection
>>
std
::
vector
<
std
::
pair
<
ResourcePtr
,
Connection
>>
get_neighbours_with_connetion
(
const
ResourcePtr
&
self
)
{
get_neighbours_with_connetion
(
const
ResourcePtr
&
self
)
{
std
::
vector
<
std
::
pair
<
ResourcePtr
,
Connection
>>
neighbours
;
std
::
vector
<
std
::
pair
<
ResourcePtr
,
Connection
>>
neighbours
;
for
(
auto
&
neighbour_node
:
self
->
GetNeighbours
())
{
for
(
auto
&
neighbour_node
:
self
->
GetNeighbours
())
{
auto
node
=
neighbour_node
.
neighbour_node
.
lock
();
auto
node
=
neighbour_node
.
neighbour_node
.
lock
();
if
(
not
node
)
if
(
not
node
)
continue
;
continue
;
...
@@ -57,12 +59,12 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
...
@@ -57,12 +59,12 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
}
}
void
void
Action
::
PushTaskToNeighbourRandomly
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
self
)
{
Action
::
PushTaskToNeighbourRandomly
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
self
)
{
auto
neighbours
=
get_neighbours_with_connetion
(
self
);
auto
neighbours
=
get_neighbours_with_connetion
(
self
);
if
(
not
neighbours
.
empty
())
{
if
(
not
neighbours
.
empty
())
{
std
::
vector
<
uint64_t
>
speeds
;
std
::
vector
<
uint64_t
>
speeds
;
uint64_t
total_speed
=
0
;
uint64_t
total_speed
=
0
;
for
(
auto
&
neighbour
:
neighbours
)
{
for
(
auto
&
neighbour
:
neighbours
)
{
uint64_t
speed
=
neighbour
.
second
.
speed
();
uint64_t
speed
=
neighbour
.
second
.
speed
();
speeds
.
emplace_back
(
speed
);
speeds
.
emplace_back
(
speed
);
total_speed
+=
speed
;
total_speed
+=
speed
;
...
@@ -87,15 +89,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
...
@@ -87,15 +89,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
}
}
void
void
Action
::
PushTaskToAllNeighbour
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
self
)
{
Action
::
PushTaskToAllNeighbour
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
self
)
{
auto
neighbours
=
get_neighbours
(
self
);
auto
neighbours
=
get_neighbours
(
self
);
for
(
auto
&
neighbour
:
neighbours
)
{
for
(
auto
&
neighbour
:
neighbours
)
{
neighbour
->
task_table
().
Put
(
task
);
neighbour
->
task_table
().
Put
(
task
);
}
}
}
}
void
void
Action
::
PushTaskToResource
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
dest
)
{
Action
::
PushTaskToResource
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
dest
)
{
dest
->
task_table
().
Put
(
task
);
dest
->
task_table
().
Put
(
task
);
}
}
...
@@ -137,13 +139,13 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
...
@@ -137,13 +139,13 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
auto
compute_resources
=
res_mgr
.
lock
()
->
GetComputeResources
();
auto
compute_resources
=
res_mgr
.
lock
()
->
GetComputeResources
();
std
::
vector
<
std
::
vector
<
std
::
string
>>
paths
;
std
::
vector
<
std
::
vector
<
std
::
string
>>
paths
;
std
::
vector
<
uint64_t
>
transport_costs
;
std
::
vector
<
uint64_t
>
transport_costs
;
for
(
auto
&
res
:
compute_resources
)
{
for
(
auto
&
res
:
compute_resources
)
{
std
::
vector
<
std
::
string
>
path
;
std
::
vector
<
std
::
string
>
path
;
uint64_t
transport_cost
=
ShortestPath
(
resource
,
res
,
res_mgr
.
lock
(),
path
);
uint64_t
transport_cost
=
ShortestPath
(
resource
,
res
,
res_mgr
.
lock
(),
path
);
transport_costs
.
push_back
(
transport_cost
);
transport_costs
.
push_back
(
transport_cost
);
paths
.
emplace_back
(
path
);
paths
.
emplace_back
(
path
);
}
}
if
(
task
->
job_
.
lock
()
->
type
()
==
JobType
::
SEARCH
)
{
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t
min_cost
=
std
::
numeric_limits
<
uint64_t
>::
max
();
uint64_t
min_cost
=
std
::
numeric_limits
<
uint64_t
>::
max
();
uint64_t
min_cost_idx
=
0
;
uint64_t
min_cost_idx
=
0
;
...
@@ -163,6 +165,29 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
...
@@ -163,6 +165,29 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
// step 3: set path in task
// step 3: set path in task
Path
task_path
(
paths
[
min_cost_idx
],
paths
[
min_cost_idx
].
size
()
-
1
);
Path
task_path
(
paths
[
min_cost_idx
],
paths
[
min_cost_idx
].
size
()
-
1
);
task
->
path
()
=
task_path
;
task
->
path
()
=
task_path
;
}
else
if
(
task
->
job_
.
lock
()
->
type
()
==
JobType
::
BUILD
)
{
//step2: Read device id in config
//get build index gpu resource
server
::
Config
&
config
=
server
::
Config
::
GetInstance
();
int32_t
build_index_gpu
;
Status
stat
=
config
.
GetDBConfigBuildIndexGPU
(
build_index_gpu
);
bool
find_gpu_res
=
false
;
for
(
uint64_t
i
=
0
;
i
<
compute_resources
.
size
();
++
i
)
{
if
(
res_mgr
.
lock
()
->
GetResource
(
ResourceType
::
GPU
,
build_index_gpu
)
!=
nullptr
)
{
if
(
compute_resources
[
i
]
->
name
()
==
res_mgr
.
lock
()
->
GetResource
(
ResourceType
::
GPU
,
build_index_gpu
)
->
name
())
{
find_gpu_res
=
true
;
Path
task_path
(
paths
[
i
],
paths
[
i
].
size
()
-
1
);
task
->
path
()
=
task_path
;
break
;
}
}
}
if
(
not
find_gpu_res
)
{
task
->
path
()
=
Path
(
paths
[
0
],
paths
[
0
].
size
()
-
1
);
}
}
}
}
if
(
resource
->
name
()
==
task
->
path
().
Last
())
{
if
(
resource
->
name
()
==
task
->
path
().
Last
())
{
...
...
cpp/src/scheduler/job/BuildIndexJob.cpp
0 → 100644
浏览文件 @
3eb79168
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "BuildIndexJob.h"
#include "utils/Log.h"
namespace
milvus
{
namespace
scheduler
{
// Construct a build-index job.
// @param id        scheduler-assigned job id
// @param meta_ptr  meta storage used by tasks to create/update file records
// @param options   DB options forwarded to the build-index tasks
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
    : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
}
// Register a file to be indexed by this job.
// @param to_index_file  schema of the raw file to index (may be null)
// @return false when the pointer is null or the file is already queued,
//         true when the file was newly added.
bool
BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr& to_index_file) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) {
        return false;
    }

    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_;
    to_index_files_[to_index_file->id_] = to_index_file;
    // Fix: the original flowed off the end of this bool-returning function
    // after a successful insert, which is undefined behavior.
    return true;
}
// Block until every queued to-index file has been reported finished via
// BuildIndexDone().
// @return reference to the job's aggregate status (written by the tasks).
Status&
BuildIndexJob::WaitBuildIndexFinish() {
    std::unique_lock<std::mutex> lock(mutex_);
    // BuildIndexDone() erases entries and notifies; wake when the map drains.
    cv_.wait(lock, [this] { return to_index_files_.empty(); });
    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " all done";
    // Fix: the function is declared to return Status& but the original had no
    // return statement at all (undefined behavior at every call site).
    return status_;
}
// Mark one to-index file as finished and wake any thread blocked in
// WaitBuildIndexFinish().
// @param to_index_id  id of the file whose build just completed (or failed)
void
BuildIndexJob::BuildIndexDone(size_t to_index_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    to_index_files_.erase(to_index_id);
    cv_.notify_all();
    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id;
}
}
// namespace scheduler
}
// namespace milvus
cpp/src/scheduler/job/BuildIndexJob.h
0 → 100644
浏览文件 @
3eb79168
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <string>
#include <vector>
#include <list>
#include <queue>
#include <deque>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include "Job.h"
#include "db/meta/Meta.h"
#include "scheduler/Definition.h"
namespace
milvus
{
namespace
scheduler
{
using
engine
::
meta
::
TableFileSchemaPtr
;
using
Id2ToIndexMap
=
std
::
unordered_map
<
size_t
,
TableFileSchemaPtr
>
;
using
Id2ToTableFileMap
=
std
::
unordered_map
<
size_t
,
TableFileSchema
>
;
// A scheduler job that turns a set of raw table files into index files.
// The to-index map is guarded by mutex_ and drained by the build-index
// tasks through BuildIndexDone().
class BuildIndexJob : public Job {
 public:
    // @param id        scheduler-assigned job id
    // @param meta_ptr  meta storage used by the tasks to record index files
    // @param options   DB options forwarded to the tasks
    explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options);

 public:
    // Queue a file for indexing; returns false for null/duplicate files.
    bool
    AddToIndexFiles(const TableFileSchemaPtr& to_index_file);

    // Block until all queued files are reported done via BuildIndexDone();
    // returns a reference to the aggregate job status.
    Status&
    WaitBuildIndexFinish();

    // Remove one finished file from the queue and wake waiters.
    void
    BuildIndexDone(size_t to_index_id);

 public:
    // Aggregate job status, written by the tasks on failure.
    Status&
    GetStatus() {
        return status_;
    }

    // Remaining file-id -> schema map of files still to be indexed.
    // NOTE(review): returned without holding mutex_ — callers must not
    // iterate it concurrently with BuildIndexDone(); confirm usage.
    Id2ToIndexMap&
    to_index_files() {
        return to_index_files_;
    }

    engine::meta::MetaPtr
    meta() const {
        return meta_ptr_;
    }

    engine::DBOptions
    options() const {
        return options_;
    }

 private:
    Id2ToIndexMap to_index_files_;    // files not yet finished, keyed by file id
    engine::meta::MetaPtr meta_ptr_;  // meta storage handle
    engine::DBOptions options_;       // DB options snapshot

    Status status_;                   // last status reported by a task

    std::mutex mutex_;                // guards to_index_files_
    std::condition_variable cv_;      // signalled by BuildIndexDone()
};
using
BuildIndexJobPtr
=
std
::
shared_ptr
<
BuildIndexJob
>
;
}
// namespace scheduler
}
// namespace milvus
cpp/src/scheduler/task/BuildIndexTask.cpp
0 → 100644
浏览文件 @
3eb79168
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "BuildIndexTask.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <string>
#include <thread>
#include <utility>
namespace
milvus
{
namespace
scheduler
{
// Build-index task for a single to-index table file.
// Eagerly creates the execution engine over the raw file so Load()/Execute()
// can operate on it; leaves the engine null when no file is given.
XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label)
    : Task(TaskType::BuildIndexTask, std::move(label)), file_(file) {
    if (file_) {
        to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType)file_->engine_type_,
                                                (MetricType)file_->metric_type_, file_->nlist_);
    }
}
// Load the raw data of the to-index file toward the resource indicated by
// `type` (disk->cpu, cpu->gpu reservation, or gpu->cpu).
// On failure the error is recorded on the owning job and the file is marked
// done so WaitBuildIndexFinish() cannot block on it.
// @param type       direction of the data movement
// @param device_id  target GPU device for CPU2GPU moves
void
XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
    TimeRecorder rc("");
    Status stat = Status::OK();
    std::string error_msg;
    std::string type_str;

    if (auto job = job_.lock()) {
        auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
        auto options = build_index_job->options();
        try {
            if (type == LoadType::DISK2CPU) {
                stat = to_index_engine_->Load(options.insert_cache_immediately_);
                type_str = "DISK2CPU";
            } else if (type == LoadType::CPU2GPU) {
                stat = to_index_engine_->CopyToIndexFileToGpu(device_id);
                type_str = "CPU2GPU";
            } else if (type == LoadType::GPU2CPU) {
                stat = to_index_engine_->CopyToCpu();
                type_str = "GPU2CPU";
            } else {
                error_msg = "Wrong load type";
                stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
            }
        } catch (std::exception& ex) {
            // typical error: out of disk space or permission denied
            error_msg = "Failed to load to_index file: " + std::string(ex.what());
            stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
        }

        if (!stat.ok()) {
            Status s;
            if (stat.ToString().find("out of memory") != std::string::npos) {
                error_msg = "out of memory: " + type_str;
                s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
            } else {
                error_msg = "Failed to load to_index file: " + type_str;
                s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
            }

            // Fix: the original built `s` but never propagated it, so the job
            // reported success even when loading failed. Also dropped the
            // redundant second job_.lock() — we are already inside it.
            build_index_job->GetStatus() = s;
            build_index_job->BuildIndexDone(file_->id_);
            return;
        }

        size_t file_size = to_index_engine_->PhysicalSize();

        std::string info = "Load file id:" + std::to_string(file_->id_) +
                           " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
                           " bytes from location: " + file_->location_ + " totally cost";
        rc.ElapseFromBegin(info);  // logs elapsed time; the returned span was unused

        // Remember what was loaded so Execute() can run without file_ races.
        to_index_id_ = file_->id_;
        to_index_type_ = file_->file_type_;
    }
}
// Build the index for the previously loaded file and persist it.
// Steps: create a NEW_INDEX meta record; build the index; skip persisting if
// the table was dropped meanwhile; serialize the index file; update meta
// (new file -> INDEX, origin file -> BACKUP).
// Fix: every failure path now calls BuildIndexDone() on the owning job —
// the original skipped it on the null-index, build-exception, dropped-table
// and serialize-exception paths, leaving the file in the job's map so
// WaitBuildIndexFinish() blocked forever.
void
XBuildIndexTask::Execute() {
    if (to_index_engine_ == nullptr) {
        return;
    }

    TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_));

    if (auto job = job_.lock()) {
        auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
        std::string location = file_->location_;
        EngineType engine_type = (EngineType)file_->engine_type_;
        std::shared_ptr<engine::ExecutionEngine> index;

        // step 2: create table file
        engine::meta::TableFileSchema table_file;
        table_file.table_id_ = file_->table_id_;
        table_file.date_ = file_->date_;
        table_file.file_type_ = engine::meta::TableFileSchema::NEW_INDEX;

        engine::meta::MetaPtr meta_ptr = build_index_job->meta();
        Status status = meta_ptr->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
            build_index_job->BuildIndexDone(to_index_id_);
            build_index_job->GetStatus() = status;
            return;
        }

        // step 3: build index
        try {
            index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
            if (index == nullptr) {
                table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
                status = meta_ptr->UpdateTableFile(table_file);
                ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
                                 << " to to_delete";

                build_index_job->BuildIndexDone(to_index_id_);  // fix: report completion
                return;
            }
        } catch (std::exception& ex) {
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
            status = meta_ptr->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
                      << std::endl;

            build_index_job->GetStatus() = Status(DB_ERROR, msg);
            build_index_job->BuildIndexDone(to_index_id_);  // fix: report completion
            return;
        }

        // step 4: if table has been deleted, dont save index file
        bool has_table = false;
        meta_ptr->HasTable(file_->table_id_, has_table);
        if (!has_table) {
            meta_ptr->DeleteTableFiles(file_->table_id_);

            build_index_job->BuildIndexDone(to_index_id_);  // fix: report completion
            return;
        }

        // step 5: save index file
        try {
            index->Serialize();
        } catch (std::exception& ex) {
            // typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
            status = meta_ptr->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                      << ", possible out of disk space" << std::endl;

            build_index_job->GetStatus() = Status(DB_ERROR, msg);
            build_index_job->BuildIndexDone(to_index_id_);  // fix: report completion
            return;
        }

        // step 6: update meta
        table_file.file_type_ = engine::meta::TableFileSchema::INDEX;
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();

        // Keep the origin raw file as BACKUP rather than deleting it.
        auto origin_file = *file_;
        origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP;

        engine::meta::TableFilesSchema update_files = {table_file, origin_file};
        status = meta_ptr->UpdateTableFiles(update_files);
        if (status.ok()) {
            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
                             << " bytes"
                             << " from file " << origin_file.file_id_;

            // index->Cache();
        } else {
            // failed to update meta, mark the new file as to_delete, don't delete old file
            origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX;
            status = meta_ptr->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";

            table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
            status = meta_ptr->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        }

        build_index_job->BuildIndexDone(to_index_id_);
    }

    rc.ElapseFromBegin("totally cost");
    to_index_engine_ = nullptr;  // release the raw-file engine
}
}
// namespace scheduler
}
// namespace milvus
cpp/src/scheduler/task/BuildIndexTask.h
0 → 100644
浏览文件 @
3eb79168
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "Task.h"
#include "scheduler/Definition.h"
#include "scheduler/job/BuildIndexJob.h"
namespace
milvus
{
namespace
scheduler
{
// Scheduler task that builds the index for one to-index table file.
class XBuildIndexTask : public Task {
 public:
    // @param file   schema of the raw file to index
    // @param label  resource label controlling where the task runs
    explicit XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label);

    // Move the raw data toward the target resource (disk->cpu, cpu->gpu, gpu->cpu).
    void
    Load(LoadType type, uint8_t device_id) override;

    // Build, serialize and register the index file; reports to the owning job.
    void
    Execute() override;

 public:
    TableFileSchemaPtr file_;   // the raw file to be indexed
    TableFileSchema table_file_;  // NOTE(review): not used by the visible .cpp — confirm before relying on it
    size_t to_index_id_ = 0;    // file id, captured after a successful Load()
    int to_index_type_ = 0;     // file type, captured after a successful Load()
    ExecutionEnginePtr to_index_engine_ = nullptr;  // engine over the raw file; reset at end of Execute()
};
}
// namespace scheduler
}
// namespace milvus
cpp/src/scheduler/task/Task.h
浏览文件 @
3eb79168
...
@@ -39,6 +39,7 @@ enum class LoadType {
...
@@ -39,6 +39,7 @@ enum class LoadType {
enum
class
TaskType
{
enum
class
TaskType
{
SearchTask
,
SearchTask
,
DeleteTask
,
DeleteTask
,
BuildIndexTask
,
TestTask
,
TestTask
,
};
};
...
...
cpp/src/scheduler/tasklabel/SpecResLabel.h
浏览文件 @
3eb79168
...
@@ -18,13 +18,14 @@
...
@@ -18,13 +18,14 @@
#pragma once
#pragma once
#include "TaskLabel.h"
#include "TaskLabel.h"
#include "scheduler/ResourceMgr.h"
#include <memory>
#include <memory>
#include <string>
#include <string>
class
Resource
;
//
class Resource;
//
using
ResourceWPtr
=
std
::
weak_ptr
<
Resource
>
;
//
using ResourceWPtr = std::weak_ptr<Resource>;
namespace
milvus
{
namespace
milvus
{
namespace
scheduler
{
namespace
scheduler
{
...
...
cpp/unittest/metrics/utils.cpp
浏览文件 @
3eb79168
...
@@ -66,6 +66,7 @@ ms::engine::DBOptions MetricTest::GetOptions() {
...
@@ -66,6 +66,7 @@ ms::engine::DBOptions MetricTest::GetOptions() {
}
}
void
MetricTest
::
SetUp
()
{
void
MetricTest
::
SetUp
()
{
boost
::
filesystem
::
remove_all
(
"/tmp/milvus_test"
);
InitLog
();
InitLog
();
auto
options
=
GetOptions
();
auto
options
=
GetOptions
();
db_
=
ms
::
engine
::
DBFactory
::
Build
(
options
);
db_
=
ms
::
engine
::
DBFactory
::
Build
(
options
);
...
...
cpp/unittest/server/test_rpc.cpp
浏览文件 @
3eb79168
...
@@ -351,11 +351,11 @@ TEST_F(RpcHandlerTest, TABLES_TEST) {
...
@@ -351,11 +351,11 @@ TEST_F(RpcHandlerTest, TABLES_TEST) {
handler
->
Insert
(
&
context
,
&
request
,
&
vector_ids
);
handler
->
Insert
(
&
context
,
&
request
,
&
vector_ids
);
//Show table
//show tables
//
::milvus::grpc::Command cmd;
::
milvus
::
grpc
::
Command
cmd
;
// ::grpc::ServerWriter<::milvus::grpc::TableName> *writer
;
::
milvus
::
grpc
::
TableNameList
table_name_list
;
// status = handler->ShowTables(&context, &cmd, writer
);
status
=
handler
->
ShowTables
(
&
context
,
&
cmd
,
&
table_name_list
);
//
ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
ASSERT_EQ
(
status
.
error_code
(),
::
grpc
::
Status
::
OK
.
error_code
());
//Count Table
//Count Table
::
milvus
::
grpc
::
TableRowCount
count
;
::
milvus
::
grpc
::
TableRowCount
count
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录