Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
65419a15
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
65419a15
编写于
7月 08, 2019
作者:
Z
zhiru
提交者:
jinhai
7月 14, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update
Former-commit-id: 966c56781878a740a8b31f7aeea99ed4dd562de7
上级
48626e33
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
286 addition
and
277 deletion
+286
-277
cpp/src/db/Constants.h
cpp/src/db/Constants.h
+3
-3
cpp/src/db/Factories.cpp
cpp/src/db/Factories.cpp
+10
-11
cpp/src/db/Factories.h
cpp/src/db/Factories.h
+5
-4
cpp/src/db/MemManager.cpp
cpp/src/db/MemManager.cpp
+44
-39
cpp/src/db/MemManager.h
cpp/src/db/MemManager.h
+22
-21
cpp/src/db/MemManagerAbstract.h
cpp/src/db/MemManagerAbstract.h
+6
-5
cpp/src/db/MemTable.cpp
cpp/src/db/MemTable.cpp
+35
-31
cpp/src/db/MemTable.h
cpp/src/db/MemTable.h
+7
-6
cpp/src/db/MemTableFile.cpp
cpp/src/db/MemTableFile.cpp
+28
-28
cpp/src/db/MemTableFile.h
cpp/src/db/MemTableFile.h
+5
-4
cpp/src/db/NewMemManager.cpp
cpp/src/db/NewMemManager.cpp
+30
-33
cpp/src/db/NewMemManager.h
cpp/src/db/NewMemManager.h
+12
-11
cpp/src/db/VectorSource.cpp
cpp/src/db/VectorSource.cpp
+9
-9
cpp/src/db/VectorSource.h
cpp/src/db/VectorSource.h
+10
-9
cpp/unittest/db/mem_test.cpp
cpp/unittest/db/mem_test.cpp
+50
-53
cpp/unittest/db/utils.h
cpp/unittest/db/utils.h
+10
-10
未找到文件。
cpp/src/db/Constants.h
浏览文件 @
65419a15
...
...
@@ -10,9 +10,9 @@ namespace milvus {
namespace
engine
{
const
size_t
K
=
1024UL
;
const
size_t
M
=
K
*
K
;
const
size_t
G
=
K
*
M
;
const
size_t
T
=
K
*
G
;
const
size_t
M
=
K
*
K
;
const
size_t
G
=
K
*
M
;
const
size_t
T
=
K
*
G
;
const
size_t
MAX_TABLE_FILE_MEM
=
128
*
M
;
...
...
cpp/src/db/Factories.cpp
浏览文件 @
65419a15
...
...
@@ -22,6 +22,8 @@ namespace zilliz {
namespace
milvus
{
namespace
engine
{
#define USE_NEW_MEM_MANAGER 1
DBMetaOptions
DBMetaOptionsFactory
::
Build
(
const
std
::
string
&
path
)
{
auto
p
=
path
;
if
(
p
==
""
)
{
...
...
@@ -74,17 +76,14 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
if
(
dialect
.
find
(
"mysql"
)
!=
std
::
string
::
npos
)
{
ENGINE_LOG_INFO
<<
"Using MySQL"
;
return
std
::
make_shared
<
meta
::
MySQLMetaImpl
>
(
meta
::
MySQLMetaImpl
(
metaOptions
,
mode
));
}
else
if
(
dialect
.
find
(
"sqlite"
)
!=
std
::
string
::
npos
)
{
ENGINE_LOG_DEBUG
<<
"Using SQLite"
;
}
else
if
(
dialect
.
find
(
"sqlite"
)
!=
std
::
string
::
npos
)
{
ENGINE_LOG_INFO
<<
"Using SQLite"
;
return
std
::
make_shared
<
meta
::
DBMetaImpl
>
(
meta
::
DBMetaImpl
(
metaOptions
));
}
else
{
}
else
{
ENGINE_LOG_ERROR
<<
"Invalid dialect in URI: dialect = "
<<
dialect
;
throw
InvalidArgumentException
(
"URI dialect is not mysql / sqlite"
);
}
}
else
{
}
else
{
ENGINE_LOG_ERROR
<<
"Wrong URI format: URI = "
<<
uri
;
throw
InvalidArgumentException
(
"Wrong URI format "
);
}
...
...
@@ -102,11 +101,11 @@ DB* DBFactory::Build(const Options& options) {
MemManagerAbstractPtr
MemManagerFactory
::
Build
(
const
std
::
shared_ptr
<
meta
::
Meta
>&
meta
,
const
Options
&
options
)
{
bool
useNew
=
true
;
if
(
useNew
)
{
return
std
::
make_shared
<
NewMemManager
>
(
meta
,
options
);
}
#ifdef USE_NEW_MEM_MANAGER
return
std
::
make_shared
<
NewMemManager
>
(
meta
,
options
);
#else
return
std
::
make_shared
<
MemManager
>
(
meta
,
options
);
#endif
}
}
// namespace engine
...
...
cpp/src/db/Factories.h
浏览文件 @
65419a15
...
...
@@ -15,12 +15,13 @@
#include <string>
#include <memory>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
struct
DBMetaOptionsFactory
{
static
DBMetaOptions
Build
(
const
std
::
string
&
path
=
""
);
static
DBMetaOptions
Build
(
const
std
::
string
&
path
=
""
);
};
struct
OptionsFactory
{
...
...
@@ -29,16 +30,16 @@ struct OptionsFactory {
struct
DBMetaImplFactory
{
static
std
::
shared_ptr
<
meta
::
DBMetaImpl
>
Build
();
static
std
::
shared_ptr
<
meta
::
Meta
>
Build
(
const
DBMetaOptions
&
metaOptions
,
const
int
&
mode
);
static
std
::
shared_ptr
<
meta
::
Meta
>
Build
(
const
DBMetaOptions
&
metaOptions
,
const
int
&
mode
);
};
struct
DBFactory
{
static
std
::
shared_ptr
<
DB
>
Build
();
static
DB
*
Build
(
const
Options
&
);
static
DB
*
Build
(
const
Options
&
);
};
struct
MemManagerFactory
{
static
MemManagerAbstractPtr
Build
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
);
static
MemManagerAbstractPtr
Build
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
);
};
}
// namespace engine
...
...
cpp/src/db/MemManager.cpp
浏览文件 @
65419a15
...
...
@@ -15,22 +15,23 @@
#include <thread>
#include <easylogging++.h>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
MemVectors
::
MemVectors
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta_ptr
,
const
meta
::
TableFileSchema
&
schema
,
const
Options
&
options
)
:
meta_
(
meta_ptr
),
options_
(
options
),
schema_
(
schema
),
id_generator_
(
new
SimpleIDGenerator
()),
active_engine_
(
EngineFactory
::
Build
(
schema_
.
dimension_
,
schema_
.
location_
,
(
EngineType
)
schema_
.
engine_type_
))
{
MemVectors
::
MemVectors
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta_ptr
,
const
meta
::
TableFileSchema
&
schema
,
const
Options
&
options
)
:
meta_
(
meta_ptr
),
options_
(
options
),
schema_
(
schema
),
id_generator_
(
new
SimpleIDGenerator
()),
active_engine_
(
EngineFactory
::
Build
(
schema_
.
dimension_
,
schema_
.
location_
,
(
EngineType
)
schema_
.
engine_type_
))
{
}
Status
MemVectors
::
Add
(
size_t
n_
,
const
float
*
vectors_
,
IDNumbers
&
vector_ids_
)
{
if
(
active_engine_
==
nullptr
)
{
Status
MemVectors
::
Add
(
size_t
n_
,
const
float
*
vectors_
,
IDNumbers
&
vector_ids_
)
{
if
(
active_engine_
==
nullptr
)
{
return
Status
::
Error
(
"index engine is null"
);
}
...
...
@@ -39,13 +40,15 @@ Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_)
Status
status
=
active_engine_
->
AddWithIds
(
n_
,
vectors_
,
vector_ids_
.
data
());
auto
end_time
=
METRICS_NOW_TIME
;
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
server
::
Metrics
::
GetInstance
().
AddVectorsPerSecondGaugeSet
(
static_cast
<
int
>
(
n_
),
static_cast
<
int
>
(
schema_
.
dimension_
),
total_time
);
server
::
Metrics
::
GetInstance
().
AddVectorsPerSecondGaugeSet
(
static_cast
<
int
>
(
n_
),
static_cast
<
int
>
(
schema_
.
dimension_
),
total_time
);
return
status
;
}
size_t
MemVectors
::
RowCount
()
const
{
if
(
active_engine_
==
nullptr
)
{
if
(
active_engine_
==
nullptr
)
{
return
0
;
}
...
...
@@ -53,15 +56,15 @@ size_t MemVectors::RowCount() const {
}
size_t
MemVectors
::
Size
()
const
{
if
(
active_engine_
==
nullptr
)
{
if
(
active_engine_
==
nullptr
)
{
return
0
;
}
return
active_engine_
->
Size
();
}
Status
MemVectors
::
Serialize
(
std
::
string
&
table_id
)
{
if
(
active_engine_
==
nullptr
)
{
Status
MemVectors
::
Serialize
(
std
::
string
&
table_id
)
{
if
(
active_engine_
==
nullptr
)
{
return
Status
::
Error
(
"index engine is null"
);
}
...
...
@@ -73,15 +76,16 @@ Status MemVectors::Serialize(std::string& table_id) {
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
schema_
.
size_
=
size
;
server
::
Metrics
::
GetInstance
().
DiskStoreIOSpeedGaugeSet
(
size
/
total_time
);
server
::
Metrics
::
GetInstance
().
DiskStoreIOSpeedGaugeSet
(
size
/
total_time
);
schema_
.
file_type_
=
(
size
>=
options_
.
index_trigger_size
)
?
meta
::
TableFileSchema
::
TO_INDEX
:
meta
::
TableFileSchema
::
RAW
;
meta
::
TableFileSchema
::
TO_INDEX
:
meta
::
TableFileSchema
::
RAW
;
auto
status
=
meta_
->
UpdateTableFile
(
schema_
);
LOG
(
DEBUG
)
<<
"New "
<<
((
schema_
.
file_type_
==
meta
::
TableFileSchema
::
RAW
)
?
"raw"
:
"to_index"
)
<<
" file "
<<
schema_
.
file_id_
<<
" of size "
<<
(
double
)(
active_engine_
->
Size
())
/
(
double
)
meta
::
M
<<
" M"
;
<<
" file "
<<
schema_
.
file_id_
<<
" of size "
<<
(
double
)
(
active_engine_
->
Size
())
/
(
double
)
meta
::
M
<<
" M"
;
active_engine_
->
Cache
();
...
...
@@ -99,7 +103,7 @@ MemVectors::~MemVectors() {
* MemManager
*/
MemManager
::
MemVectorsPtr
MemManager
::
GetMemByTable
(
const
std
::
string
&
table_id
)
{
const
std
::
string
&
table_id
)
{
auto
memIt
=
mem_id_map_
.
find
(
table_id
);
if
(
memIt
!=
mem_id_map_
.
end
())
{
return
memIt
->
second
;
...
...
@@ -116,22 +120,23 @@ MemManager::MemVectorsPtr MemManager::GetMemByTable(
return
mem_id_map_
[
table_id
];
}
Status
MemManager
::
InsertVectors
(
const
std
::
string
&
table_id_
,
size_t
n_
,
const
float
*
vectors_
,
IDNumbers
&
vector_ids_
)
{
Status
MemManager
::
InsertVectors
(
const
std
::
string
&
table_id_
,
size_t
n_
,
const
float
*
vectors_
,
IDNumbers
&
vector_ids_
)
{
LOG
(
DEBUG
)
<<
"MemManager::InsertVectors: mutable mem = "
<<
GetCurrentMutableMem
()
<<
", immutable mem = "
<<
GetCurrentImmutableMem
()
<<
", total mem = "
<<
GetCurrentMem
();
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
return
InsertVectorsNoLock
(
table_id_
,
n_
,
vectors_
,
vector_ids_
);
}
Status
MemManager
::
InsertVectorsNoLock
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
{
LOG
(
DEBUG
)
<<
"MemManager::InsertVectorsNoLock: mutable mem = "
<<
GetCurrentMutableMem
()
<<
", immutable mem = "
<<
GetCurrentImmutableMem
()
<<
", total mem = "
<<
GetCurrentMem
();
Status
MemManager
::
InsertVectorsNoLock
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
{
MemVectorsPtr
mem
=
GetMemByTable
(
table_id
);
if
(
mem
==
nullptr
)
{
...
...
@@ -139,7 +144,7 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id,
}
//makesure each file size less than index_trigger_size
if
(
mem
->
Size
()
>
options_
.
index_trigger_size
)
{
if
(
mem
->
Size
()
>
options_
.
index_trigger_size
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
serialization_mtx_
);
immu_mem_list_
.
push_back
(
mem
);
mem_id_map_
.
erase
(
table_id
);
...
...
@@ -152,8 +157,8 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id,
Status
MemManager
::
ToImmutable
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
MemIdMap
temp_map
;
for
(
auto
&
kv
:
mem_id_map_
)
{
if
(
kv
.
second
->
RowCount
()
==
0
)
{
for
(
auto
&
kv
:
mem_id_map_
)
{
if
(
kv
.
second
->
RowCount
()
==
0
)
{
temp_map
.
insert
(
kv
);
continue
;
//empty vector, no need to serialize
}
...
...
@@ -164,12 +169,12 @@ Status MemManager::ToImmutable() {
return
Status
::
OK
();
}
Status
MemManager
::
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
{
Status
MemManager
::
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
{
ToImmutable
();
std
::
unique_lock
<
std
::
mutex
>
lock
(
serialization_mtx_
);
std
::
string
table_id
;
table_ids
.
clear
();
for
(
auto
&
mem
:
immu_mem_list_
)
{
for
(
auto
&
mem
:
immu_mem_list_
)
{
mem
->
Serialize
(
table_id
);
table_ids
.
insert
(
table_id
);
}
...
...
@@ -177,7 +182,7 @@ Status MemManager::Serialize(std::set<std::string>& table_ids) {
return
Status
::
OK
();
}
Status
MemManager
::
EraseMemVector
(
const
std
::
string
&
table_id
)
{
Status
MemManager
::
EraseMemVector
(
const
std
::
string
&
table_id
)
{
{
//erase MemVector from rapid-insert cache
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
mem_id_map_
.
erase
(
table_id
);
...
...
@@ -186,8 +191,8 @@ Status MemManager::EraseMemVector(const std::string& table_id) {
{
//erase MemVector from serialize cache
std
::
unique_lock
<
std
::
mutex
>
lock
(
serialization_mtx_
);
MemList
temp_list
;
for
(
auto
&
mem
:
immu_mem_list_
)
{
if
(
mem
->
TableId
()
!=
table_id
)
{
for
(
auto
&
mem
:
immu_mem_list_
)
{
if
(
mem
->
TableId
()
!=
table_id
)
{
temp_list
.
push_back
(
mem
);
}
}
...
...
@@ -199,7 +204,7 @@ Status MemManager::EraseMemVector(const std::string& table_id) {
size_t
MemManager
::
GetCurrentMutableMem
()
{
size_t
totalMem
=
0
;
for
(
auto
&
kv
:
mem_id_map_
)
{
for
(
auto
&
kv
:
mem_id_map_
)
{
auto
memVector
=
kv
.
second
;
totalMem
+=
memVector
->
Size
();
}
...
...
@@ -208,7 +213,7 @@ size_t MemManager::GetCurrentMutableMem() {
size_t
MemManager
::
GetCurrentImmutableMem
()
{
size_t
totalMem
=
0
;
for
(
auto
&
memVector
:
immu_mem_list_
)
{
for
(
auto
&
memVector
:
immu_mem_list_
)
{
totalMem
+=
memVector
->
Size
();
}
return
totalMem
;
...
...
cpp/src/db/MemManager.h
浏览文件 @
65419a15
...
...
@@ -17,45 +17,46 @@
#include <memory>
#include <mutex>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
namespace
meta
{
class
Meta
;
class
Meta
;
}
class
MemVectors
{
public:
public:
using
MetaPtr
=
meta
::
Meta
::
Ptr
;
using
Ptr
=
std
::
shared_ptr
<
MemVectors
>
;
explicit
MemVectors
(
const
std
::
shared_ptr
<
meta
::
Meta
>&
,
const
meta
::
TableFileSchema
&
,
const
Options
&
);
explicit
MemVectors
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
,
const
meta
::
TableFileSchema
&
,
const
Options
&
);
Status
Add
(
size_t
n_
,
const
float
*
vectors_
,
IDNumbers
&
vector_ids_
);
Status
Add
(
size_t
n_
,
const
float
*
vectors_
,
IDNumbers
&
vector_ids_
);
size_t
RowCount
()
const
;
size_t
Size
()
const
;
Status
Serialize
(
std
::
string
&
table_id
);
Status
Serialize
(
std
::
string
&
table_id
);
~
MemVectors
();
const
std
::
string
&
Location
()
const
{
return
schema_
.
location_
;
}
const
std
::
string
&
Location
()
const
{
return
schema_
.
location_
;
}
std
::
string
TableId
()
const
{
return
schema_
.
table_id_
;
}
private:
private:
MemVectors
()
=
delete
;
MemVectors
(
const
MemVectors
&
)
=
delete
;
MemVectors
&
operator
=
(
const
MemVectors
&
)
=
delete
;
MemVectors
(
const
MemVectors
&
)
=
delete
;
MemVectors
&
operator
=
(
const
MemVectors
&
)
=
delete
;
MetaPtr
meta_
;
Options
options_
;
meta
::
TableFileSchema
schema_
;
IDGenerator
*
id_generator_
;
IDGenerator
*
id_generator_
;
ExecutionEnginePtr
active_engine_
;
};
// MemVectors
...
...
@@ -63,20 +64,20 @@ private:
class
MemManager
:
public
MemManagerAbstract
{
public:
public:
using
MetaPtr
=
meta
::
Meta
::
Ptr
;
using
MemVectorsPtr
=
typename
MemVectors
::
Ptr
;
using
Ptr
=
std
::
shared_ptr
<
MemManager
>
;
MemManager
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
)
MemManager
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
)
:
meta_
(
meta
),
options_
(
options
)
{}
Status
InsertVectors
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
override
;
Status
InsertVectors
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
override
;
Status
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
override
;
Status
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
override
;
Status
EraseMemVector
(
const
std
::
string
&
table_id
)
override
;
Status
EraseMemVector
(
const
std
::
string
&
table_id
)
override
;
size_t
GetCurrentMutableMem
()
override
;
...
...
@@ -84,11 +85,11 @@ public:
size_t
GetCurrentMem
()
override
;
private:
MemVectorsPtr
GetMemByTable
(
const
std
::
string
&
table_id
);
private:
MemVectorsPtr
GetMemByTable
(
const
std
::
string
&
table_id
);
Status
InsertVectorsNoLock
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
);
Status
InsertVectorsNoLock
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
);
Status
ToImmutable
();
using
MemIdMap
=
std
::
map
<
std
::
string
,
MemVectorsPtr
>
;
...
...
cpp/src/db/MemManagerAbstract.h
浏览文件 @
65419a15
...
...
@@ -2,19 +2,20 @@
#include <set>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
class
MemManagerAbstract
{
public:
public:
virtual
Status
InsertVectors
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
=
0
;
virtual
Status
InsertVectors
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
=
0
;
virtual
Status
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
=
0
;
virtual
Status
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
=
0
;
virtual
Status
EraseMemVector
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
EraseMemVector
(
const
std
::
string
&
table_id
)
=
0
;
virtual
size_t
GetCurrentMutableMem
()
=
0
;
...
...
cpp/src/db/MemTable.cpp
浏览文件 @
65419a15
#include "MemTable.h"
#include "Log.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
MemTable
::
MemTable
(
const
std
::
string
&
table_id
,
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
)
:
table_id_
(
table_id
),
meta_
(
meta
),
options_
(
options
)
{
MemTable
::
MemTable
(
const
std
::
string
&
table_id
,
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
)
:
table_id_
(
table_id
),
meta_
(
meta
),
options_
(
options
)
{
}
Status
MemTable
::
Add
(
VectorSource
::
Ptr
&
source
)
{
Status
MemTable
::
Add
(
VectorSource
::
Ptr
&
source
)
{
while
(
!
source
->
AllAdded
())
{
MemTableFile
::
Ptr
currentMemTableFile
;
MemTableFile
::
Ptr
current_mem_table_file
;
if
(
!
mem_table_file_list_
.
empty
())
{
current
MemTableF
ile
=
mem_table_file_list_
.
back
();
current
_mem_table_f
ile
=
mem_table_file_list_
.
back
();
}
Status
status
;
if
(
mem_table_file_list_
.
empty
()
||
current
MemTableF
ile
->
IsFull
())
{
MemTableFile
::
Ptr
new
MemTableF
ile
=
std
::
make_shared
<
MemTableFile
>
(
table_id_
,
meta_
,
options_
);
status
=
new
MemTableF
ile
->
Add
(
source
);
if
(
mem_table_file_list_
.
empty
()
||
current
_mem_table_f
ile
->
IsFull
())
{
MemTableFile
::
Ptr
new
_mem_table_f
ile
=
std
::
make_shared
<
MemTableFile
>
(
table_id_
,
meta_
,
options_
);
status
=
new
_mem_table_f
ile
->
Add
(
source
);
if
(
status
.
ok
())
{
mem_table_file_list_
.
emplace_back
(
new
MemTableF
ile
);
mem_table_file_list_
.
emplace_back
(
new
_mem_table_f
ile
);
}
}
else
{
status
=
current_mem_table_file
->
Add
(
source
);
}
else
{
status
=
currentMemTableFile
->
Add
(
source
);
}
if
(
!
status
.
ok
())
{
std
::
string
err
M
sg
=
"MemTable::Add failed: "
+
status
.
ToString
();
ENGINE_LOG_ERROR
<<
err
M
sg
;
return
Status
::
Error
(
err
M
sg
);
std
::
string
err
_m
sg
=
"MemTable::Add failed: "
+
status
.
ToString
();
ENGINE_LOG_ERROR
<<
err
_m
sg
;
return
Status
::
Error
(
err
_m
sg
);
}
}
return
Status
::
OK
();
}
void
MemTable
::
GetCurrentMemTableFile
(
MemTableFile
::
Ptr
&
mem_table_file
)
{
void
MemTable
::
GetCurrentMemTableFile
(
MemTableFile
::
Ptr
&
mem_table_file
)
{
mem_table_file
=
mem_table_file_list_
.
back
();
}
...
...
@@ -49,15 +53,15 @@ size_t MemTable::GetTableFileCount() {
}
Status
MemTable
::
Serialize
()
{
for
(
auto
mem
TableFile
=
mem_table_file_list_
.
begin
();
memTableFile
!=
mem_table_file_list_
.
end
();
)
{
auto
status
=
(
*
mem
TableF
ile
)
->
Serialize
();
for
(
auto
mem
_table_file
=
mem_table_file_list_
.
begin
();
mem_table_file
!=
mem_table_file_list_
.
end
();
)
{
auto
status
=
(
*
mem
_table_f
ile
)
->
Serialize
();
if
(
!
status
.
ok
())
{
std
::
string
err
M
sg
=
"MemTable::Serialize failed: "
+
status
.
ToString
();
ENGINE_LOG_ERROR
<<
err
M
sg
;
return
Status
::
Error
(
err
M
sg
);
std
::
string
err
_m
sg
=
"MemTable::Serialize failed: "
+
status
.
ToString
();
ENGINE_LOG_ERROR
<<
err
_m
sg
;
return
Status
::
Error
(
err
_m
sg
);
}
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
mem
TableFile
=
mem_table_file_list_
.
erase
(
memTableF
ile
);
mem
_table_file
=
mem_table_file_list_
.
erase
(
mem_table_f
ile
);
}
return
Status
::
OK
();
}
...
...
@@ -66,17 +70,17 @@ bool MemTable::Empty() {
return
mem_table_file_list_
.
empty
();
}
const
std
::
string
&
MemTable
::
GetTableId
()
const
{
const
std
::
string
&
MemTable
::
GetTableId
()
const
{
return
table_id_
;
}
size_t
MemTable
::
GetCurrentMem
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mutex_
);
size_t
total
M
em
=
0
;
for
(
auto
&
memTableF
ile
:
mem_table_file_list_
)
{
total
Mem
+=
memTableF
ile
->
GetCurrentMem
();
size_t
total
_m
em
=
0
;
for
(
auto
&
mem_table_f
ile
:
mem_table_file_list_
)
{
total
_mem
+=
mem_table_f
ile
->
GetCurrentMem
();
}
return
total
M
em
;
return
total
_m
em
;
}
}
// namespace engine
...
...
cpp/src/db/MemTable.h
浏览文件 @
65419a15
...
...
@@ -6,23 +6,24 @@
#include <mutex>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
class
MemTable
{
public:
public:
using
Ptr
=
std
::
shared_ptr
<
MemTable
>
;
using
MemTableFileList
=
std
::
vector
<
MemTableFile
::
Ptr
>
;
using
MetaPtr
=
meta
::
Meta
::
Ptr
;
MemTable
(
const
std
::
string
&
table_id
,
const
std
::
shared_ptr
<
meta
::
Meta
>&
meta
,
const
Options
&
options
);
MemTable
(
const
std
::
string
&
table_id
,
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
);
Status
Add
(
VectorSource
::
Ptr
&
source
);
Status
Add
(
VectorSource
::
Ptr
&
source
);
void
GetCurrentMemTableFile
(
MemTableFile
::
Ptr
&
mem_table_file
);
void
GetCurrentMemTableFile
(
MemTableFile
::
Ptr
&
mem_table_file
);
size_t
GetTableFileCount
();
...
...
@@ -30,11 +31,11 @@ public:
bool
Empty
();
const
std
::
string
&
GetTableId
()
const
;
const
std
::
string
&
GetTableId
()
const
;
size_t
GetCurrentMem
();
private:
private:
const
std
::
string
table_id_
;
MemTableFileList
mem_table_file_list_
;
...
...
cpp/src/db/MemTableFile.cpp
浏览文件 @
65419a15
...
...
@@ -6,23 +6,24 @@
#include <cmath>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
MemTableFile
::
MemTableFile
(
const
std
::
string
&
table_id
,
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
)
:
table_id_
(
table_id
),
meta_
(
meta
),
options_
(
options
)
{
MemTableFile
::
MemTableFile
(
const
std
::
string
&
table_id
,
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
)
:
table_id_
(
table_id
),
meta_
(
meta
),
options_
(
options
)
{
current_mem_
=
0
;
auto
status
=
CreateTableFile
();
if
(
status
.
ok
())
{
execution_engine_
=
EngineFactory
::
Build
(
table_file_schema_
.
dimension_
,
table_file_schema_
.
location_
,
(
EngineType
)
table_file_schema_
.
engine_type_
);
(
EngineType
)
table_file_schema_
.
engine_type_
);
}
}
...
...
@@ -33,31 +34,30 @@ Status MemTableFile::CreateTableFile() {
auto
status
=
meta_
->
CreateTableFile
(
table_file_schema
);
if
(
status
.
ok
())
{
table_file_schema_
=
table_file_schema
;
}
else
{
std
::
string
errMsg
=
"MemTableFile::CreateTableFile failed: "
+
status
.
ToString
();
ENGINE_LOG_ERROR
<<
errMsg
;
}
else
{
std
::
string
err_msg
=
"MemTableFile::CreateTableFile failed: "
+
status
.
ToString
();
ENGINE_LOG_ERROR
<<
err_msg
;
}
return
status
;
}
Status
MemTableFile
::
Add
(
const
VectorSource
::
Ptr
&
source
)
{
Status
MemTableFile
::
Add
(
const
VectorSource
::
Ptr
&
source
)
{
if
(
table_file_schema_
.
dimension_
<=
0
)
{
std
::
string
err
M
sg
=
"MemTableFile::Add: table_file_schema dimension = "
+
std
::
to_string
(
table_file_schema_
.
dimension_
)
+
", table_id = "
+
table_file_schema_
.
table_id_
;
ENGINE_LOG_ERROR
<<
err
M
sg
;
return
Status
::
Error
(
err
M
sg
);
std
::
string
err
_m
sg
=
"MemTableFile::Add: table_file_schema dimension = "
+
std
::
to_string
(
table_file_schema_
.
dimension_
)
+
", table_id = "
+
table_file_schema_
.
table_id_
;
ENGINE_LOG_ERROR
<<
err
_m
sg
;
return
Status
::
Error
(
err
_m
sg
);
}
size_t
single
VectorMemS
ize
=
table_file_schema_
.
dimension_
*
VECTOR_TYPE_SIZE
;
size_t
mem
L
eft
=
GetMemLeft
();
if
(
mem
Left
>=
singleVectorMemS
ize
)
{
size_t
num
VectorsToAdd
=
std
::
ceil
(
memLeft
/
singleVectorMemS
ize
);
size_t
num
VectorsA
dded
;
auto
status
=
source
->
Add
(
execution_engine_
,
table_file_schema_
,
num
VectorsToAdd
,
numVectorsA
dded
);
size_t
single
_vector_mem_s
ize
=
table_file_schema_
.
dimension_
*
VECTOR_TYPE_SIZE
;
size_t
mem
_l
eft
=
GetMemLeft
();
if
(
mem
_left
>=
single_vector_mem_s
ize
)
{
size_t
num
_vectors_to_add
=
std
::
ceil
(
mem_left
/
single_vector_mem_s
ize
);
size_t
num
_vectors_a
dded
;
auto
status
=
source
->
Add
(
execution_engine_
,
table_file_schema_
,
num
_vectors_to_add
,
num_vectors_a
dded
);
if
(
status
.
ok
())
{
current_mem_
+=
(
num
VectorsAdded
*
singleVectorMemS
ize
);
current_mem_
+=
(
num
_vectors_added
*
single_vector_mem_s
ize
);
}
return
status
;
}
...
...
@@ -73,8 +73,8 @@ size_t MemTableFile::GetMemLeft() {
}
bool
MemTableFile
::
IsFull
()
{
size_t
single
VectorMemS
ize
=
table_file_schema_
.
dimension_
*
VECTOR_TYPE_SIZE
;
return
(
GetMemLeft
()
<
single
VectorMemS
ize
);
size_t
single
_vector_mem_s
ize
=
table_file_schema_
.
dimension_
*
VECTOR_TYPE_SIZE
;
return
(
GetMemLeft
()
<
single
_vector_mem_s
ize
);
}
Status
MemTableFile
::
Serialize
()
{
...
...
@@ -88,15 +88,15 @@ Status MemTableFile::Serialize() {
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
table_file_schema_
.
size_
=
size
;
server
::
Metrics
::
GetInstance
().
DiskStoreIOSpeedGaugeSet
((
double
)
size
/
total_time
);
server
::
Metrics
::
GetInstance
().
DiskStoreIOSpeedGaugeSet
((
double
)
size
/
total_time
);
table_file_schema_
.
file_type_
=
(
size
>=
options_
.
index_trigger_size
)
?
meta
::
TableFileSchema
::
TO_INDEX
:
meta
::
TableFileSchema
::
RAW
;
meta
::
TableFileSchema
::
TO_INDEX
:
meta
::
TableFileSchema
::
RAW
;
auto
status
=
meta_
->
UpdateTableFile
(
table_file_schema_
);
LOG
(
DEBUG
)
<<
"New "
<<
((
table_file_schema_
.
file_type_
==
meta
::
TableFileSchema
::
RAW
)
?
"raw"
:
"to_index"
)
<<
" file "
<<
table_file_schema_
.
file_id_
<<
" of size "
<<
(
double
)
size
/
(
double
)
M
<<
" M"
;
<<
" file "
<<
table_file_schema_
.
file_id_
<<
" of size "
<<
(
double
)
size
/
(
double
)
M
<<
" M"
;
execution_engine_
->
Cache
();
...
...
cpp/src/db/MemTableFile.h
浏览文件 @
65419a15
...
...
@@ -5,20 +5,21 @@
#include "VectorSource.h"
#include "ExecutionEngine.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
class
MemTableFile
{
public:
public:
using
Ptr
=
std
::
shared_ptr
<
MemTableFile
>
;
using
MetaPtr
=
meta
::
Meta
::
Ptr
;
MemTableFile
(
const
std
::
string
&
table_id
,
const
std
::
shared_ptr
<
meta
::
Meta
>&
meta
,
const
Options
&
options
);
MemTableFile
(
const
std
::
string
&
table_id
,
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
);
Status
Add
(
const
VectorSource
::
Ptr
&
source
);
Status
Add
(
const
VectorSource
::
Ptr
&
source
);
size_t
GetCurrentMem
();
...
...
@@ -28,7 +29,7 @@ public:
Status
Serialize
();
private:
private:
Status
CreateTableFile
();
...
...
cpp/src/db/NewMemManager.cpp
浏览文件 @
65419a15
...
...
@@ -5,11 +5,12 @@
#include <thread>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
NewMemManager
::
MemTablePtr
NewMemManager
::
GetMemByTable
(
const
std
::
string
&
table_id
)
{
NewMemManager
::
MemTablePtr
NewMemManager
::
GetMemByTable
(
const
std
::
string
&
table_id
)
{
auto
memIt
=
mem_id_map_
.
find
(
table_id
);
if
(
memIt
!=
mem_id_map_
.
end
())
{
return
memIt
->
second
;
...
...
@@ -19,27 +20,27 @@ NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string& table
return
mem_id_map_
[
table_id
];
}
Status
NewMemManager
::
InsertVectors
(
const
std
::
string
&
table_id_
,
Status
NewMemManager
::
InsertVectors
(
const
std
::
string
&
table_id_
,
size_t
n_
,
const
float
*
vectors_
,
IDNumbers
&
vector_ids_
)
{
const
float
*
vectors_
,
IDNumbers
&
vector_ids_
)
{
while
(
GetCurrentMem
()
>
options_
.
maximum_memory
)
{
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
1
));
}
LOG
(
DEBUG
)
<<
"NewMemManager::InsertVectors: mutable mem = "
<<
GetCurrentMutableMem
()
<<
", immutable mem = "
<<
GetCurrentImmutableMem
()
<<
", total mem = "
<<
GetCurrentMem
();
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
return
InsertVectorsNoLock
(
table_id_
,
n_
,
vectors_
,
vector_ids_
);
}
Status
NewMemManager
::
InsertVectorsNoLock
(
const
std
::
string
&
table_id
,
Status
NewMemManager
::
InsertVectorsNoLock
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
{
LOG
(
DEBUG
)
<<
"NewMemManager::InsertVectorsNoLock: mutable mem = "
<<
GetCurrentMutableMem
()
<<
", immutable mem = "
<<
GetCurrentImmutableMem
()
<<
", total mem = "
<<
GetCurrentMem
();
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
{
MemTablePtr
mem
=
GetMemByTable
(
table_id
);
VectorSource
::
Ptr
source
=
std
::
make_shared
<
VectorSource
>
(
n
,
vectors
);
...
...
@@ -54,37 +55,33 @@ Status NewMemManager::InsertVectorsNoLock(const std::string& table_id,
Status
NewMemManager
::
ToImmutable
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
MemIdMap
temp_map
;
for
(
auto
&
kv
:
mem_id_map_
)
{
if
(
kv
.
second
->
Empty
())
{
for
(
auto
&
kv
:
mem_id_map_
)
{
if
(
kv
.
second
->
Empty
())
{
//empty table, no need to serialize
temp_map
.
insert
(
kv
);
continue
;
//empty table, no need to serialize
}
else
{
immu_mem_list_
.
push_back
(
kv
.
second
);
}
immu_mem_list_
.
push_back
(
kv
.
second
);
}
mem_id_map_
.
swap
(
temp_map
);
return
Status
::
OK
();
}
Status
NewMemManager
::
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
{
Status
NewMemManager
::
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
{
ToImmutable
();
std
::
unique_lock
<
std
::
mutex
>
lock
(
serialization_mtx_
);
table_ids
.
clear
();
for
(
auto
&
mem
:
immu_mem_list_
)
{
for
(
auto
&
mem
:
immu_mem_list_
)
{
mem
->
Serialize
();
table_ids
.
insert
(
mem
->
GetTableId
());
}
immu_mem_list_
.
clear
();
// for (auto mem = immu_mem_list_.begin(); mem != immu_mem_list_.end(); ) {
// (*mem)->Serialize();
// table_ids.insert((*mem)->GetTableId());
// mem = immu_mem_list_.erase(mem);
// LOG(DEBUG) << "immu_mem_list_ size = " << immu_mem_list_.size();
// }
return
Status
::
OK
();
}
Status
NewMemManager
::
EraseMemVector
(
const
std
::
string
&
table_id
)
{
Status
NewMemManager
::
EraseMemVector
(
const
std
::
string
&
table_id
)
{
{
//erase MemVector from rapid-insert cache
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
mem_id_map_
.
erase
(
table_id
);
...
...
@@ -93,8 +90,8 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) {
{
//erase MemVector from serialize cache
std
::
unique_lock
<
std
::
mutex
>
lock
(
serialization_mtx_
);
MemList
temp_list
;
for
(
auto
&
mem
:
immu_mem_list_
)
{
if
(
mem
->
GetTableId
()
!=
table_id
)
{
for
(
auto
&
mem
:
immu_mem_list_
)
{
if
(
mem
->
GetTableId
()
!=
table_id
)
{
temp_list
.
push_back
(
mem
);
}
}
...
...
@@ -105,20 +102,20 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) {
}
size_t
NewMemManager
::
GetCurrentMutableMem
()
{
size_t
total
M
em
=
0
;
for
(
auto
&
kv
:
mem_id_map_
)
{
size_t
total
_m
em
=
0
;
for
(
auto
&
kv
:
mem_id_map_
)
{
auto
memTable
=
kv
.
second
;
total
M
em
+=
memTable
->
GetCurrentMem
();
total
_m
em
+=
memTable
->
GetCurrentMem
();
}
return
total
M
em
;
return
total
_m
em
;
}
size_t
NewMemManager
::
GetCurrentImmutableMem
()
{
size_t
total
M
em
=
0
;
for
(
auto
&
memT
able
:
immu_mem_list_
)
{
total
Mem
+=
memT
able
->
GetCurrentMem
();
size_t
total
_m
em
=
0
;
for
(
auto
&
mem_t
able
:
immu_mem_list_
)
{
total
_mem
+=
mem_t
able
->
GetCurrentMem
();
}
return
total
M
em
;
return
total
_m
em
;
}
size_t
NewMemManager
::
GetCurrentMem
()
{
...
...
cpp/src/db/NewMemManager.h
浏览文件 @
65419a15
...
...
@@ -11,25 +11,26 @@
#include <memory>
#include <mutex>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
class
NewMemManager
:
public
MemManagerAbstract
{
public:
public:
using
MetaPtr
=
meta
::
Meta
::
Ptr
;
using
Ptr
=
std
::
shared_ptr
<
NewMemManager
>
;
using
MemTablePtr
=
typename
MemTable
::
Ptr
;
NewMemManager
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
)
:
meta_
(
meta
),
options_
(
options
)
{}
NewMemManager
(
const
std
::
shared_ptr
<
meta
::
Meta
>
&
meta
,
const
Options
&
options
)
:
meta_
(
meta
),
options_
(
options
)
{}
Status
InsertVectors
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
override
;
Status
InsertVectors
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
override
;
Status
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
override
;
Status
Serialize
(
std
::
set
<
std
::
string
>
&
table_ids
)
override
;
Status
EraseMemVector
(
const
std
::
string
&
table_id
)
override
;
Status
EraseMemVector
(
const
std
::
string
&
table_id
)
override
;
size_t
GetCurrentMutableMem
()
override
;
...
...
@@ -37,11 +38,11 @@ public:
size_t
GetCurrentMem
()
override
;
private:
MemTablePtr
GetMemByTable
(
const
std
::
string
&
table_id
);
private:
MemTablePtr
GetMemByTable
(
const
std
::
string
&
table_id
);
Status
InsertVectorsNoLock
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
);
Status
InsertVectorsNoLock
(
const
std
::
string
&
table_id
,
size_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
);
Status
ToImmutable
();
using
MemIdMap
=
std
::
map
<
std
::
string
,
MemTablePtr
>
;
...
...
cpp/src/db/VectorSource.cpp
浏览文件 @
65419a15
...
...
@@ -4,6 +4,7 @@
#include "Log.h"
#include "metrics/Metrics.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
...
...
@@ -11,16 +12,16 @@ namespace engine {
VectorSource
::
VectorSource
(
const
size_t
&
n
,
const
float
*
vectors
)
:
n_
(
n
),
vectors_
(
vectors
),
id_generator_
(
new
SimpleIDGenerator
())
{
n_
(
n
),
vectors_
(
vectors
),
id_generator_
(
new
SimpleIDGenerator
())
{
current_num_vectors_added
=
0
;
}
Status
VectorSource
::
Add
(
const
ExecutionEnginePtr
&
execution_engine
,
const
meta
::
TableFileSchema
&
table_file_schema
,
const
size_t
&
num_vectors_to_add
,
size_t
&
num_vectors_added
)
{
Status
VectorSource
::
Add
(
const
ExecutionEnginePtr
&
execution_engine
,
const
meta
::
TableFileSchema
&
table_file_schema
,
const
size_t
&
num_vectors_to_add
,
size_t
&
num_vectors_added
)
{
auto
start_time
=
METRICS_NOW_TIME
;
...
...
@@ -36,8 +37,7 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine,
vector_ids_
.
insert
(
vector_ids_
.
end
(),
std
::
make_move_iterator
(
vector_ids_to_add
.
begin
()),
std
::
make_move_iterator
(
vector_ids_to_add
.
end
()));
}
else
{
}
else
{
ENGINE_LOG_ERROR
<<
"VectorSource::Add failed: "
+
status
.
ToString
();
}
...
...
cpp/src/db/VectorSource.h
浏览文件 @
65419a15
...
...
@@ -5,22 +5,23 @@
#include "IDGenerator.h"
#include "ExecutionEngine.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
class
VectorSource
{
public:
public:
using
Ptr
=
std
::
shared_ptr
<
VectorSource
>
;
VectorSource
(
const
size_t
&
n
,
const
float
*
vectors
);
VectorSource
(
const
size_t
&
n
,
const
float
*
vectors
);
Status
Add
(
const
ExecutionEnginePtr
&
execution_engine
,
const
meta
::
TableFileSchema
&
table_file_schema
,
const
size_t
&
num_vectors_to_add
,
size_t
&
num_vectors_added
);
Status
Add
(
const
ExecutionEnginePtr
&
execution_engine
,
const
meta
::
TableFileSchema
&
table_file_schema
,
const
size_t
&
num_vectors_to_add
,
size_t
&
num_vectors_added
);
size_t
GetNumVectorsAdded
();
...
...
@@ -28,15 +29,15 @@ public:
IDNumbers
GetVectorIds
();
private:
private:
const
size_t
n_
;
const
float
*
vectors_
;
const
float
*
vectors_
;
IDNumbers
vector_ids_
;
size_t
current_num_vectors_added
;
IDGenerator
*
id_generator_
;
IDGenerator
*
id_generator_
;
};
//VectorSource
...
...
cpp/unittest/db/mem_test.cpp
浏览文件 @
65419a15
...
...
@@ -15,33 +15,34 @@
#include <fstream>
#include <iostream>
using
namespace
zilliz
::
milvus
;
namespace
{
static
const
std
::
string
TABLE_NAME
=
"test_group"
;
static
constexpr
int64_t
TABLE_DIM
=
256
;
static
constexpr
int64_t
VECTOR_COUNT
=
250000
;
static
constexpr
int64_t
INSERT_LOOP
=
10000
;
engine
::
meta
::
TableSchema
BuildTableSchema
()
{
engine
::
meta
::
TableSchema
table_info
;
table_info
.
dimension_
=
TABLE_DIM
;
table_info
.
table_id_
=
TABLE_NAME
;
table_info
.
engine_type_
=
(
int
)
engine
::
EngineType
::
FAISS_IDMAP
;
return
table_info
;
}
static
const
std
::
string
TABLE_NAME
=
"test_group"
;
static
constexpr
int64_t
TABLE_DIM
=
256
;
static
constexpr
int64_t
VECTOR_COUNT
=
250000
;
static
constexpr
int64_t
INSERT_LOOP
=
10000
;
engine
::
meta
::
TableSchema
BuildTableSchema
()
{
engine
::
meta
::
TableSchema
table_info
;
table_info
.
dimension_
=
TABLE_DIM
;
table_info
.
table_id_
=
TABLE_NAME
;
table_info
.
engine_type_
=
(
int
)
engine
::
EngineType
::
FAISS_IDMAP
;
return
table_info
;
}
void
BuildVectors
(
int64_t
n
,
std
::
vector
<
float
>&
vectors
)
{
vectors
.
clear
();
vectors
.
resize
(
n
*
TABLE_DIM
);
float
*
data
=
vectors
.
data
();
for
(
int
i
=
0
;
i
<
n
;
i
++
)
{
for
(
int
j
=
0
;
j
<
TABLE_DIM
;
j
++
)
data
[
TABLE_DIM
*
i
+
j
]
=
drand48
();
data
[
TABLE_DIM
*
i
]
+=
i
/
2000.
;
}
void
BuildVectors
(
int64_t
n
,
std
::
vector
<
float
>
&
vectors
)
{
vectors
.
clear
();
vectors
.
resize
(
n
*
TABLE_DIM
);
float
*
data
=
vectors
.
data
();
for
(
int
i
=
0
;
i
<
n
;
i
++
)
{
for
(
int
j
=
0
;
j
<
TABLE_DIM
;
j
++
)
data
[
TABLE_DIM
*
i
+
j
]
=
drand48
();
data
[
TABLE_DIM
*
i
]
+=
i
/
2000.
;
}
}
}
TEST_F
(
NewMemManagerTest
,
VECTOR_SOURCE_TEST
)
{
...
...
@@ -65,7 +66,7 @@ TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) {
size_t
num_vectors_added
;
engine
::
ExecutionEnginePtr
execution_engine_
=
engine
::
EngineFactory
::
Build
(
table_file_schema
.
dimension_
,
table_file_schema
.
location_
,
(
engine
::
EngineType
)
table_file_schema
.
engine_type_
);
(
engine
::
EngineType
)
table_file_schema
.
engine_type_
);
status
=
source
.
Add
(
execution_engine_
,
table_file_schema
,
50
,
num_vectors_added
);
ASSERT_TRUE
(
status
.
ok
());
...
...
@@ -82,10 +83,6 @@ TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) {
vector_ids
=
source
.
GetVectorIds
();
ASSERT_EQ
(
vector_ids
.
size
(),
100
);
// for (auto& id : vector_ids) {
// std::cout << id << std::endl;
// }
status
=
impl_
->
DropAll
();
ASSERT_TRUE
(
status
.
ok
());
}
...
...
@@ -99,7 +96,7 @@ TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) {
auto
status
=
impl_
->
CreateTable
(
table_schema
);
ASSERT_TRUE
(
status
.
ok
());
engine
::
MemTableFile
mem
TableF
ile
(
TABLE_NAME
,
impl_
,
options
);
engine
::
MemTableFile
mem
_table_f
ile
(
TABLE_NAME
,
impl_
,
options
);
int64_t
n_100
=
100
;
std
::
vector
<
float
>
vectors_100
;
...
...
@@ -107,28 +104,28 @@ TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) {
engine
::
VectorSource
::
Ptr
source
=
std
::
make_shared
<
engine
::
VectorSource
>
(
n_100
,
vectors_100
.
data
());
status
=
mem
TableF
ile
.
Add
(
source
);
status
=
mem
_table_f
ile
.
Add
(
source
);
ASSERT_TRUE
(
status
.
ok
());
// std::cout << mem
TableFile.GetCurrentMem() << " " << memTableF
ile.GetMemLeft() << std::endl;
// std::cout << mem
_table_file.GetCurrentMem() << " " << mem_table_f
ile.GetMemLeft() << std::endl;
engine
::
IDNumbers
vector_ids
=
source
->
GetVectorIds
();
ASSERT_EQ
(
vector_ids
.
size
(),
100
);
size_t
singleVectorMem
=
sizeof
(
float
)
*
TABLE_DIM
;
ASSERT_EQ
(
mem
TableF
ile
.
GetCurrentMem
(),
n_100
*
singleVectorMem
);
ASSERT_EQ
(
mem
_table_f
ile
.
GetCurrentMem
(),
n_100
*
singleVectorMem
);
int64_t
n_max
=
engine
::
MAX_TABLE_FILE_MEM
/
singleVectorMem
;
std
::
vector
<
float
>
vectors_128M
;
BuildVectors
(
n_max
,
vectors_128M
);
engine
::
VectorSource
::
Ptr
source_128M
=
std
::
make_shared
<
engine
::
VectorSource
>
(
n_max
,
vectors_128M
.
data
());
status
=
mem
TableF
ile
.
Add
(
source_128M
);
status
=
mem
_table_f
ile
.
Add
(
source_128M
);
vector_ids
=
source_128M
->
GetVectorIds
();
ASSERT_EQ
(
vector_ids
.
size
(),
n_max
-
n_100
);
ASSERT_TRUE
(
mem
TableF
ile
.
IsFull
());
ASSERT_TRUE
(
mem
_table_f
ile
.
IsFull
());
status
=
impl_
->
DropAll
();
ASSERT_TRUE
(
status
.
ok
());
...
...
@@ -149,34 +146,34 @@ TEST_F(NewMemManagerTest, MEM_TABLE_TEST) {
engine
::
VectorSource
::
Ptr
source_100
=
std
::
make_shared
<
engine
::
VectorSource
>
(
n_100
,
vectors_100
.
data
());
engine
::
MemTable
mem
T
able
(
TABLE_NAME
,
impl_
,
options
);
engine
::
MemTable
mem
_t
able
(
TABLE_NAME
,
impl_
,
options
);
status
=
mem
T
able
.
Add
(
source_100
);
status
=
mem
_t
able
.
Add
(
source_100
);
ASSERT_TRUE
(
status
.
ok
());
engine
::
IDNumbers
vector_ids
=
source_100
->
GetVectorIds
();
ASSERT_EQ
(
vector_ids
.
size
(),
100
);
engine
::
MemTableFile
::
Ptr
mem
TableF
ile
;
mem
Table
.
GetCurrentMemTableFile
(
memTableF
ile
);
engine
::
MemTableFile
::
Ptr
mem
_table_f
ile
;
mem
_table
.
GetCurrentMemTableFile
(
mem_table_f
ile
);
size_t
singleVectorMem
=
sizeof
(
float
)
*
TABLE_DIM
;
ASSERT_EQ
(
mem
TableF
ile
->
GetCurrentMem
(),
n_100
*
singleVectorMem
);
ASSERT_EQ
(
mem
_table_f
ile
->
GetCurrentMem
(),
n_100
*
singleVectorMem
);
int64_t
n_max
=
engine
::
MAX_TABLE_FILE_MEM
/
singleVectorMem
;
std
::
vector
<
float
>
vectors_128M
;
BuildVectors
(
n_max
,
vectors_128M
);
engine
::
VectorSource
::
Ptr
source_128M
=
std
::
make_shared
<
engine
::
VectorSource
>
(
n_max
,
vectors_128M
.
data
());
status
=
mem
T
able
.
Add
(
source_128M
);
status
=
mem
_t
able
.
Add
(
source_128M
);
ASSERT_TRUE
(
status
.
ok
());
vector_ids
=
source_128M
->
GetVectorIds
();
ASSERT_EQ
(
vector_ids
.
size
(),
n_max
);
mem
Table
.
GetCurrentMemTableFile
(
memTableF
ile
);
ASSERT_EQ
(
mem
TableF
ile
->
GetCurrentMem
(),
n_100
*
singleVectorMem
);
mem
_table
.
GetCurrentMemTableFile
(
mem_table_f
ile
);
ASSERT_EQ
(
mem
_table_f
ile
->
GetCurrentMem
(),
n_100
*
singleVectorMem
);
ASSERT_EQ
(
mem
T
able
.
GetTableFileCount
(),
2
);
ASSERT_EQ
(
mem
_t
able
.
GetTableFileCount
(),
2
);
int64_t
n_1G
=
1024000
;
std
::
vector
<
float
>
vectors_1G
;
...
...
@@ -184,16 +181,16 @@ TEST_F(NewMemManagerTest, MEM_TABLE_TEST) {
engine
::
VectorSource
::
Ptr
source_1G
=
std
::
make_shared
<
engine
::
VectorSource
>
(
n_1G
,
vectors_1G
.
data
());
status
=
mem
T
able
.
Add
(
source_1G
);
status
=
mem
_t
able
.
Add
(
source_1G
);
ASSERT_TRUE
(
status
.
ok
());
vector_ids
=
source_1G
->
GetVectorIds
();
ASSERT_EQ
(
vector_ids
.
size
(),
n_1G
);
int
expectedTableFileCount
=
2
+
std
::
ceil
((
n_1G
-
n_100
)
*
singleVectorMem
/
engine
::
MAX_TABLE_FILE_MEM
);
ASSERT_EQ
(
mem
T
able
.
GetTableFileCount
(),
expectedTableFileCount
);
ASSERT_EQ
(
mem
_t
able
.
GetTableFileCount
(),
expectedTableFileCount
);
status
=
mem
T
able
.
Serialize
();
status
=
mem
_t
able
.
Serialize
();
ASSERT_TRUE
(
status
.
ok
());
status
=
impl_
->
DropAll
();
...
...
@@ -216,7 +213,7 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) {
ASSERT_STATS
(
stat
);
ASSERT_EQ
(
table_info_get
.
dimension_
,
TABLE_DIM
);
std
::
map
<
int64_t
,
std
::
vector
<
float
>>
search_vectors
;
std
::
map
<
int64_t
,
std
::
vector
<
float
>>
search_vectors
;
{
engine
::
IDNumbers
vector_ids
;
int64_t
nb
=
1024000
;
...
...
@@ -231,8 +228,8 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) {
std
::
mt19937
gen
(
rd
());
std
::
uniform_int_distribution
<
int64_t
>
dis
(
0
,
nb
-
1
);
int64_t
num
Q
uery
=
20
;
for
(
int64_t
i
=
0
;
i
<
num
Q
uery
;
++
i
)
{
int64_t
num
_q
uery
=
20
;
for
(
int64_t
i
=
0
;
i
<
num
_q
uery
;
++
i
)
{
int64_t
index
=
dis
(
gen
);
std
::
vector
<
float
>
search
;
for
(
int64_t
j
=
0
;
j
<
TABLE_DIM
;
j
++
)
{
...
...
@@ -243,8 +240,8 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) {
}
int
k
=
10
;
for
(
auto
&
pair
:
search_vectors
)
{
auto
&
search
=
pair
.
second
;
for
(
auto
&
pair
:
search_vectors
)
{
auto
&
search
=
pair
.
second
;
engine
::
QueryResults
results
;
stat
=
db_
->
Query
(
TABLE_NAME
,
k
,
1
,
search
.
data
(),
results
);
ASSERT_EQ
(
results
[
0
][
0
].
first
,
pair
.
first
);
...
...
@@ -329,18 +326,18 @@ TEST_F(NewMemManagerTest, CONCURRENT_INSERT_SEARCH_TEST) {
uint64_t
count
=
0
;
uint64_t
prev_count
=
0
;
for
(
auto
j
=
0
;
j
<
10
;
++
j
)
{
for
(
auto
j
=
0
;
j
<
10
;
++
j
)
{
ss
.
str
(
""
);
db_
->
Size
(
count
);
prev_count
=
count
;
START_TIMER
;
stat
=
db_
->
Query
(
TABLE_NAME
,
k
,
qb
,
qxb
.
data
(),
results
);
ss
<<
"Search "
<<
j
<<
" With Size "
<<
count
/
engine
::
meta
::
M
<<
" M"
;
ss
<<
"Search "
<<
j
<<
" With Size "
<<
count
/
engine
::
meta
::
M
<<
" M"
;
STOP_TIMER
(
ss
.
str
());
ASSERT_STATS
(
stat
);
for
(
auto
k
=
0
;
k
<
qb
;
++
k
)
{
for
(
auto
k
=
0
;
k
<
qb
;
++
k
)
{
ASSERT_EQ
(
results
[
k
][
0
].
first
,
target_ids
[
k
]);
ss
.
str
(
""
);
ss
<<
"Result ["
<<
k
<<
"]:"
;
...
...
@@ -356,8 +353,8 @@ TEST_F(NewMemManagerTest, CONCURRENT_INSERT_SEARCH_TEST) {
int
loop
=
20
;
for
(
auto
i
=
0
;
i
<
loop
;
++
i
)
{
if
(
i
==
0
)
{
for
(
auto
i
=
0
;
i
<
loop
;
++
i
)
{
if
(
i
==
0
)
{
db_
->
InsertVectors
(
TABLE_NAME
,
qb
,
qxb
.
data
(),
target_ids
);
ASSERT_EQ
(
target_ids
.
size
(),
qb
);
}
else
{
...
...
cpp/unittest/db/utils.h
浏览文件 @
65419a15
...
...
@@ -30,7 +30,7 @@
#define STOP_TIMER(name)
#endif
void
ASSERT_STATS
(
zilliz
::
milvus
::
engine
::
Status
&
stat
);
void
ASSERT_STATS
(
zilliz
::
milvus
::
engine
::
Status
&
stat
);
//class TestEnv : public ::testing::Environment {
//public:
...
...
@@ -54,8 +54,8 @@ void ASSERT_STATS(zilliz::milvus::engine::Status& stat);
// ::testing::AddGlobalTestEnvironment(new TestEnv);
class
DBTest
:
public
::
testing
::
Test
{
protected:
zilliz
::
milvus
::
engine
::
DB
*
db_
;
protected:
zilliz
::
milvus
::
engine
::
DB
*
db_
;
void
InitLog
();
virtual
void
SetUp
()
override
;
...
...
@@ -64,13 +64,13 @@ protected:
};
class
DBTest2
:
public
DBTest
{
protected:
protected:
virtual
zilliz
::
milvus
::
engine
::
Options
GetOptions
()
override
;
};
class
MetaTest
:
public
DBTest
{
protected:
protected:
std
::
shared_ptr
<
zilliz
::
milvus
::
engine
::
meta
::
DBMetaImpl
>
impl_
;
virtual
void
SetUp
()
override
;
...
...
@@ -78,17 +78,17 @@ protected:
};
class
MySQLTest
:
public
::
testing
::
Test
{
protected:
protected:
// std::shared_ptr<zilliz::milvus::engine::meta::MySQLMetaImpl> impl_;
zilliz
::
milvus
::
engine
::
DBMetaOptions
getDBMetaOptions
();
};
class
MySQLDBTest
:
public
::
testing
::
Test
{
protected:
class
MySQLDBTest
:
public
::
testing
::
Test
{
protected:
zilliz
::
milvus
::
engine
::
Options
GetOptions
();
};
class
NewMemManagerTest
:
public
::
testing
::
Test
{
class
NewMemManagerTest
:
public
::
testing
::
Test
{
void
InitLog
();
v
irtual
v
oid
SetUp
()
override
;
void
SetUp
()
override
;
};
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录