BaiXuePrincess / milvus

Commit 3983b9d2
Authored Oct 10, 2019 by Yu Kun

using clang-format-6.0

Former-commit-id: 170a69db3779fdbb7cccacb35f19c1cc8cc8be67

Parent: 22371d4c
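
The bulk of the diff below is whitespace-only reflow. As a rough illustration of what clang-format does to the long declarations in this code base (the exact column limit and pointer/reference alignment come from the repository's .clang-format file, which is not shown on this page), a long Query signature gets wrapped roughly like this:

    // Before: one long declaration line (illustrative, not the literal pre-commit text)
    Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results);

    // After: clang-format wraps at the configured column limit and aligns continuation lines
    Status
    DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                  const meta::DatesT& dates, QueryResults& results);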
Showing 9 changed files with 121 additions and 126 deletions (+121 −126)
cpp/src/db/DBImpl.cpp                              +66 −67
cpp/src/scheduler/TaskCreator.cpp                   +8  −9
cpp/src/scheduler/TaskCreator.h                     +1  −1
cpp/src/scheduler/action/PushTaskToNeighbour.cpp   +15 −16
cpp/src/scheduler/job/BuildIndexJob.cpp             +4  −2
cpp/src/scheduler/job/BuildIndexJob.h              +11 −12
cpp/src/scheduler/task/BuildIndexTask.cpp          +14 −16
cpp/src/scheduler/task/BuildIndexTask.h             +0  −1
cpp/src/scheduler/tasklabel/SpecResLabel.h          +2  −2
cpp/src/db/DBImpl.cpp

@@ -26,9 +26,9 @@
 #include "meta/SqliteMetaImpl.h"
 #include "metrics/Metrics.h"
 #include "scheduler/SchedInst.h"
+#include "scheduler/job/BuildIndexJob.h"
 #include "scheduler/job/DeleteJob.h"
 #include "scheduler/job/SearchJob.h"
-#include "scheduler/job/BuildIndexJob.h"
 #include "utils/Log.h"
 #include "utils/TimeRecorder.h"

@@ -51,7 +51,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
 }  // namespace

 DBImpl::DBImpl(const DBOptions& options)
     : options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
     meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
     mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

@@ -111,7 +111,7 @@ DBImpl::DropAll() {
 }

 Status
 DBImpl::CreateTable(meta::TableSchema& table_schema) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -122,7 +122,7 @@ DBImpl::CreateTable(meta::TableSchema &table_schema) {
 }

 Status
 DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -147,7 +147,7 @@ DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) {
 }

 Status
 DBImpl::DescribeTable(meta::TableSchema& table_schema) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -158,7 +158,7 @@ DBImpl::DescribeTable(meta::TableSchema &table_schema) {
 }

 Status
 DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -167,7 +167,7 @@ DBImpl::HasTable(const std::string &table_id, bool &has_or_not) {
 }

 Status
 DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -176,7 +176,7 @@ DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) {
 }

 Status
 DBImpl::PreloadTable(const std::string& table_id) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -195,11 +195,11 @@ DBImpl::PreloadTable(const std::string &table_id) {
     int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
     int64_t available_size = cache_total - cache_usage;

     for (auto& day_files : files) {
         for (auto& file : day_files.second) {
             ExecutionEnginePtr engine =
                 EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
                                      (MetricType)file.metric_type_, file.nlist_);
             if (engine == nullptr) {
                 ENGINE_LOG_ERROR << "Invalid engine type";
                 return Status(DB_ERROR, "Invalid engine type");

@@ -212,7 +212,7 @@ DBImpl::PreloadTable(const std::string &table_id) {
             try {
                 // step 1: load index
                 engine->Load(true);
             } catch (std::exception& ex) {
                 std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                 ENGINE_LOG_ERROR << msg;
                 return Status(DB_ERROR, msg);

@@ -224,7 +224,7 @@ DBImpl::PreloadTable(const std::string &table_id) {
 }

 Status
 DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -233,7 +233,7 @@ DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
 }

 Status
 DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -261,7 +261,7 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect
 }

 Status
 DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
     {
         std::unique_lock<std::mutex> lock(build_index_mutex_);

@@ -316,7 +316,7 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
     while (!file_ids.empty()) {
         ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
         if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
             status = meta_ptr_->UpdateTableFilesToIndex(table_id);
         }

@@ -329,19 +329,19 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
 }

 Status
 DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
     return meta_ptr_->DescribeTableIndex(table_id, index);
 }

 Status
 DBImpl::DropIndex(const std::string& table_id) {
     ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
     return meta_ptr_->DropTableIndex(table_id);
 }

 Status
 DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
               QueryResults& results) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -353,8 +353,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr
 }

 Status
 DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
               const meta::DatesT& dates, QueryResults& results) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -370,8 +370,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr
     }

     meta::TableFilesSchema file_id_array;
     for (auto& day_files : files) {
         for (auto& file : day_files.second) {
             file_id_array.push_back(file);
         }
     }

@@ -383,8 +383,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr
 }

 Status
 DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
               uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -393,7 +393,7 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_
     // get specified files
     std::vector<size_t> ids;
     for (auto& id : file_ids) {
         meta::TableFileSchema table_file;
         table_file.table_id_ = table_id;
         std::string::size_type sz;

@@ -407,8 +407,8 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_
     }

     meta::TableFilesSchema file_id_array;
     for (auto& day_files : files_array) {
         for (auto& file : day_files.second) {
             file_id_array.push_back(file);
         }
     }

@@ -424,7 +424,7 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_
 }

 Status
 DBImpl::Size(uint64_t& result) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }

@@ -436,8 +436,8 @@ DBImpl::Size(uint64_t &result) {
 // internal methods
 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 Status
 DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
                    uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
     server::CollectQueryMetrics metrics(nq);

     TimeRecorder rc("");

@@ -446,7 +446,7 @@ DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &fi
     ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size()
                      << " date range count: " << dates.size();

     scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
     for (auto& file : files) {
         scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
         job->AddIndexFile(file_ptr);
     }

@@ -514,7 +514,7 @@ DBImpl::BackgroundTimerTask() {
 void
 DBImpl::WaitMergeFileFinish() {
     std::lock_guard<std::mutex> lck(compact_result_mutex_);
     for (auto& iter : compact_thread_results_) {
         iter.wait();
     }
 }

@@ -522,7 +522,7 @@ DBImpl::WaitMergeFileFinish() {
 void
 DBImpl::WaitBuildIndexFinish() {
     std::lock_guard<std::mutex> lck(index_result_mutex_);
     for (auto& iter : index_thread_results_) {
         iter.wait();
     }
 }

@@ -563,7 +563,7 @@ DBImpl::MemSerialize() {
     std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
     std::set<std::string> temp_table_ids;
     mem_mgr_->Serialize(temp_table_ids);
     for (auto& id : temp_table_ids) {
         compact_table_ids_.insert(id);
     }

@@ -608,7 +608,7 @@ DBImpl::StartCompactionTask() {
 }

 Status
 DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
     ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

     // step 1: create table file

@@ -625,13 +625,13 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m
     // step 2: merge files
     ExecutionEnginePtr index =
         EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                              (MetricType)table_file.metric_type_, table_file.nlist_);

     meta::TableFilesSchema updated;
     int64_t index_size = 0;

     for (auto& file : files) {
         server::CollectMergeFilesMetrics metrics;
         index->Merge(file.location_);

@@ -649,7 +649,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m
     // step 3: serialize to disk
     try {
         index->Serialize();
     } catch (std::exception& ex) {
         // typical error: out of disk space or permition denied
         std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;

@@ -667,7 +667,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m
     // step 4: update table files state
     // if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size
     // else set file type to RAW, no need to build index
     if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
         table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_)
                                     ? meta::TableFileSchema::TO_INDEX
                                     : meta::TableFileSchema::RAW;
     } else {

@@ -687,7 +687,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m
 }

 Status
 DBImpl::BackgroundMergeFiles(const std::string& table_id) {
     meta::DatePartionedTableFilesSchema raw_files;
     auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
     if (!status.ok()) {

@@ -696,7 +696,7 @@ DBImpl::BackgroundMergeFiles(const std::string &table_id) {
     }

     bool has_merge = false;
     for (auto& kv : raw_files) {
         auto files = kv.second;
         if (files.size() < options_.merge_trigger_number_) {
             ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";

@@ -719,7 +719,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
     ENGINE_LOG_TRACE << " Background compaction thread start";

     Status status;
     for (auto& table_id : table_ids) {
         status = BackgroundMergeFiles(table_id);
         if (!status.ok()) {
             ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();

@@ -771,9 +771,9 @@ DBImpl::StartBuildIndexTask(bool force) {
 }

 Status
 DBImpl::BuildIndex(const meta::TableFileSchema& file) {
     ExecutionEnginePtr to_index =
         EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
                              (MetricType)file.metric_type_, file.nlist_);
     if (to_index == nullptr) {
         ENGINE_LOG_ERROR << "Invalid engine type";
         return Status(DB_ERROR, "Invalid engine type");

@@ -804,7 +804,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) {
     try {
         server::CollectBuildIndexMetrics metrics;
         index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
         if (index == nullptr) {
             table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
             status = meta_ptr_->UpdateTableFile(table_file);

@@ -813,7 +813,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) {
             return status;
         }
     } catch (std::exception& ex) {
         // typical error: out of gpu memory
         std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;

@@ -839,7 +839,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) {
     // step 5: save index file
     try {
         index->Serialize();
     } catch (std::exception& ex) {
         // typical error: out of disk space or permition denied
         std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;

@@ -882,7 +882,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) {
             status = meta_ptr_->UpdateTableFile(table_file);
             ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
                              << " to to_delete";
         }
     } catch (std::exception& ex) {
         std::string msg = "Build index encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;
         return Status(DB_ERROR, msg);

@@ -900,11 +900,10 @@ DBImpl::BackgroundBuildIndex() {
     meta_ptr_->FilesToIndex(to_index_files);

     Status status;
     scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);

     // step 2: put build index task to scheduler
     for (auto& file : to_index_files) {
         scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
         job->AddToIndexFiles(file_ptr);
     }

@@ -915,17 +914,17 @@ DBImpl::BackgroundBuildIndex() {
         ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
     }

     //    for (auto &file : to_index_files) {
     //        status = BuildIndex(file);
     //        if (!status.ok()) {
     //            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
     //        }
     //
     //        if (shutting_down_.load(std::memory_order_acquire)) {
     //            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
     //            break;
     //        }
     //    }

     ENGINE_LOG_TRACE << "Background build index thread exit";
 }
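
Almost every public method touched in DBImpl.cpp above opens with the same guard on the shutting_down_ flag. A minimal, self-contained sketch of that pattern follows (simplified stand-in types for illustration only, not the actual Milvus classes):

    #include <atomic>
    #include <string>

    // Simplified stand-ins for the Milvus Status type and DB_ERROR code.
    struct Status {
        int code = 0;
        std::string message;
    };
    constexpr int DB_ERROR = 1;

    class DBSketch {
     public:
        Status
        CreateTable() {
            // The acquire load pairs with the release store in Shutdown(): once a caller
            // observes shutting_down_ == true, it also observes writes made before shutdown.
            if (shutting_down_.load(std::memory_order_acquire)) {
                return Status{DB_ERROR, "server is shutdown"};
            }
            return Status{};
        }

        void
        Shutdown() {
            shutting_down_.store(true, std::memory_order_release);
        }

     private:
        std::atomic<bool> shutting_down_{false};
    };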
cpp/src/scheduler/TaskCreator.cpp

@@ -15,18 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.

+#include <src/scheduler/tasklabel/SpecResLabel.h>
 #include "scheduler/TaskCreator.h"
-#include <src/scheduler/tasklabel/SpecResLabel.h>
+#include "SchedInst.h"
 #include "scheduler/tasklabel/BroadcastLabel.h"
 #include "tasklabel/DefaultLabel.h"
-#include "SchedInst.h"

 namespace milvus {
 namespace scheduler {

 std::vector<TaskPtr>
 TaskCreator::Create(const JobPtr& job) {
     switch (job->type()) {
         case JobType::SEARCH: {
             return Create(std::static_pointer_cast<SearchJob>(job));

@@ -45,7 +44,7 @@ TaskCreator::Create(const JobPtr &job) {
 }

 std::vector<TaskPtr>
 TaskCreator::Create(const SearchJobPtr& job) {
     std::vector<TaskPtr> tasks;
     for (auto& index_file : job->index_files()) {
         auto label = std::make_shared<DefaultLabel>();

@@ -58,7 +57,7 @@ TaskCreator::Create(const SearchJobPtr &job) {
 }

 std::vector<TaskPtr>
 TaskCreator::Create(const DeleteJobPtr& job) {
     std::vector<TaskPtr> tasks;
     auto label = std::make_shared<BroadcastLabel>();
     auto task = std::make_shared<XDeleteTask>(job, label);

@@ -69,12 +68,12 @@ TaskCreator::Create(const DeleteJobPtr &job) {
 }

 std::vector<TaskPtr>
 TaskCreator::Create(const BuildIndexJobPtr& job) {
     std::vector<TaskPtr> tasks;
-    //TODO(yukun): remove "disk" hardcode here
+    // TODO(yukun): remove "disk" hardcode here
     ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
     for (auto& to_index_file : job->to_index_files()) {
         auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
         auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
         task->job_ = job;
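
The include reshuffles in this file and in TaskCreator.h below are what clang-format's SortIncludes option produces: #include directives are sorted alphabetically within each contiguous block, and blocks separated by blank lines are sorted independently. A small illustration of the effect (not taken verbatim from the commit):

    // Before: one contiguous, unsorted include block
    #include "scheduler/TaskCreator.h"
    #include "tasklabel/DefaultLabel.h"
    #include "SchedInst.h"

    // After clang-format with SortIncludes enabled: alphabetical within the block
    #include "SchedInst.h"
    #include "scheduler/TaskCreator.h"
    #include "tasklabel/DefaultLabel.h"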
cpp/src/scheduler/TaskCreator.h

@@ -30,9 +30,9 @@
 #include "job/DeleteJob.h"
 #include "job/Job.h"
 #include "job/SearchJob.h"
+#include "task/BuildIndexTask.h"
 #include "task/DeleteTask.h"
 #include "task/SearchTask.h"
-#include "task/BuildIndexTask.h"
 #include "task/Task.h"

 namespace milvus {
cpp/src/scheduler/action/PushTaskToNeighbour.cpp

@@ -22,14 +22,13 @@
 #include "src/cache/GpuCacheMgr.h"
 #include "src/server/Config.h"

 namespace milvus {
 namespace scheduler {

 std::vector<ResourcePtr>
 get_neighbours(const ResourcePtr& self) {
     std::vector<ResourcePtr> neighbours;
     for (auto& neighbour_node : self->GetNeighbours()) {
         auto node = neighbour_node.neighbour_node.lock();
         if (not node)
             continue;

@@ -43,9 +42,9 @@ get_neighbours(const ResourcePtr &self) {
 }

 std::vector<std::pair<ResourcePtr, Connection>>
 get_neighbours_with_connetion(const ResourcePtr& self) {
     std::vector<std::pair<ResourcePtr, Connection>> neighbours;
     for (auto& neighbour_node : self->GetNeighbours()) {
         auto node = neighbour_node.neighbour_node.lock();
         if (not node)
             continue;

@@ -59,12 +58,12 @@ get_neighbours_with_connetion(const ResourcePtr &self) {
 }

 void
 Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
     auto neighbours = get_neighbours_with_connetion(self);
     if (not neighbours.empty()) {
         std::vector<uint64_t> speeds;
         uint64_t total_speed = 0;
         for (auto& neighbour : neighbours) {
             uint64_t speed = neighbour.second.speed();
             speeds.emplace_back(speed);
             total_speed += speed;

@@ -89,15 +88,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self
 }

 void
 Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
     auto neighbours = get_neighbours(self);
     for (auto& neighbour : neighbours) {
         neighbour->task_table().Put(task);
     }
 }

 void
 Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
     dest->task_table().Put(task);
 }

@@ -139,7 +138,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
     auto compute_resources = res_mgr.lock()->GetComputeResources();
     std::vector<std::vector<std::string>> paths;
     std::vector<uint64_t> transport_costs;
     for (auto& res : compute_resources) {
         std::vector<std::string> path;
         uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
         transport_costs.push_back(transport_cost);

@@ -166,17 +165,17 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
             Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
             task->path() = task_path;
         } else if (task->job_.lock()->type() == JobType::BUILD) {
-            //step2: Read device id in config
-            //get build index gpu resource
+            // step2: Read device id in config
+            // get build index gpu resource
             server::Config& config = server::Config::GetInstance();
             int32_t build_index_gpu;
             Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);

             bool find_gpu_res = false;
             for (uint64_t i = 0; i < compute_resources.size(); ++i) {
                 if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
                     if (compute_resources[i]->name() ==
                         res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
                         find_gpu_res = true;
                         Path task_path(paths[i], paths[i].size() - 1);
                         task->path() = task_path;
cpp/src/scheduler/job/BuildIndexJob.cpp

@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.

-#include "BuildIndexJob.h"
+#include "scheduler/job/BuildIndexJob.h"
 #include "utils/Log.h"

+#include <utility>
+
 namespace milvus {
 namespace scheduler {

@@ -26,7 +28,7 @@ BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::D
 }

 bool
 BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr& to_index_file) {
     std::unique_lock<std::mutex> lock(mutex_);
     if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) {
         return false;
cpp/src/scheduler/job/BuildIndexJob.h

@@ -16,22 +16,21 @@
 // under the License.

 #pragma once

 #include <condition_variable>
 #include <deque>
 #include <list>
 #include <memory>
 #include <mutex>
 #include <queue>
 #include <string>
 #include <thread>
 #include <unordered_map>
 #include <vector>

 #include "Job.h"
 #include "db/meta/Meta.h"
 #include "scheduler/Definition.h"

 namespace milvus {
 namespace scheduler {

@@ -46,21 +45,21 @@ class BuildIndexJob : public Job {
 public:
    bool
    AddToIndexFiles(const TableFileSchemaPtr& to_index_file);

    Status&
    WaitBuildIndexFinish();

    void
    BuildIndexDone(size_t to_index_id);

 public:
    Status&
    GetStatus() {
        return status_;
    }

    Id2ToIndexMap&
    to_index_files() {
        return to_index_files_;
    }
cpp/src/scheduler/task/BuildIndexTask.cpp

@@ -15,27 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.

-#include "BuildIndexTask.h"
+#include "scheduler/task/BuildIndexTask.h"
 #include "db/engine/EngineFactory.h"
 #include "metrics/Metrics.h"
 #include "scheduler/job/BuildIndexJob.h"
 #include "utils/Log.h"
 #include "utils/TimeRecorder.h"

 #include <memory>
 #include <string>
 #include <thread>
 #include <utility>

 namespace milvus {
 namespace scheduler {

 XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label)
     : Task(TaskType::BuildIndexTask, std::move(label)), file_(file) {
     if (file_) {
         to_index_engine_ =
             EngineFactory::Build(file_->dimension_, file_->location_, (EngineType)file_->engine_type_,
                                  (MetricType)file_->metric_type_, file_->nlist_);
     }
 }

@@ -63,7 +62,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
             error_msg = "Wrong load type";
             stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
         }
     } catch (std::exception& ex) {
         // typical error: out of disk space or permition denied
         error_msg = "Failed to load to_index file: " + std::string(ex.what());
         stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);

@@ -89,9 +88,9 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
     size_t file_size = to_index_engine_->PhysicalSize();

     std::string info = "Load file id:" + std::to_string(file_->id_) +
                        " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
                        " bytes from location: " + file_->location_ + " totally cost";
     double span = rc.ElapseFromBegin(info);

     to_index_id_ = file_->id_;

@@ -110,15 +109,14 @@ XBuildIndexTask::Execute() {
     if (auto job = job_.lock()) {
         auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
         std::string location = file_->location_;
         EngineType engine_type = (EngineType)file_->engine_type_;
         std::shared_ptr<engine::ExecutionEngine> index;

         // step 2: create table file
         engine::meta::TableFileSchema table_file;
         table_file.table_id_ = file_->table_id_;
         table_file.date_ = file_->date_;
         table_file.file_type_ = engine::meta::TableFileSchema::NEW_INDEX;

         engine::meta::MetaPtr meta_ptr = build_index_job->meta();
         Status status = build_index_job->meta()->CreateTableFile(table_file);

@@ -131,7 +129,7 @@ XBuildIndexTask::Execute() {
         // step 3: build index
         try {
             index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
             if (index == nullptr) {
                 table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
                 status = meta_ptr->UpdateTableFile(table_file);

@@ -140,7 +138,7 @@ XBuildIndexTask::Execute() {
                 return;
             }
         } catch (std::exception& ex) {
             std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
             ENGINE_LOG_ERROR << msg;

@@ -166,7 +164,7 @@ XBuildIndexTask::Execute() {
         // step 5: save index file
         try {
             index->Serialize();
         } catch (std::exception& ex) {
             // typical error: out of disk space or permition denied
             std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
             ENGINE_LOG_ERROR << msg;

@@ -197,7 +195,7 @@ XBuildIndexTask::Execute() {
                              << " bytes"
                              << " from file " << origin_file.file_id_;

             //            index->Cache();
         } else {
             // failed to update meta, mark the new file as to_delete, don't delete old file
             origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX;
cpp/src/scheduler/task/BuildIndexTask.h

@@ -21,7 +21,6 @@
 #include "scheduler/Definition.h"
 #include "scheduler/job/BuildIndexJob.h"

 namespace milvus {
 namespace scheduler {
cpp/src/scheduler/tasklabel/SpecResLabel.h

@@ -23,9 +23,9 @@
 #include <memory>
 #include <string>

-//class Resource;
+// class Resource;
 //
-//using ResourceWPtr = std::weak_ptr<Resource>;
+// using ResourceWPtr = std::weak_ptr<Resource>;

 namespace milvus {
 namespace scheduler {