Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
b83f7103
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b83f7103
编写于
8月 23, 2019
作者:
Y
Yu Kun
浏览文件
操作
浏览文件
下载
差异文件
merge upstream
Former-commit-id: d53588b88aab072c9b11b0b06d05b0cae64ea6d0
上级
338e0320
41af24f1
变更
27
隐藏空白更改
内联
并排
Showing
27 changed file
with
232 addition
and
106 deletion
+232
-106
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+3
-0
cpp/src/db/DB.h
cpp/src/db/DB.h
+1
-0
cpp/src/db/DBImpl.cpp
cpp/src/db/DBImpl.cpp
+4
-0
cpp/src/db/DBImpl.h
cpp/src/db/DBImpl.h
+13
-22
cpp/src/db/Utils.cpp
cpp/src/db/Utils.cpp
+4
-0
cpp/src/db/Utils.h
cpp/src/db/Utils.h
+2
-0
cpp/src/db/meta/Meta.h
cpp/src/db/meta/Meta.h
+3
-0
cpp/src/db/meta/MetaTypes.h
cpp/src/db/meta/MetaTypes.h
+3
-0
cpp/src/db/meta/MySQLMetaImpl.cpp
cpp/src/db/meta/MySQLMetaImpl.cpp
+42
-1
cpp/src/db/meta/MySQLMetaImpl.h
cpp/src/db/meta/MySQLMetaImpl.h
+7
-0
cpp/src/db/meta/SqliteMetaImpl.cpp
cpp/src/db/meta/SqliteMetaImpl.cpp
+40
-11
cpp/src/db/meta/SqliteMetaImpl.h
cpp/src/db/meta/SqliteMetaImpl.h
+30
-48
cpp/src/db/scheduler/task/SearchTask.cpp
cpp/src/db/scheduler/task/SearchTask.cpp
+1
-1
cpp/src/scheduler/ResourceMgr.cpp
cpp/src/scheduler/ResourceMgr.cpp
+11
-0
cpp/src/scheduler/ResourceMgr.h
cpp/src/scheduler/ResourceMgr.h
+6
-0
cpp/src/scheduler/task/DeleteTask.cpp
cpp/src/scheduler/task/DeleteTask.cpp
+7
-2
cpp/src/scheduler/task/DeleteTask.h
cpp/src/scheduler/task/DeleteTask.h
+7
-0
cpp/src/scheduler/task/SearchTask.cpp
cpp/src/scheduler/task/SearchTask.cpp
+3
-0
cpp/src/scheduler/task/Task.h
cpp/src/scheduler/task/Task.h
+1
-0
cpp/src/scheduler/task/TaskConvert.cpp
cpp/src/scheduler/task/TaskConvert.cpp
+3
-2
cpp/src/scheduler/task/TaskConvert.h
cpp/src/scheduler/task/TaskConvert.h
+3
-1
cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp
cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp
+6
-1
cpp/src/server/DBWrapper.cpp
cpp/src/server/DBWrapper.cpp
+0
-6
cpp/src/server/ServerConfig.h
cpp/src/server/ServerConfig.h
+0
-9
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
+30
-2
cpp/unittest/metrics/metricbase_test.cpp
cpp/unittest/metrics/metricbase_test.cpp
+1
-0
cpp/unittest/metrics/prometheus_test.cpp
cpp/unittest/metrics/prometheus_test.cpp
+1
-0
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
b83f7103
...
...
@@ -40,7 +40,10 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-394 - Update scheduler unittest
-
MS-400 - Add timestamp record in task state change function
-
MS-402 - Add dump implementation for TaskTableItem
-
MS-406 - Add table flag for meta
-
MS-403 - Add GpuCacheMgr
-
MS-404 - Release index after search task done avoid memory increment continues
-
MS-405 - Add delete task support
-
MS-407 - Reconstruct MetricsCollector
## New Feature
...
...
cpp/src/db/DB.h
浏览文件 @
b83f7103
...
...
@@ -29,6 +29,7 @@ public:
virtual
Status
AllTables
(
std
::
vector
<
meta
::
TableSchema
>&
table_schema_array
)
=
0
;
virtual
Status
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
=
0
;
virtual
Status
PreloadTable
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
=
0
;
virtual
Status
InsertVectors
(
const
std
::
string
&
table_id_
,
uint64_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids_
)
=
0
;
...
...
cpp/src/db/DBImpl.cpp
浏览文件 @
b83f7103
...
...
@@ -129,6 +129,10 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
return
Status
::
OK
();
}
Status
DBImpl
::
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
{
return
meta_ptr_
->
UpdateTableFlag
(
table_id
,
flag
);
}
Status
DBImpl
::
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
{
return
meta_ptr_
->
Count
(
table_id
,
row_count
);
}
...
...
cpp/src/db/DBImpl.h
浏览文件 @
b83f7103
...
...
@@ -36,40 +36,32 @@ class DBImpl : public DB {
explicit
DBImpl
(
const
Options
&
options
);
Status
CreateTable
(
meta
::
TableSchema
&
table_schema
)
override
;
Status
CreateTable
(
meta
::
TableSchema
&
table_schema
)
override
;
Status
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
override
;
Status
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
override
;
Status
DescribeTable
(
meta
::
TableSchema
&
table_schema
)
override
;
Status
DescribeTable
(
meta
::
TableSchema
&
table_schema
)
override
;
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
Status
AllTables
(
std
::
vector
<
meta
::
TableSchema
>
&
table_schema_array
)
override
;
Status
AllTables
(
std
::
vector
<
meta
::
TableSchema
>
&
table_schema_array
)
override
;
Status
PreloadTable
(
const
std
::
string
&
table_id
)
override
;
Status
PreloadTable
(
const
std
::
string
&
table_id
)
override
;
Status
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
override
;
Status
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
);
Status
InsertVectors
(
const
std
::
string
&
table_id
,
uint64_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
override
;
Status
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
override
;
Status
Query
(
const
std
::
string
&
table_id
,
Status
InsertVectors
(
const
std
::
string
&
table_id
,
uint64_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
override
;
Status
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
QueryResults
&
results
)
override
;
Status
Query
(
const
std
::
string
&
table_id
,
Status
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
...
...
@@ -77,8 +69,7 @@ class DBImpl : public DB {
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
override
;
Status
Query
(
const
std
::
string
&
table_id
,
Status
Query
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
std
::
string
>
&
file_ids
,
uint64_t
k
,
uint64_t
nq
,
...
...
cpp/src/db/Utils.cpp
浏览文件 @
b83f7103
...
...
@@ -153,6 +153,10 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
&&
index1
.
metric_type_
==
index2
.
metric_type_
;
}
bool
UserDefinedId
(
int64_t
flag
)
{
return
flag
&
meta
::
FLAG_MASK_USERID
;
}
}
// namespace utils
}
// namespace engine
}
// namespace milvus
...
...
cpp/src/db/Utils.h
浏览文件 @
b83f7103
...
...
@@ -27,6 +27,8 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
bool
IsSameIndex
(
const
TableIndex
&
index1
,
const
TableIndex
&
index2
);
bool
UserDefinedId
(
int64_t
flag
);
}
// namespace utils
}
// namespace engine
}
// namespace milvus
...
...
cpp/src/db/meta/Meta.h
浏览文件 @
b83f7103
...
...
@@ -42,6 +42,9 @@ class Meta {
virtual
Status
UpdateTableIndexParam
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
=
0
;
virtual
Status
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
=
0
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
)
=
0
;
...
...
cpp/src/db/meta/MetaTypes.h
浏览文件 @
b83f7103
...
...
@@ -22,6 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384;
constexpr
int32_t
DEFAULT_INDEX_FILE_SIZE
=
1024
*
ONE_MB
;
constexpr
int32_t
DEFAULT_METRIC_TYPE
=
(
int
)
MetricType
::
L2
;
constexpr
int64_t
FLAG_MASK_USERID
=
1
;
typedef
int
DateT
;
const
DateT
EmptyDate
=
-
1
;
typedef
std
::
vector
<
DateT
>
DatesT
;
...
...
@@ -37,6 +39,7 @@ struct TableSchema {
int32_t
state_
=
(
int
)
NORMAL
;
uint16_t
dimension_
=
0
;
int64_t
created_on_
=
0
;
int64_t
flag_
=
0
;
int32_t
engine_type_
=
DEFAULT_ENGINE_TYPE
;
int32_t
nlist_
=
DEFAULT_NLIST
;
int32_t
index_file_size_
=
DEFAULT_INDEX_FILE_SIZE
;
...
...
cpp/src/db/meta/MySQLMetaImpl.cpp
浏览文件 @
b83f7103
...
...
@@ -137,6 +137,7 @@ Status MySQLMetaImpl::Initialize() {
"state INT NOT NULL, "
<<
"dimension SMALLINT NOT NULL, "
<<
"created_on BIGINT NOT NULL, "
<<
"flag BIGINT DEFAULT 0 NOT NULL, "
<<
"engine_type INT DEFAULT 1 NOT NULL, "
<<
"nlist INT DEFAULT 16384 NOT NULL, "
<<
"index_file_size INT DEFAULT 1024 NOT NULL, "
<<
...
...
@@ -407,7 +408,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
"engine_type_ = "
<<
index
.
engine_type_
<<
", "
<<
"nlist = "
<<
index
.
nlist_
<<
", "
<<
"index_file_size = "
<<
index
.
index_file_size_
*
ONE_MB
<<
", "
<<
"metric_type = "
<<
index
.
metric_type_
<<
"
,
"
<<
"metric_type = "
<<
index
.
metric_type_
<<
" "
<<
"WHERE id = "
<<
quote
<<
table_id
<<
";"
;
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::UpdateTableIndexParam: "
<<
updateTableIndexParamQuery
.
str
();
...
...
@@ -437,6 +438,46 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
return
Status
::
OK
();
}
Status
MySQLMetaImpl
::
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
{
try
{
server
::
MetricCollector
metric
;
{
ScopedConnection
connectionPtr
(
*
mysql_connection_pool_
,
safe_grab
);
if
(
connectionPtr
==
nullptr
)
{
return
Status
::
Error
(
"Failed to connect to database server"
);
}
Query
updateTableFlagQuery
=
connectionPtr
->
query
();
updateTableFlagQuery
<<
"UPDATE Tables "
<<
"SET flag = "
<<
flag
<<
" "
<<
"WHERE id = "
<<
quote
<<
table_id
<<
";"
;
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::UpdateTableFlag: "
<<
updateTableFlagQuery
.
str
();
if
(
!
updateTableFlagQuery
.
exec
())
{
ENGINE_LOG_ERROR
<<
"QUERY ERROR WHEN UPDATING TABLE FLAG"
;
return
Status
::
DBTransactionError
(
"QUERY ERROR WHEN UPDATING TABLE FLAG"
,
updateTableFlagQuery
.
error
());
}
}
//Scoped Connection
}
catch
(
const
BadQuery
&
er
)
{
// Handle any query errors
ENGINE_LOG_ERROR
<<
"QUERY ERROR WHEN UPDATING TABLE FLAG"
<<
": "
<<
er
.
what
();
return
Status
::
DBTransactionError
(
"QUERY ERROR WHEN UPDATING TABLE FLAG"
,
er
.
what
());
}
catch
(
const
Exception
&
er
)
{
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR
<<
"GENERAL ERROR WHEN UPDATING TABLE FLAG"
<<
": "
<<
er
.
what
();
return
Status
::
DBTransactionError
(
"GENERAL ERROR WHEN UPDATING TABLE FLAG"
,
er
.
what
());
}
return
Status
::
OK
();
}
Status
MySQLMetaImpl
::
DescribeTableIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
{
try
{
server
::
MetricCollector
metric
;
...
...
cpp/src/db/meta/MySQLMetaImpl.h
浏览文件 @
b83f7103
...
...
@@ -26,14 +26,19 @@ class MySQLMetaImpl : public Meta {
MySQLMetaImpl
(
const
DBMetaOptions
&
options_
,
const
int
&
mode
);
Status
CreateTable
(
TableSchema
&
table_schema
)
override
;
Status
DescribeTable
(
TableSchema
&
group_info_
)
override
;
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
Status
AllTables
(
std
::
vector
<
TableSchema
>
&
table_schema_array
)
override
;
Status
DeleteTable
(
const
std
::
string
&
table_id
)
override
;
Status
DeleteTableFiles
(
const
std
::
string
&
table_id
)
override
;
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
override
;
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
const
DatesT
&
dates
)
override
;
...
...
@@ -45,6 +50,8 @@ class MySQLMetaImpl : public Meta {
Status
UpdateTableIndexParam
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
override
;
Status
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
);
Status
DescribeTableIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
override
;
Status
DropTableIndex
(
const
std
::
string
&
table_id
)
override
;
...
...
cpp/src/db/meta/SqliteMetaImpl.cpp
浏览文件 @
b83f7103
...
...
@@ -44,6 +44,7 @@ inline auto StoragePrototype(const std::string &path) {
make_column
(
"state"
,
&
TableSchema
::
state_
),
make_column
(
"dimension"
,
&
TableSchema
::
dimension_
),
make_column
(
"created_on"
,
&
TableSchema
::
created_on_
),
make_column
(
"flag"
,
&
TableSchema
::
flag_
,
default_value
(
0
)),
make_column
(
"engine_type"
,
&
TableSchema
::
engine_type_
),
make_column
(
"nlist"
,
&
TableSchema
::
nlist_
),
make_column
(
"index_file_size"
,
&
TableSchema
::
index_file_size_
),
...
...
@@ -249,6 +250,7 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
&
TableSchema
::
state_
,
&
TableSchema
::
dimension_
,
&
TableSchema
::
created_on_
,
&
TableSchema
::
flag_
,
&
TableSchema
::
engine_type_
,
&
TableSchema
::
nlist_
,
&
TableSchema
::
index_file_size_
,
...
...
@@ -261,10 +263,11 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
table_schema
.
state_
=
std
::
get
<
1
>
(
groups
[
0
]);
table_schema
.
dimension_
=
std
::
get
<
2
>
(
groups
[
0
]);
table_schema
.
created_on_
=
std
::
get
<
3
>
(
groups
[
0
]);
table_schema
.
engine_type_
=
std
::
get
<
4
>
(
groups
[
0
]);
table_schema
.
nlist_
=
std
::
get
<
5
>
(
groups
[
0
]);
table_schema
.
index_file_size_
=
std
::
get
<
6
>
(
groups
[
0
]);
table_schema
.
metric_type_
=
std
::
get
<
7
>
(
groups
[
0
]);
table_schema
.
flag_
=
std
::
get
<
4
>
(
groups
[
0
]);
table_schema
.
engine_type_
=
std
::
get
<
5
>
(
groups
[
0
]);
table_schema
.
nlist_
=
std
::
get
<
6
>
(
groups
[
0
]);
table_schema
.
index_file_size_
=
std
::
get
<
7
>
(
groups
[
0
]);
table_schema
.
metric_type_
=
std
::
get
<
8
>
(
groups
[
0
]);
}
else
{
return
Status
::
NotFound
(
"Table "
+
table_schema
.
table_id_
+
" not found"
);
}
...
...
@@ -340,7 +343,8 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
auto
tables
=
ConnectorPtr
->
select
(
columns
(
&
TableSchema
::
id_
,
&
TableSchema
::
state_
,
&
TableSchema
::
dimension_
,
&
TableSchema
::
created_on_
),
&
TableSchema
::
created_on_
,
&
TableSchema
::
flag_
),
where
(
c
(
&
TableSchema
::
table_id_
)
==
table_id
and
c
(
&
TableSchema
::
state_
)
!=
(
int
)
TableSchema
::
TO_DELETE
));
...
...
@@ -351,6 +355,7 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
table_schema
.
state_
=
std
::
get
<
1
>
(
tables
[
0
]);
table_schema
.
dimension_
=
std
::
get
<
2
>
(
tables
[
0
]);
table_schema
.
created_on_
=
std
::
get
<
3
>
(
tables
[
0
]);
table_schema
.
flag_
=
std
::
get
<
4
>
(
tables
[
0
]);
table_schema
.
engine_type_
=
index
.
engine_type_
;
table_schema
.
nlist_
=
index
.
nlist_
;
table_schema
.
index_file_size_
=
index
.
index_file_size_
*
ONE_MB
;
...
...
@@ -376,6 +381,28 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
std
::
string
msg
=
"Encounter exception when update table index: table_id = "
+
table_id
;
return
HandleException
(
msg
,
e
);
}
return
Status
::
OK
();
}
Status
SqliteMetaImpl
::
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
{
try
{
server
::
MetricCollector
metric
;
//set all backup file to raw
ConnectorPtr
->
update_all
(
set
(
c
(
&
TableSchema
::
flag_
)
=
flag
),
where
(
c
(
&
TableSchema
::
table_id_
)
==
table_id
));
}
catch
(
std
::
exception
&
e
)
{
std
::
string
msg
=
"Encounter exception when update table flag: table_id = "
+
table_id
;
return
HandleException
(
msg
,
e
);
}
return
Status
::
OK
();
}
...
...
@@ -471,6 +498,7 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
&
TableSchema
::
table_id_
,
&
TableSchema
::
dimension_
,
&
TableSchema
::
created_on_
,
&
TableSchema
::
flag_
,
&
TableSchema
::
engine_type_
,
&
TableSchema
::
nlist_
,
&
TableSchema
::
index_file_size_
,
...
...
@@ -480,12 +508,13 @@ Status SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
TableSchema
schema
;
schema
.
id_
=
std
::
get
<
0
>
(
table
);
schema
.
table_id_
=
std
::
get
<
1
>
(
table
);
schema
.
created_on_
=
std
::
get
<
2
>
(
table
);
schema
.
dimension_
=
std
::
get
<
3
>
(
table
);
schema
.
engine_type_
=
std
::
get
<
4
>
(
table
);
schema
.
nlist_
=
std
::
get
<
5
>
(
table
);
schema
.
index_file_size_
=
std
::
get
<
6
>
(
table
);
schema
.
metric_type_
=
std
::
get
<
7
>
(
table
);
schema
.
dimension_
=
std
::
get
<
2
>
(
table
);
schema
.
created_on_
=
std
::
get
<
3
>
(
table
);
schema
.
flag_
=
std
::
get
<
4
>
(
table
);
schema
.
engine_type_
=
std
::
get
<
5
>
(
table
);
schema
.
nlist_
=
std
::
get
<
6
>
(
table
);
schema
.
index_file_size_
=
std
::
get
<
7
>
(
table
);
schema
.
metric_type_
=
std
::
get
<
8
>
(
table
);
table_schema_array
.
emplace_back
(
schema
);
}
...
...
cpp/src/db/meta/SqliteMetaImpl.h
浏览文件 @
b83f7103
...
...
@@ -21,82 +21,64 @@ class SqliteMetaImpl : public Meta {
public:
explicit
SqliteMetaImpl
(
const
DBMetaOptions
&
options_
);
Status
CreateTable
(
TableSchema
&
table_schema
)
override
;
Status
CreateTable
(
TableSchema
&
table_schema
)
override
;
Status
DescribeTable
(
TableSchema
&
group_info_
)
override
;
Status
DescribeTable
(
TableSchema
&
group_info_
)
override
;
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
Status
AllTables
(
std
::
vector
<
TableSchema
>
&
table_schema_array
)
override
;
Status
AllTables
(
std
::
vector
<
TableSchema
>
&
table_schema_array
)
override
;
Status
DeleteTable
(
const
std
::
string
&
table_id
)
override
;
Status
DeleteTable
(
const
std
::
string
&
table_id
)
override
;
Status
DeleteTableFiles
(
const
std
::
string
&
table_id
)
override
;
Status
DeleteTableFiles
(
const
std
::
string
&
table_id
)
override
;
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
override
;
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
override
;
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
const
DatesT
&
dates
)
override
;
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
const
DatesT
&
dates
)
override
;
Status
GetTableFiles
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
TableFilesSchema
&
table_files
)
override
;
Status
GetTableFiles
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
TableFilesSchema
&
table_files
)
override
;
Status
HasNonIndexFiles
(
const
std
::
string
&
table_id
,
bool
&
has
)
override
;
Status
HasNonIndexFiles
(
const
std
::
string
&
table_id
,
bool
&
has
)
override
;
Status
UpdateTableIndexParam
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
override
;
Status
UpdateTableIndexParam
(
const
std
::
string
&
table_id
,
const
TableIndex
&
index
)
override
;
Status
UpdateTableFlag
(
const
std
::
string
&
table_id
,
int64_t
flag
)
override
;
Status
DescribeTableIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
override
;
Status
DescribeTableIndex
(
const
std
::
string
&
table_id
,
TableIndex
&
index
)
override
;
Status
DropTableIndex
(
const
std
::
string
&
table_id
)
override
;
Status
DropTableIndex
(
const
std
::
string
&
table_id
)
override
;
Status
UpdateTableFilesToIndex
(
const
std
::
string
&
table_id
)
override
;
Status
UpdateTableFilesToIndex
(
const
std
::
string
&
table_id
)
override
;
Status
UpdateTableFile
(
TableFileSchema
&
file_schema
)
override
;
Status
UpdateTableFile
(
TableFileSchema
&
file_schema
)
override
;
Status
UpdateTableFiles
(
TableFilesSchema
&
files
)
override
;
Status
UpdateTableFiles
(
TableFilesSchema
&
files
)
override
;
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
override
;
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
override
;
Status
FilesToSearch
(
const
std
::
string
&
table_id
,
const
std
::
vector
<
size_t
>
&
ids
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
override
;
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
DatePartionedTableFilesSchema
&
files
)
override
;
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
DatePartionedTableFilesSchema
&
files
)
override
;
Status
FilesToIndex
(
TableFilesSchema
&
)
override
;
Status
FilesToIndex
(
TableFilesSchema
&
)
override
;
Status
Archive
()
override
;
Status
Archive
()
override
;
Status
Size
(
uint64_t
&
result
)
override
;
Status
Size
(
uint64_t
&
result
)
override
;
Status
CleanUp
()
override
;
Status
CleanUp
()
override
;
Status
CleanUpFilesWithTTL
(
uint16_t
seconds
)
override
;
Status
CleanUpFilesWithTTL
(
uint16_t
seconds
)
override
;
Status
DropAll
()
override
;
Status
DropAll
()
override
;
Status
Count
(
const
std
::
string
&
table_id
,
uint64_t
&
result
)
override
;
...
...
cpp/src/db/scheduler/task/SearchTask.cpp
浏览文件 @
b83f7103
...
...
@@ -16,7 +16,7 @@ namespace engine {
namespace
{
static
constexpr
size_t
PARALLEL_REDUCE_THRESHOLD
=
10000
;
static
constexpr
size_t
PARALLEL_REDUCE_THRESHOLD
=
10000
00
;
static
constexpr
size_t
PARALLEL_REDUCE_BATCH
=
1000
;
bool
NeedParallelReduce
(
uint64_t
nq
,
uint64_t
topk
)
{
...
...
cpp/src/scheduler/ResourceMgr.cpp
浏览文件 @
b83f7103
...
...
@@ -17,6 +17,17 @@ ResourceMgr::ResourceMgr()
}
uint64_t
ResourceMgr
::
GetNumOfComputeResource
()
{
uint64_t
count
=
0
;
for
(
auto
&
res
:
resources_
)
{
if
(
res
->
HasExecutor
())
{
++
count
;
}
}
return
count
;
}
ResourceWPtr
ResourceMgr
::
Add
(
ResourcePtr
&&
resource
)
{
ResourceWPtr
ret
(
resource
);
...
...
cpp/src/scheduler/ResourceMgr.h
浏览文件 @
b83f7103
...
...
@@ -35,6 +35,12 @@ public:
return
disk_resources_
;
}
/*
* Return account of resource which enable executor;
*/
uint64_t
GetNumOfComputeResource
();
/*
* Add resource into Resource Management;
* Generate functions on events;
...
...
cpp/src/scheduler/task/DeleteTask.cpp
浏览文件 @
b83f7103
...
...
@@ -6,10 +6,14 @@
#include "DeleteTask.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
XDeleteTask
::
XDeleteTask
(
DeleteContextPtr
&
delete_context
)
:
delete_context_ptr_
(
delete_context
)
{}
void
XDeleteTask
::
Load
(
LoadType
type
,
uint8_t
device_id
)
{
...
...
@@ -17,12 +21,13 @@ XDeleteTask::Load(LoadType type, uint8_t device_id) {
void
XDeleteTask
::
Execute
()
{
delete_context_ptr_
->
ResourceDone
();
}
TaskPtr
XDeleteTask
::
Clone
()
{
return
nullptr
;
auto
task
=
std
::
make_shared
<
XDeleteTask
>
(
delete_context_ptr_
);
return
task
;
}
}
...
...
cpp/src/scheduler/task/DeleteTask.h
浏览文件 @
b83f7103
...
...
@@ -5,6 +5,7 @@
******************************************************************************/
#pragma once
#include <src/db/scheduler/context/DeleteContext.h>
#include "Task.h"
...
...
@@ -14,6 +15,9 @@ namespace engine {
class
XDeleteTask
:
public
Task
{
public:
explicit
XDeleteTask
(
DeleteContextPtr
&
delete_context
);
void
Load
(
LoadType
type
,
uint8_t
device_id
)
override
;
...
...
@@ -22,6 +26,9 @@ public:
TaskPtr
Clone
()
override
;
public:
DeleteContextPtr
delete_context_ptr_
;
};
}
...
...
cpp/src/scheduler/task/SearchTask.cpp
浏览文件 @
b83f7103
...
...
@@ -185,6 +185,9 @@ XSearchTask::Execute() {
}
rc
.
ElapseFromBegin
(
"totally cost"
);
// release index in resource
index_engine_
=
nullptr
;
}
TaskPtr
...
...
cpp/src/scheduler/task/Task.h
浏览文件 @
b83f7103
...
...
@@ -35,6 +35,7 @@ public:
virtual
void
Execute
()
=
0
;
// TODO: dont use this method to support task move
virtual
TaskPtr
Clone
()
=
0
;
...
...
cpp/src/scheduler/task/TaskConvert.cpp
浏览文件 @
b83f7103
...
...
@@ -22,8 +22,9 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) {
return
task
;
}
case
ScheduleTaskType
::
kDelete
:
{
// TODO: convert to delete task
return
nullptr
;
auto
delete_task
=
std
::
static_pointer_cast
<
DeleteTask
>
(
schedule_task
);
auto
task
=
std
::
make_shared
<
XDeleteTask
>
(
delete_task
->
context_
);
return
task
;
}
default:
{
// TODO: unexpected !!!
...
...
cpp/src/scheduler/task/TaskConvert.h
浏览文件 @
b83f7103
...
...
@@ -4,9 +4,11 @@
* Proprietary and confidential.
******************************************************************************/
#include "src/db/scheduler/task/IndexLoadTask.h"
#include "db/scheduler/task/DeleteTask.h"
#include "db/scheduler/task/IndexLoadTask.h"
#include "Task.h"
#include "SearchTask.h"
#include "DeleteTask.h"
namespace
zilliz
{
namespace
milvus
{
...
...
cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp
浏览文件 @
b83f7103
...
...
@@ -236,7 +236,6 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std
::
vector
<
std
::
pair
<
int64_t
,
RowRecord
>>
search_record_array
;
{
//insert vectors
std
::
vector
<
int64_t
>
record_ids
;
for
(
int
i
=
0
;
i
<
ADD_VECTOR_LOOP
;
i
++
)
{
//add vectors
std
::
vector
<
RowRecord
>
record_array
;
int64_t
begin_index
=
i
*
BATCH_ROW_COUNT
;
...
...
@@ -249,6 +248,12 @@ ClientTest::Test(const std::string& address, const std::string& port) {
}
#endif
std
::
vector
<
int64_t
>
record_ids
;
//generate user defined ids
for
(
int
k
=
0
;
k
<
BATCH_ROW_COUNT
;
k
++
)
{
record_ids
.
push_back
(
i
*
BATCH_ROW_COUNT
+
k
);
}
auto
start
=
std
::
chrono
::
high_resolution_clock
::
now
();
Status
stat
=
conn
->
Insert
(
TABLE_NAME
,
record_array
,
record_ids
);
...
...
cpp/src/server/DBWrapper.cpp
浏览文件 @
b83f7103
...
...
@@ -74,12 +74,6 @@ DBWrapper::DBWrapper() {
}
}
std
::
string
metric_type
=
engine_config
.
GetValue
(
CONFIG_METRICTYPE
,
"L2"
);
if
(
metric_type
!=
"L2"
&&
metric_type
!=
"IP"
)
{
std
::
cout
<<
"ERROR! Illegal metric type: "
<<
metric_type
<<
", available options: L2 or IP"
<<
std
::
endl
;
kill
(
0
,
SIGUSR1
);
}
//set archive config
engine
::
ArchiveConf
::
CriteriaT
criterial
;
int64_t
disk
=
db_config
.
GetInt64Value
(
CONFIG_DB_ARCHIVE_DISK
,
0
);
...
...
cpp/src/server/ServerConfig.h
浏览文件 @
b83f7103
...
...
@@ -17,7 +17,6 @@ namespace server {
static
const
char
*
CONFIG_SERVER
=
"server_config"
;
static
const
char
*
CONFIG_SERVER_ADDRESS
=
"address"
;
static
const
char
*
CONFIG_SERVER_PORT
=
"port"
;
static
const
char
*
CONFIG_SERVER_PROTOCOL
=
"transfer_protocol"
;
static
const
char
*
CONFIG_CLUSTER_MODE
=
"mode"
;
static
const
char
*
CONFIG_GPU_INDEX
=
"gpu_index"
;
...
...
@@ -41,9 +40,6 @@ static const char* CONFIG_INSERT_CACHE_IMMEDIATELY = "insert_cache_immediately";
static
const
char
*
CONFIG_GPU_IDS
=
"gpu_ids"
;
static
const
char
*
GPU_CACHE_FREE_PERCENT
=
"gpu_cache_free_percent"
;
static
const
char
*
CONFIG_LICENSE
=
"license_config"
;
static
const
char
*
CONFIG_LICENSE_PATH
=
"license_path"
;
static
const
char
*
CONFIG_METRIC
=
"metric_config"
;
static
const
char
*
CONFIG_METRIC_IS_STARTUP
=
"is_startup"
;
static
const
char
*
CONFIG_METRIC_COLLECTOR
=
"collector"
;
...
...
@@ -51,13 +47,8 @@ static const char* CONFIG_PROMETHEUS = "prometheus_config";
static
const
char
*
CONFIG_METRIC_PROMETHEUS_PORT
=
"port"
;
static
const
std
::
string
CONFIG_ENGINE
=
"engine_config"
;
static
const
std
::
string
CONFIG_NPROBE
=
"nprobe"
;
static
const
std
::
string
CONFIG_NLIST
=
"nlist"
;
static
const
std
::
string
CONFIG_DCBT
=
"use_blas_threshold"
;
static
const
std
::
string
CONFIG_METRICTYPE
=
"metric_type"
;
static
const
std
::
string
CONFIG_OMP_THREAD_NUM
=
"omp_thread_num"
;
static
const
std
::
string
CONFIG_USE_HYBRID_INDEX
=
"use_hybrid_index"
;
static
const
std
::
string
CONFIG_HYBRID_INDEX_GPU
=
"hybrid_index_gpu"
;
class
ServerConfig
{
public:
...
...
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
浏览文件 @
b83f7103
...
...
@@ -12,9 +12,12 @@
#include "../DBWrapper.h"
#include "version.h"
#include "GrpcMilvusServer.h"
#include "db/Utils.h"
#include "src/server/Server.h"
#include <string.h>
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
...
...
@@ -435,6 +438,23 @@ InsertTask::OnExecute() {
}
}
//all user provide id, or all internal id
uint64_t
row_count
=
0
;
DBWrapper
::
DB
()
->
GetTableRowCount
(
table_info
.
table_id_
,
row_count
);
bool
empty_table
=
(
row_count
==
0
);
bool
user_provide_ids
=
!
insert_param_
.
row_id_array
().
empty
();
if
(
!
empty_table
)
{
//user already provided id before, all insert action require user id
if
(
engine
::
utils
::
UserDefinedId
(
table_info
.
flag_
)
&&
!
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are user defined, please provide id for this batch"
);
}
//user didn't provided id before, no need to provide user id
if
(
!
engine
::
utils
::
UserDefinedId
(
table_info
.
flag_
)
&&
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are auto generated, no need to provide id for this batch"
);
}
}
rc
.
RecordSection
(
"check validation"
);
#ifdef MILVUS_ENABLE_PROFILING
...
...
@@ -469,8 +489,10 @@ InsertTask::OnExecute() {
//step 4: insert vectors
auto
vec_count
=
(
uint64_t
)
insert_param_
.
row_record_array_size
();
std
::
vector
<
int64_t
>
vec_ids
(
insert_param_
.
row_id_array_size
(),
0
);
for
(
auto
i
=
0
;
i
<
insert_param_
.
row_id_array_size
();
i
++
)
{
vec_ids
[
i
]
=
insert_param_
.
row_id_array
(
i
);
if
(
!
insert_param_
.
row_id_array
().
empty
())
{
const
int64_t
*
src_data
=
insert_param_
.
row_id_array
().
data
();
int64_t
*
target_data
=
vec_ids
.
data
();
memcpy
(
target_data
,
src_data
,
(
size_t
)(
sizeof
(
int64_t
)
*
insert_param_
.
row_id_array_size
()));
}
stat
=
DBWrapper
::
DB
()
->
InsertVectors
(
insert_param_
.
table_name
(),
vec_count
,
vec_f
.
data
(),
vec_ids
);
...
...
@@ -489,6 +511,12 @@ InsertTask::OnExecute() {
return
SetError
(
SERVER_ILLEGAL_VECTOR_ID
,
msg
);
}
//step 5: update table flag
if
(
empty_table
&&
user_provide_ids
)
{
stat
=
DBWrapper
::
DB
()
->
UpdateTableFlag
(
insert_param_
.
table_name
(),
table_info
.
flag_
|
engine
::
meta
::
FLAG_MASK_USERID
);
}
#ifdef MILVUS_ENABLE_PROFILING
ProfilerStop
();
#endif
...
...
cpp/unittest/metrics/metricbase_test.cpp
浏览文件 @
b83f7103
...
...
@@ -22,6 +22,7 @@ TEST(MetricbaseTest, METRICBASE_TEST){
instance
.
IndexFileSizeHistogramObserve
(
1.0
);
instance
.
BuildIndexDurationSecondsHistogramObserve
(
1.0
);
instance
.
CpuCacheUsageGaugeSet
(
1.0
);
instance
.
GpuCacheUsageGaugeSet
(
1.0
);
instance
.
MetaAccessTotalIncrement
();
instance
.
MetaAccessDurationSecondsHistogramObserve
(
1.0
);
instance
.
FaissDiskLoadDurationSecondsHistogramObserve
(
1.0
);
...
...
cpp/unittest/metrics/prometheus_test.cpp
浏览文件 @
b83f7103
...
...
@@ -23,6 +23,7 @@ TEST(PrometheusTest, PROMETHEUS_TEST){
instance
.
IndexFileSizeHistogramObserve
(
1.0
);
instance
.
BuildIndexDurationSecondsHistogramObserve
(
1.0
);
instance
.
CpuCacheUsageGaugeSet
(
1.0
);
instance
.
GpuCacheUsageGaugeSet
(
1.0
);
instance
.
MetaAccessTotalIncrement
();
instance
.
MetaAccessDurationSecondsHistogramObserve
(
1.0
);
instance
.
FaissDiskLoadDurationSecondsHistogramObserve
(
1.0
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录