Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
7318786b
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
7318786b
编写于
11月 20, 2019
作者:
J
JinHai-CN
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'master' into 0.6.0
上级
290f4ad7
f1930a56
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
138 addition
and
14 deletion
+138
-14
core/src/db/DBImpl.cpp
core/src/db/DBImpl.cpp
+9
-9
core/src/scheduler/task/BuildIndexTask.cpp
core/src/scheduler/task/BuildIndexTask.cpp
+3
-4
core/src/wrapper/VecImpl.cpp
core/src/wrapper/VecImpl.cpp
+126
-1
未找到文件。
core/src/db/DBImpl.cpp
浏览文件 @
7318786b
...
@@ -84,12 +84,12 @@ DBImpl::Start() {
...
@@ -84,12 +84,12 @@ DBImpl::Start() {
return
Status
::
OK
();
return
Status
::
OK
();
}
}
ENGINE_LOG_TRACE
<<
"DB service start"
;
//
ENGINE_LOG_TRACE << "DB service start";
shutting_down_
.
store
(
false
,
std
::
memory_order_release
);
shutting_down_
.
store
(
false
,
std
::
memory_order_release
);
// for distribute version, some nodes are read only
// for distribute version, some nodes are read only
if
(
options_
.
mode_
!=
DBOptions
::
MODE
::
CLUSTER_READONLY
)
{
if
(
options_
.
mode_
!=
DBOptions
::
MODE
::
CLUSTER_READONLY
)
{
ENGINE_LOG_TRACE
<<
"StartTimerTasks"
;
//
ENGINE_LOG_TRACE << "StartTimerTasks";
bg_timer_thread_
=
std
::
thread
(
&
DBImpl
::
BackgroundTimerTask
,
this
);
bg_timer_thread_
=
std
::
thread
(
&
DBImpl
::
BackgroundTimerTask
,
this
);
}
}
...
@@ -114,7 +114,7 @@ DBImpl::Stop() {
...
@@ -114,7 +114,7 @@ DBImpl::Stop() {
meta_ptr_
->
CleanUp
();
meta_ptr_
->
CleanUp
();
}
}
ENGINE_LOG_TRACE
<<
"DB service stop"
;
//
ENGINE_LOG_TRACE << "DB service stop";
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
@@ -558,7 +558,7 @@ DBImpl::StartMetricTask() {
...
@@ -558,7 +558,7 @@ DBImpl::StartMetricTask() {
return
;
return
;
}
}
ENGINE_LOG_TRACE
<<
"Start metric task"
;
//
ENGINE_LOG_TRACE << "Start metric task";
server
::
Metrics
::
GetInstance
().
KeepingAliveCounterIncrement
(
METRIC_ACTION_INTERVAL
);
server
::
Metrics
::
GetInstance
().
KeepingAliveCounterIncrement
(
METRIC_ACTION_INTERVAL
);
int64_t
cache_usage
=
cache
::
CpuCacheMgr
::
GetInstance
()
->
CacheUsage
();
int64_t
cache_usage
=
cache
::
CpuCacheMgr
::
GetInstance
()
->
CacheUsage
();
...
@@ -584,7 +584,7 @@ DBImpl::StartMetricTask() {
...
@@ -584,7 +584,7 @@ DBImpl::StartMetricTask() {
server
::
Metrics
::
GetInstance
().
GPUTemperature
();
server
::
Metrics
::
GetInstance
().
GPUTemperature
();
server
::
Metrics
::
GetInstance
().
CPUTemperature
();
server
::
Metrics
::
GetInstance
().
CPUTemperature
();
ENGINE_LOG_TRACE
<<
"Metric task finished"
;
//
ENGINE_LOG_TRACE << "Metric task finished";
}
}
Status
Status
...
@@ -756,7 +756,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
...
@@ -756,7 +756,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
void
void
DBImpl
::
BackgroundCompaction
(
std
::
set
<
std
::
string
>
table_ids
)
{
DBImpl
::
BackgroundCompaction
(
std
::
set
<
std
::
string
>
table_ids
)
{
ENGINE_LOG_TRACE
<<
"
Background compaction thread start"
;
// ENGINE_LOG_TRACE << "
Background compaction thread start";
Status
status
;
Status
status
;
for
(
auto
&
table_id
:
table_ids
)
{
for
(
auto
&
table_id
:
table_ids
)
{
...
@@ -779,7 +779,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
...
@@ -779,7 +779,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
}
}
meta_ptr_
->
CleanUpFilesWithTTL
(
ttl
);
meta_ptr_
->
CleanUpFilesWithTTL
(
ttl
);
ENGINE_LOG_TRACE
<<
"
Background compaction thread exit"
;
// ENGINE_LOG_TRACE << "
Background compaction thread exit";
}
}
void
void
...
@@ -812,7 +812,7 @@ DBImpl::StartBuildIndexTask(bool force) {
...
@@ -812,7 +812,7 @@ DBImpl::StartBuildIndexTask(bool force) {
void
void
DBImpl
::
BackgroundBuildIndex
()
{
DBImpl
::
BackgroundBuildIndex
()
{
ENGINE_LOG_TRACE
<<
"Background build index thread start"
;
//
ENGINE_LOG_TRACE << "Background build index thread start";
std
::
unique_lock
<
std
::
mutex
>
lock
(
build_index_mutex_
);
std
::
unique_lock
<
std
::
mutex
>
lock
(
build_index_mutex_
);
meta
::
TableFilesSchema
to_index_files
;
meta
::
TableFilesSchema
to_index_files
;
...
@@ -835,7 +835,7 @@ DBImpl::BackgroundBuildIndex() {
...
@@ -835,7 +835,7 @@ DBImpl::BackgroundBuildIndex() {
}
}
}
}
ENGINE_LOG_TRACE
<<
"Background build index thread exit"
;
//
ENGINE_LOG_TRACE << "Background build index thread exit";
}
}
Status
Status
...
...
core/src/scheduler/task/BuildIndexTask.cpp
浏览文件 @
7318786b
...
@@ -146,8 +146,7 @@ XBuildIndexTask::Execute() {
...
@@ -146,8 +146,7 @@ XBuildIndexTask::Execute() {
status
=
meta_ptr
->
UpdateTableFile
(
table_file
);
status
=
meta_ptr
->
UpdateTableFile
(
table_file
);
ENGINE_LOG_DEBUG
<<
"Failed to update file to index, mark file: "
<<
table_file
.
file_id_
<<
" to to_delete"
;
ENGINE_LOG_DEBUG
<<
"Failed to update file to index, mark file: "
<<
table_file
.
file_id_
<<
" to to_delete"
;
std
::
cout
<<
"ERROR: failed to build index, index file is too large or gpu memory is not enough"
ENGINE_LOG_ERROR
<<
"Failed to build index, index file is too large or gpu memory is not enough"
;
<<
std
::
endl
;
build_index_job
->
BuildIndexDone
(
to_index_id_
);
build_index_job
->
BuildIndexDone
(
to_index_id_
);
build_index_job
->
GetStatus
()
=
Status
(
DB_ERROR
,
msg
);
build_index_job
->
GetStatus
()
=
Status
(
DB_ERROR
,
msg
);
...
@@ -179,8 +178,8 @@ XBuildIndexTask::Execute() {
...
@@ -179,8 +178,8 @@ XBuildIndexTask::Execute() {
status
=
meta_ptr
->
UpdateTableFile
(
table_file
);
status
=
meta_ptr
->
UpdateTableFile
(
table_file
);
ENGINE_LOG_DEBUG
<<
"Failed to update file to index, mark file: "
<<
table_file
.
file_id_
<<
" to to_delete"
;
ENGINE_LOG_DEBUG
<<
"Failed to update file to index, mark file: "
<<
table_file
.
file_id_
<<
" to to_delete"
;
std
::
cout
<<
"ERROR: f
ailed to persist index file: "
<<
table_file
.
location_
ENGINE_LOG_ERROR
<<
"F
ailed to persist index file: "
<<
table_file
.
location_
<<
", possible out of disk space"
<<
std
::
endl
;
<<
", possible out of disk space"
;
build_index_job
->
BuildIndexDone
(
to_index_id_
);
build_index_job
->
BuildIndexDone
(
to_index_id_
);
build_index_job
->
GetStatus
()
=
Status
(
DB_ERROR
,
msg
);
build_index_job
->
GetStatus
()
=
Status
(
DB_ERROR
,
msg
);
...
...
core/src/wrapper/VecImpl.cpp
浏览文件 @
7318786b
...
@@ -31,7 +31,7 @@
...
@@ -31,7 +31,7 @@
/*
/*
* no parameter check in this layer.
* no parameter check in this layer.
* only responible for index combination
* only respon
s
ible for index combination
*/
*/
namespace
milvus
{
namespace
milvus
{
...
@@ -246,5 +246,130 @@ BFIndex::BuildAll(const int64_t& nb, const float* xb, const int64_t* ids, const
...
@@ -246,5 +246,130 @@ BFIndex::BuildAll(const int64_t& nb, const float* xb, const int64_t* ids, const
return
Status
::
OK
();
return
Status
::
OK
();
}
}
// TODO(linxj): add lock here.
Status
IVFMixIndex
::
BuildAll
(
const
int64_t
&
nb
,
const
float
*
xb
,
const
int64_t
*
ids
,
const
Config
&
cfg
,
const
int64_t
&
nt
,
const
float
*
xt
)
{
try
{
dim
=
cfg
->
d
;
auto
dataset
=
GenDatasetWithIds
(
nb
,
dim
,
xb
,
ids
);
auto
preprocessor
=
index_
->
BuildPreprocessor
(
dataset
,
cfg
);
index_
->
set_preprocessor
(
preprocessor
);
auto
model
=
index_
->
Train
(
dataset
,
cfg
);
index_
->
set_index_model
(
model
);
index_
->
Add
(
dataset
,
cfg
);
if
(
auto
device_index
=
std
::
dynamic_pointer_cast
<
knowhere
::
GPUIndex
>
(
index_
))
{
auto
host_index
=
device_index
->
CopyGpuToCpu
(
Config
());
index_
=
host_index
;
type
=
ConvertToCpuIndexType
(
type
);
}
else
{
WRAPPER_LOG_ERROR
<<
"Build IVFMIXIndex Failed"
;
return
Status
(
KNOWHERE_ERROR
,
"Build IVFMIXIndex Failed"
);
}
}
catch
(
knowhere
::
KnowhereException
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
return
Status
(
KNOWHERE_UNEXPECTED_ERROR
,
e
.
what
());
}
catch
(
std
::
exception
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
return
Status
(
KNOWHERE_ERROR
,
e
.
what
());
}
return
Status
::
OK
();
}
Status
IVFMixIndex
::
Load
(
const
knowhere
::
BinarySet
&
index_binary
)
{
index_
->
Load
(
index_binary
);
dim
=
Dimension
();
return
Status
::
OK
();
}
knowhere
::
QuantizerPtr
IVFHybridIndex
::
LoadQuantizer
(
const
Config
&
conf
)
{
// TODO(linxj): Hardcode here
if
(
auto
new_idx
=
std
::
dynamic_pointer_cast
<
knowhere
::
IVFSQHybrid
>
(
index_
))
{
return
new_idx
->
LoadQuantizer
(
conf
);
}
else
{
WRAPPER_LOG_ERROR
<<
"Hybrid mode not supported for index type: "
<<
int
(
type
);
}
}
Status
IVFHybridIndex
::
SetQuantizer
(
const
knowhere
::
QuantizerPtr
&
q
)
{
try
{
// TODO(linxj): Hardcode here
if
(
auto
new_idx
=
std
::
dynamic_pointer_cast
<
knowhere
::
IVFSQHybrid
>
(
index_
))
{
new_idx
->
SetQuantizer
(
q
);
}
else
{
WRAPPER_LOG_ERROR
<<
"Hybrid mode not supported for index type: "
<<
int
(
type
);
return
Status
(
KNOWHERE_ERROR
,
"not supported"
);
}
}
catch
(
knowhere
::
KnowhereException
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
return
Status
(
KNOWHERE_UNEXPECTED_ERROR
,
e
.
what
());
}
catch
(
std
::
exception
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
return
Status
(
KNOWHERE_ERROR
,
e
.
what
());
}
return
Status
::
OK
();
}
Status
IVFHybridIndex
::
UnsetQuantizer
()
{
try
{
// TODO(linxj): Hardcode here
if
(
auto
new_idx
=
std
::
dynamic_pointer_cast
<
knowhere
::
IVFSQHybrid
>
(
index_
))
{
new_idx
->
UnsetQuantizer
();
}
else
{
WRAPPER_LOG_ERROR
<<
"Hybrid mode not supported for index type: "
<<
int
(
type
);
return
Status
(
KNOWHERE_ERROR
,
"not supported"
);
}
}
catch
(
knowhere
::
KnowhereException
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
return
Status
(
KNOWHERE_UNEXPECTED_ERROR
,
e
.
what
());
}
catch
(
std
::
exception
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
return
Status
(
KNOWHERE_ERROR
,
e
.
what
());
}
return
Status
::
OK
();
}
VecIndexPtr
IVFHybridIndex
::
LoadData
(
const
knowhere
::
QuantizerPtr
&
q
,
const
Config
&
conf
)
{
try
{
// TODO(linxj): Hardcode here
if
(
auto
new_idx
=
std
::
dynamic_pointer_cast
<
knowhere
::
IVFSQHybrid
>
(
index_
))
{
return
std
::
make_shared
<
IVFHybridIndex
>
(
new_idx
->
LoadData
(
q
,
conf
),
type
);
}
else
{
WRAPPER_LOG_ERROR
<<
"Hybrid mode not supported for index type: "
<<
int
(
type
);
}
}
catch
(
knowhere
::
KnowhereException
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
}
catch
(
std
::
exception
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
}
return
nullptr
;
}
std
::
pair
<
VecIndexPtr
,
knowhere
::
QuantizerPtr
>
IVFHybridIndex
::
CopyToGpuWithQuantizer
(
const
int64_t
&
device_id
,
const
Config
&
cfg
)
{
try
{
// TODO(linxj): Hardcode here
if
(
auto
hybrid_idx
=
std
::
dynamic_pointer_cast
<
knowhere
::
IVFSQHybrid
>
(
index_
))
{
auto
pair
=
hybrid_idx
->
CopyCpuToGpuWithQuantizer
(
device_id
,
cfg
);
auto
new_idx
=
std
::
make_shared
<
IVFHybridIndex
>
(
pair
.
first
,
type
);
return
std
::
make_pair
(
new_idx
,
pair
.
second
);
}
else
{
WRAPPER_LOG_ERROR
<<
"Hybrid mode not supported for index type: "
<<
int
(
type
);
}
}
catch
(
knowhere
::
KnowhereException
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
}
catch
(
std
::
exception
&
e
)
{
WRAPPER_LOG_ERROR
<<
e
.
what
();
}
return
std
::
make_pair
(
nullptr
,
nullptr
);
}
}
// namespace engine
}
// namespace engine
}
// namespace milvus
}
// namespace milvus
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录