Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
db3f0e9b
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
db3f0e9b
编写于
6月 11, 2019
作者:
G
groot
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
support delete table
Former-commit-id: 3b728c4dfc0b9a6c8803538c89961e4b43c6ca67
上级
f5066e41
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
370 addition
and
106 deletion
+370
-106
cpp/src/db/DB.h
cpp/src/db/DB.h
+8
-5
cpp/src/db/DBImpl.cpp
cpp/src/db/DBImpl.cpp
+50
-13
cpp/src/db/DBImpl.h
cpp/src/db/DBImpl.h
+9
-6
cpp/src/db/DBMetaImpl.cpp
cpp/src/db/DBMetaImpl.cpp
+162
-45
cpp/src/db/DBMetaImpl.h
cpp/src/db/DBMetaImpl.h
+8
-2
cpp/src/db/Meta.h
cpp/src/db/Meta.h
+8
-2
cpp/src/sdk/examples/simple/src/ClientTest.cpp
cpp/src/sdk/examples/simple/src/ClientTest.cpp
+8
-6
cpp/src/sdk/include/Status.h
cpp/src/sdk/include/Status.h
+2
-2
cpp/src/sdk/src/client/ClientProxy.cpp
cpp/src/sdk/src/client/ClientProxy.cpp
+3
-1
cpp/src/sdk/src/interface/ConnectionImpl.cpp
cpp/src/sdk/src/interface/ConnectionImpl.cpp
+2
-1
cpp/src/server/MegasearchTask.cpp
cpp/src/server/MegasearchTask.cpp
+105
-18
cpp/unittest/db/db_tests.cpp
cpp/unittest/db/db_tests.cpp
+3
-3
cpp/unittest/metrics/metrics_test.cpp
cpp/unittest/metrics/metrics_test.cpp
+2
-2
未找到文件。
cpp/src/db/DB.h
浏览文件 @
db3f0e9b
...
@@ -23,19 +23,22 @@ public:
...
@@ -23,19 +23,22 @@ public:
static
void
Open
(
const
Options
&
options
,
DB
**
dbptr
);
static
void
Open
(
const
Options
&
options
,
DB
**
dbptr
);
virtual
Status
CreateTable
(
meta
::
TableSchema
&
table_schema_
)
=
0
;
virtual
Status
CreateTable
(
meta
::
TableSchema
&
table_schema_
)
=
0
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
=
0
;
virtual
Status
DescribeTable
(
meta
::
TableSchema
&
table_schema_
)
=
0
;
virtual
Status
DescribeTable
(
meta
::
TableSchema
&
table_schema_
)
=
0
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id_
,
bool
&
has_or_not_
)
=
0
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not_
)
=
0
;
virtual
Status
AllTables
(
std
::
vector
<
meta
::
TableSchema
>&
table_schema_array
)
=
0
;
virtual
Status
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
=
0
;
virtual
Status
InsertVectors
(
const
std
::
string
&
table_id_
,
virtual
Status
InsertVectors
(
const
std
::
string
&
table_id_
,
size
_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids_
)
=
0
;
uint64
_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids_
)
=
0
;
virtual
Status
Query
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
virtual
Status
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
QueryResults
&
results
)
=
0
;
const
float
*
vectors
,
QueryResults
&
results
)
=
0
;
virtual
Status
Query
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
virtual
Status
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
=
0
;
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
=
0
;
virtual
Status
Size
(
long
&
result
)
=
0
;
virtual
Status
Size
(
uint64_t
&
result
)
=
0
;
virtual
Status
DropAll
()
=
0
;
virtual
Status
DropAll
()
=
0
;
...
...
cpp/src/db/DBImpl.cpp
浏览文件 @
db3f0e9b
...
@@ -6,6 +6,7 @@
...
@@ -6,6 +6,7 @@
#include "DBImpl.h"
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "Env.h"
#include "Log.h"
#include "EngineFactory.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
#include "scheduler/SearchScheduler.h"
...
@@ -15,8 +16,8 @@
...
@@ -15,8 +16,8 @@
#include <thread>
#include <thread>
#include <iostream>
#include <iostream>
#include <cstring>
#include <cstring>
#include <easylogging++.h>
#include <cache/CpuCacheMgr.h>
#include <cache/CpuCacheMgr.h>
#include <boost/filesystem.hpp>
namespace
zilliz
{
namespace
zilliz
{
namespace
vecwise
{
namespace
vecwise
{
...
@@ -88,6 +89,34 @@ Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
...
@@ -88,6 +89,34 @@ Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
return
pMeta_
->
CreateTable
(
table_schema
);
return
pMeta_
->
CreateTable
(
table_schema
);
}
}
Status
DBImpl
::
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
{
meta
::
DatePartionedTableFilesSchema
files
;
auto
status
=
pMeta_
->
FilesToDelete
(
table_id
,
dates
,
files
);
if
(
!
status
.
ok
())
{
return
status
;
}
for
(
auto
&
day_files
:
files
)
{
for
(
auto
&
file
:
day_files
.
second
)
{
boost
::
filesystem
::
remove
(
file
.
location_
);
}
}
//dates empty means delete all files of the table
if
(
dates
.
empty
())
{
meta
::
TableSchema
table_schema
;
table_schema
.
table_id_
=
table_id
;
status
=
DescribeTable
(
table_schema
);
pMeta_
->
DeleteTable
(
table_id
);
boost
::
system
::
error_code
ec
;
boost
::
filesystem
::
remove_all
(
table_schema
.
location_
,
ec
);
if
(
ec
.
failed
())
{
ENGINE_LOG_WARNING
<<
"Failed to remove table folder"
;
}
}
return
Status
::
OK
();
}
Status
DBImpl
::
DescribeTable
(
meta
::
TableSchema
&
table_schema
)
{
Status
DBImpl
::
DescribeTable
(
meta
::
TableSchema
&
table_schema
)
{
return
pMeta_
->
DescribeTable
(
table_schema
);
return
pMeta_
->
DescribeTable
(
table_schema
);
}
}
...
@@ -96,8 +125,16 @@ Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
...
@@ -96,8 +125,16 @@ Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
return
pMeta_
->
HasTable
(
table_id
,
has_or_not
);
return
pMeta_
->
HasTable
(
table_id
,
has_or_not
);
}
}
Status
DBImpl
::
AllTables
(
std
::
vector
<
meta
::
TableSchema
>&
table_schema_array
)
{
return
pMeta_
->
AllTables
(
table_schema_array
);
}
Status
DBImpl
::
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
{
return
pMeta_
->
Count
(
table_id
,
row_count
);
}
Status
DBImpl
::
InsertVectors
(
const
std
::
string
&
table_id_
,
Status
DBImpl
::
InsertVectors
(
const
std
::
string
&
table_id_
,
size
_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids_
)
{
uint64
_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids_
)
{
auto
start_time
=
METRICS_NOW_TIME
;
auto
start_time
=
METRICS_NOW_TIME
;
Status
status
=
pMemMgr_
->
InsertVectors
(
table_id_
,
n
,
vectors
,
vector_ids_
);
Status
status
=
pMemMgr_
->
InsertVectors
(
table_id_
,
n
,
vectors
,
vector_ids_
);
...
@@ -110,7 +147,7 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
...
@@ -110,7 +147,7 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
return
status
;
return
status
;
}
}
Status
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
Status
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
QueryResults
&
results
)
{
const
float
*
vectors
,
QueryResults
&
results
)
{
auto
start_time
=
METRICS_NOW_TIME
;
auto
start_time
=
METRICS_NOW_TIME
;
meta
::
DatesT
dates
=
{
meta
::
Meta
::
GetDate
()};
meta
::
DatesT
dates
=
{
meta
::
Meta
::
GetDate
()};
...
@@ -122,7 +159,7 @@ Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq,
...
@@ -122,7 +159,7 @@ Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq,
return
result
;
return
result
;
}
}
Status
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
Status
DBImpl
::
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
#if 0
#if 0
return QuerySync(table_id, k, nq, vectors, dates, results);
return QuerySync(table_id, k, nq, vectors, dates, results);
...
@@ -131,13 +168,13 @@ Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
...
@@ -131,13 +168,13 @@ Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
#endif
#endif
}
}
Status
DBImpl
::
QuerySync
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
Status
DBImpl
::
QuerySync
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
meta
::
DatePartionedTableFilesSchema
files
;
meta
::
DatePartionedTableFilesSchema
files
;
auto
status
=
pMeta_
->
FilesToSearch
(
table_id
,
dates
,
files
);
auto
status
=
pMeta_
->
FilesToSearch
(
table_id
,
dates
,
files
);
if
(
!
status
.
ok
())
{
return
status
;
}
if
(
!
status
.
ok
())
{
return
status
;
}
LOG
(
DEBUG
)
<<
"Search DateT Size=
"
<<
files
.
size
();
ENGINE_LOG_DEBUG
<<
"Search DateT Size =
"
<<
files
.
size
();
meta
::
TableFilesSchema
index_files
;
meta
::
TableFilesSchema
index_files
;
meta
::
TableFilesSchema
raw_files
;
meta
::
TableFilesSchema
raw_files
;
...
@@ -154,7 +191,7 @@ Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
...
@@ -154,7 +191,7 @@ Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
}
else
if
(
!
raw_files
.
empty
())
{
}
else
if
(
!
raw_files
.
empty
())
{
dim
=
raw_files
[
0
].
dimension_
;
dim
=
raw_files
[
0
].
dimension_
;
}
else
{
}
else
{
LOG
(
DEBUG
)
<<
"no files to search"
;
ENGINE_LOG_DEBUG
<<
"no files to search"
;
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
@@ -190,7 +227,7 @@ Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
...
@@ -190,7 +227,7 @@ Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
auto
file_size
=
index
->
PhysicalSize
();
auto
file_size
=
index
->
PhysicalSize
();
search_set_size
+=
file_size
;
search_set_size
+=
file_size
;
LOG
(
DEBUG
)
<<
"Search file_type "
<<
file
.
file_type_
<<
" Of Size: "
ENGINE_LOG_DEBUG
<<
"Search file_type "
<<
file
.
file_type_
<<
" Of Size: "
<<
file_size
/
(
1024
*
1024
)
<<
" M"
;
<<
file_size
/
(
1024
*
1024
)
<<
" M"
;
int
inner_k
=
index
->
Count
()
<
k
?
index
->
Count
()
:
k
;
int
inner_k
=
index
->
Count
()
<
k
?
index
->
Count
()
:
k
;
...
@@ -252,7 +289,7 @@ Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
...
@@ -252,7 +289,7 @@ Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
search_in_index
(
raw_files
);
search_in_index
(
raw_files
);
search_in_index
(
index_files
);
search_in_index
(
index_files
);
LOG
(
DEBUG
)
<<
"Search Overall Set Size=
"
<<
search_set_size
<<
" M"
;
ENGINE_LOG_DEBUG
<<
"Search Overall Set Size =
"
<<
search_set_size
<<
" M"
;
cluster_topk
();
cluster_topk
();
free
(
output_distence
);
free
(
output_distence
);
...
@@ -265,7 +302,7 @@ Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
...
@@ -265,7 +302,7 @@ Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
return
Status
::
OK
();
return
Status
::
OK
();
}
}
Status
DBImpl
::
QueryAsync
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
Status
DBImpl
::
QueryAsync
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
{
//step 1: get files to search
//step 1: get files to search
...
@@ -273,7 +310,7 @@ Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq,
...
@@ -273,7 +310,7 @@ Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq,
auto
status
=
pMeta_
->
FilesToSearch
(
table_id
,
dates
,
files
);
auto
status
=
pMeta_
->
FilesToSearch
(
table_id
,
dates
,
files
);
if
(
!
status
.
ok
())
{
return
status
;
}
if
(
!
status
.
ok
())
{
return
status
;
}
LOG
(
DEBUG
)
<<
"Search DateT Size="
<<
files
.
size
();
ENGINE_LOG_DEBUG
<<
"Search DateT Size="
<<
files
.
size
();
SearchContextPtr
context
=
std
::
make_shared
<
SearchContext
>
(
k
,
nq
,
vectors
);
SearchContextPtr
context
=
std
::
make_shared
<
SearchContext
>
(
k
,
nq
,
vectors
);
...
@@ -312,7 +349,7 @@ void DBImpl::BackgroundTimerTask(int interval) {
...
@@ -312,7 +349,7 @@ void DBImpl::BackgroundTimerTask(int interval) {
int64_t
cache_total
=
cache
::
CpuCacheMgr
::
GetInstance
()
->
CacheUsage
();
int64_t
cache_total
=
cache
::
CpuCacheMgr
::
GetInstance
()
->
CacheUsage
();
LOG
(
DEBUG
)
<<
"Cache usage "
<<
cache_total
;
LOG
(
DEBUG
)
<<
"Cache usage "
<<
cache_total
;
server
::
Metrics
::
GetInstance
().
CacheUsageGaugeSet
(
static_cast
<
double
>
(
cache_total
));
server
::
Metrics
::
GetInstance
().
CacheUsageGaugeSet
(
static_cast
<
double
>
(
cache_total
));
long
size
;
uint64_t
size
;
Size
(
size
);
Size
(
size
);
server
::
Metrics
::
GetInstance
().
DataFileSizeGaugeSet
(
size
);
server
::
Metrics
::
GetInstance
().
DataFileSizeGaugeSet
(
size
);
TrySchedule
();
TrySchedule
();
...
@@ -509,7 +546,7 @@ Status DBImpl::DropAll() {
...
@@ -509,7 +546,7 @@ Status DBImpl::DropAll() {
return
pMeta_
->
DropAll
();
return
pMeta_
->
DropAll
();
}
}
Status
DBImpl
::
Size
(
long
&
result
)
{
Status
DBImpl
::
Size
(
uint64_t
&
result
)
{
return
pMeta_
->
Size
(
result
);
return
pMeta_
->
Size
(
result
);
}
}
...
...
cpp/src/db/DBImpl.h
浏览文件 @
db3f0e9b
...
@@ -33,29 +33,32 @@ public:
...
@@ -33,29 +33,32 @@ public:
DBImpl
(
const
Options
&
options
);
DBImpl
(
const
Options
&
options
);
virtual
Status
CreateTable
(
meta
::
TableSchema
&
table_schema
)
override
;
virtual
Status
CreateTable
(
meta
::
TableSchema
&
table_schema
)
override
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
override
;
virtual
Status
DescribeTable
(
meta
::
TableSchema
&
table_schema
)
override
;
virtual
Status
DescribeTable
(
meta
::
TableSchema
&
table_schema
)
override
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
virtual
Status
AllTables
(
std
::
vector
<
meta
::
TableSchema
>&
table_schema_array
)
override
;
virtual
Status
GetTableRowCount
(
const
std
::
string
&
table_id
,
uint64_t
&
row_count
)
override
;
virtual
Status
InsertVectors
(
const
std
::
string
&
table_id
,
virtual
Status
InsertVectors
(
const
std
::
string
&
table_id
,
size
_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
override
;
uint64
_t
n
,
const
float
*
vectors
,
IDNumbers
&
vector_ids
)
override
;
virtual
Status
Query
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
virtual
Status
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
QueryResults
&
results
)
override
;
const
float
*
vectors
,
QueryResults
&
results
)
override
;
virtual
Status
Query
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
virtual
Status
Query
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
override
;
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
)
override
;
virtual
Status
DropAll
()
override
;
virtual
Status
DropAll
()
override
;
virtual
Status
Size
(
long
&
result
)
override
;
virtual
Status
Size
(
uint64_t
&
result
)
override
;
virtual
~
DBImpl
();
virtual
~
DBImpl
();
private:
private:
Status
QuerySync
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
Status
QuerySync
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
Status
QueryAsync
(
const
std
::
string
&
table_id
,
size_t
k
,
size
_t
nq
,
Status
QueryAsync
(
const
std
::
string
&
table_id
,
uint64_t
k
,
uint64
_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
...
...
cpp/src/db/DBMetaImpl.cpp
浏览文件 @
db3f0e9b
...
@@ -6,6 +6,7 @@
...
@@ -6,6 +6,7 @@
#include "DBMetaImpl.h"
#include "DBMetaImpl.h"
#include "IDGenerator.h"
#include "IDGenerator.h"
#include "Utils.h"
#include "Utils.h"
#include "Log.h"
#include "MetaConsts.h"
#include "MetaConsts.h"
#include "Factories.h"
#include "Factories.h"
#include "metrics/Metrics.h"
#include "metrics/Metrics.h"
...
@@ -17,7 +18,6 @@
...
@@ -17,7 +18,6 @@
#include <chrono>
#include <chrono>
#include <fstream>
#include <fstream>
#include <sqlite_orm.h>
#include <sqlite_orm.h>
#include <easylogging++.h>
namespace
zilliz
{
namespace
zilliz
{
...
@@ -27,6 +27,15 @@ namespace meta {
...
@@ -27,6 +27,15 @@ namespace meta {
using
namespace
sqlite_orm
;
using
namespace
sqlite_orm
;
namespace
{
void
HandleException
(
std
::
exception
&
e
)
{
ENGINE_LOG_DEBUG
<<
"Engine meta exception: "
<<
e
.
what
();
throw
e
;
}
}
inline
auto
StoragePrototype
(
const
std
::
string
&
path
)
{
inline
auto
StoragePrototype
(
const
std
::
string
&
path
)
{
return
make_storage
(
path
,
return
make_storage
(
path
,
make_table
(
"Table"
,
make_table
(
"Table"
,
...
@@ -100,7 +109,7 @@ Status DBMetaImpl::Initialize() {
...
@@ -100,7 +109,7 @@ Status DBMetaImpl::Initialize() {
if
(
!
boost
::
filesystem
::
is_directory
(
options_
.
path
))
{
if
(
!
boost
::
filesystem
::
is_directory
(
options_
.
path
))
{
auto
ret
=
boost
::
filesystem
::
create_directory
(
options_
.
path
);
auto
ret
=
boost
::
filesystem
::
create_directory
(
options_
.
path
);
if
(
!
ret
)
{
if
(
!
ret
)
{
LOG
(
ERROR
)
<<
"Create directory "
<<
options_
.
path
<<
" Error"
;
ENGINE_LOG_ERROR
<<
"Create directory "
<<
options_
.
path
<<
" Error"
;
}
}
assert
(
ret
);
assert
(
ret
);
}
}
...
@@ -148,8 +157,7 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
...
@@ -148,8 +157,7 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
in
(
&
TableFileSchema
::
date_
,
dates
)
in
(
&
TableFileSchema
::
date_
,
dates
)
));
));
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
@@ -175,12 +183,12 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
...
@@ -175,12 +183,12 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
server
::
Metrics
::
GetInstance
().
MetaAccessDurationSecondsHistogramObserve
(
total_time
);
server
::
Metrics
::
GetInstance
().
MetaAccessDurationSecondsHistogramObserve
(
total_time
);
auto
group
_path
=
GetTablePath
(
table_schema
.
table_id_
);
auto
table
_path
=
GetTablePath
(
table_schema
.
table_id_
);
table_schema
.
location_
=
table_path
;
if
(
!
boost
::
filesystem
::
is_directory
(
group
_path
))
{
if
(
!
boost
::
filesystem
::
is_directory
(
table
_path
))
{
auto
ret
=
boost
::
filesystem
::
create_directories
(
group
_path
);
auto
ret
=
boost
::
filesystem
::
create_directories
(
table
_path
);
if
(
!
ret
)
{
if
(
!
ret
)
{
LOG
(
ERROR
)
<<
"Create directory "
<<
group
_path
<<
" Error"
;
ENGINE_LOG_ERROR
<<
"Create directory "
<<
table
_path
<<
" Error"
;
}
}
assert
(
ret
);
assert
(
ret
);
}
}
...
@@ -188,6 +196,21 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
...
@@ -188,6 +196,21 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
return
Status
::
OK
();
return
Status
::
OK
();
}
}
Status
DBMetaImpl
::
DeleteTable
(
const
std
::
string
&
table_id
)
{
try
{
//drop the table from meta
auto
tables
=
ConnectorPtr
->
select
(
columns
(
&
TableSchema
::
id_
),
where
(
c
(
&
TableSchema
::
table_id_
)
==
table_id
));
for
(
auto
&
table
:
tables
)
{
ConnectorPtr
->
remove
<
TableSchema
>
(
std
::
get
<
0
>
(
table
));
}
}
catch
(
std
::
exception
&
e
)
{
HandleException
(
e
);
}
return
Status
::
OK
();
}
Status
DBMetaImpl
::
DescribeTable
(
TableSchema
&
table_schema
)
{
Status
DBMetaImpl
::
DescribeTable
(
TableSchema
&
table_schema
)
{
try
{
try
{
server
::
Metrics
::
GetInstance
().
MetaAccessTotalIncrement
();
server
::
Metrics
::
GetInstance
().
MetaAccessTotalIncrement
();
...
@@ -212,9 +235,12 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
...
@@ -212,9 +235,12 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
}
else
{
}
else
{
return
Status
::
NotFound
(
"Table "
+
table_schema
.
table_id_
+
" not found"
);
return
Status
::
NotFound
(
"Table "
+
table_schema
.
table_id_
+
" not found"
);
}
}
auto
table_path
=
GetTablePath
(
table_schema
.
table_id_
);
table_schema
.
location_
=
table_path
;
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
...
@@ -237,9 +263,39 @@ Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
...
@@ -237,9 +263,39 @@ Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
has_or_not
=
false
;
has_or_not
=
false
;
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
return
Status
::
OK
();
}
Status
DBMetaImpl
::
AllTables
(
std
::
vector
<
TableSchema
>&
table_schema_array
)
{
try
{
server
::
Metrics
::
GetInstance
().
MetaAccessTotalIncrement
();
auto
start_time
=
METRICS_NOW_TIME
;
auto
selected
=
ConnectorPtr
->
select
(
columns
(
&
TableSchema
::
id_
,
&
TableSchema
::
table_id_
,
&
TableSchema
::
files_cnt_
,
&
TableSchema
::
dimension_
,
&
TableSchema
::
engine_type_
,
&
TableSchema
::
store_raw_data_
));
auto
end_time
=
METRICS_NOW_TIME
;
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
server
::
Metrics
::
GetInstance
().
MetaAccessDurationSecondsHistogramObserve
(
total_time
);
for
(
auto
&
table
:
selected
)
{
TableSchema
schema
;
schema
.
id_
=
std
::
get
<
0
>
(
table
);
schema
.
table_id_
=
std
::
get
<
1
>
(
table
);
schema
.
files_cnt_
=
std
::
get
<
2
>
(
table
);
schema
.
dimension_
=
std
::
get
<
3
>
(
table
);
schema
.
engine_type_
=
std
::
get
<
4
>
(
table
);
schema
.
store_raw_data_
=
std
::
get
<
5
>
(
table
);
table_schema_array
.
emplace_back
(
schema
);
}
}
catch
(
std
::
exception
&
e
)
{
HandleException
(
e
);
}
}
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
@@ -282,7 +338,7 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
...
@@ -282,7 +338,7 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
if
(
!
boost
::
filesystem
::
is_directory
(
partition_path
))
{
if
(
!
boost
::
filesystem
::
is_directory
(
partition_path
))
{
auto
ret
=
boost
::
filesystem
::
create_directory
(
partition_path
);
auto
ret
=
boost
::
filesystem
::
create_directory
(
partition_path
);
if
(
!
ret
)
{
if
(
!
ret
)
{
LOG
(
ERROR
)
<<
"Create directory "
<<
partition_path
<<
" Error"
;
ENGINE_LOG_ERROR
<<
"Create directory "
<<
partition_path
<<
" Error"
;
}
}
assert
(
ret
);
assert
(
ret
);
}
}
...
@@ -336,8 +392,7 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
...
@@ -336,8 +392,7 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
files
.
push_back
(
table_file
);
files
.
push_back
(
table_file
);
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
...
@@ -438,8 +493,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
...
@@ -438,8 +493,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
...
@@ -488,8 +542,79 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
...
@@ -488,8 +542,79 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
files
[
table_file
.
date_
].
push_back
(
table_file
);
files
[
table_file
.
date_
].
push_back
(
table_file
);
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
return
Status
::
OK
();
}
Status
DBMetaImpl
::
FilesToDelete
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
{
auto
now
=
utils
::
GetMicroSecTimeStamp
();
try
{
if
(
partition
.
empty
())
{
//step 1: get table files by dates
auto
selected
=
ConnectorPtr
->
select
(
columns
(
&
TableFileSchema
::
id_
,
&
TableFileSchema
::
table_id_
,
&
TableFileSchema
::
file_id_
,
&
TableFileSchema
::
size_
,
&
TableFileSchema
::
date_
),
where
(
c
(
&
TableFileSchema
::
file_type_
)
!=
(
int
)
TableFileSchema
::
TO_DELETE
and
c
(
&
TableFileSchema
::
table_id_
)
==
table_id
));
//step 2: erase table files from meta
for
(
auto
&
file
:
selected
)
{
TableFileSchema
table_file
;
table_file
.
id_
=
std
::
get
<
0
>
(
file
);
table_file
.
table_id_
=
std
::
get
<
1
>
(
file
);
table_file
.
file_id_
=
std
::
get
<
2
>
(
file
);
table_file
.
size_
=
std
::
get
<
3
>
(
file
);
table_file
.
date_
=
std
::
get
<
4
>
(
file
);
GetTableFilePath
(
table_file
);
auto
dateItr
=
files
.
find
(
table_file
.
date_
);
if
(
dateItr
==
files
.
end
())
{
files
[
table_file
.
date_
]
=
TableFilesSchema
();
}
files
[
table_file
.
date_
].
push_back
(
table_file
);
ConnectorPtr
->
remove
<
TableFileSchema
>
(
std
::
get
<
0
>
(
file
));
}
}
else
{
//step 1: get all table files
auto
selected
=
ConnectorPtr
->
select
(
columns
(
&
TableFileSchema
::
id_
,
&
TableFileSchema
::
table_id_
,
&
TableFileSchema
::
file_id_
,
&
TableFileSchema
::
size_
,
&
TableFileSchema
::
date_
),
where
(
c
(
&
TableFileSchema
::
file_type_
)
!=
(
int
)
TableFileSchema
::
TO_DELETE
and
in
(
&
TableFileSchema
::
date_
,
partition
)
and
c
(
&
TableFileSchema
::
table_id_
)
==
table_id
));
//step 2: erase table files from meta
for
(
auto
&
file
:
selected
)
{
TableFileSchema
table_file
;
table_file
.
id_
=
std
::
get
<
0
>
(
file
);
table_file
.
table_id_
=
std
::
get
<
1
>
(
file
);
table_file
.
file_id_
=
std
::
get
<
2
>
(
file
);
table_file
.
size_
=
std
::
get
<
3
>
(
file
);
table_file
.
date_
=
std
::
get
<
4
>
(
file
);
GetTableFilePath
(
table_file
);
auto
dateItr
=
files
.
find
(
table_file
.
date_
);
if
(
dateItr
==
files
.
end
())
{
files
[
table_file
.
date_
]
=
TableFilesSchema
();
}
files
[
table_file
.
date_
].
push_back
(
table_file
);
ConnectorPtr
->
remove
<
TableFileSchema
>
(
std
::
get
<
0
>
(
file
));
}
}
}
catch
(
std
::
exception
&
e
)
{
HandleException
(
e
);
}
}
return
Status
::
OK
();
return
Status
::
OK
();
...
@@ -520,8 +645,7 @@ Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) {
...
@@ -520,8 +645,7 @@ Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) {
" File:"
+
file_schema
.
file_id_
+
" not found"
);
" File:"
+
file_schema
.
file_id_
+
" not found"
);
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
...
@@ -550,12 +674,11 @@ Status DBMetaImpl::Archive() {
...
@@ -550,12 +674,11 @@ Status DBMetaImpl::Archive() {
c
(
&
TableFileSchema
::
file_type_
)
!=
(
int
)
TableFileSchema
::
TO_DELETE
c
(
&
TableFileSchema
::
file_type_
)
!=
(
int
)
TableFileSchema
::
TO_DELETE
));
));
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
}
}
if
(
criteria
==
"disk"
)
{
if
(
criteria
==
"disk"
)
{
long
sum
=
0
;
uint64_t
sum
=
0
;
Size
(
sum
);
Size
(
sum
);
auto
to_delete
=
(
sum
-
limit
*
G
);
auto
to_delete
=
(
sum
-
limit
*
G
);
...
@@ -566,7 +689,7 @@ Status DBMetaImpl::Archive() {
...
@@ -566,7 +689,7 @@ Status DBMetaImpl::Archive() {
return
Status
::
OK
();
return
Status
::
OK
();
}
}
Status
DBMetaImpl
::
Size
(
long
&
result
)
{
Status
DBMetaImpl
::
Size
(
uint64_t
&
result
)
{
result
=
0
;
result
=
0
;
try
{
try
{
auto
selected
=
ConnectorPtr
->
select
(
columns
(
sum
(
&
TableFileSchema
::
size_
)),
auto
selected
=
ConnectorPtr
->
select
(
columns
(
sum
(
&
TableFileSchema
::
size_
)),
...
@@ -578,11 +701,10 @@ Status DBMetaImpl::Size(long &result) {
...
@@ -578,11 +701,10 @@ Status DBMetaImpl::Size(long &result) {
if
(
!
std
::
get
<
0
>
(
sub_query
))
{
if
(
!
std
::
get
<
0
>
(
sub_query
))
{
continue
;
continue
;
}
}
result
+=
(
long
)
(
*
std
::
get
<
0
>
(
sub_query
));
result
+=
(
uint64_t
)
(
*
std
::
get
<
0
>
(
sub_query
));
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
...
@@ -609,7 +731,8 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
...
@@ -609,7 +731,8 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
table_file
.
id_
=
std
::
get
<
0
>
(
file
);
table_file
.
id_
=
std
::
get
<
0
>
(
file
);
table_file
.
size_
=
std
::
get
<
1
>
(
file
);
table_file
.
size_
=
std
::
get
<
1
>
(
file
);
ids
.
push_back
(
table_file
.
id_
);
ids
.
push_back
(
table_file
.
id_
);
LOG
(
DEBUG
)
<<
"Discard table_file.id="
<<
table_file
.
file_id_
<<
" table_file.size="
<<
table_file
.
size_
;
ENGINE_LOG_DEBUG
<<
"Discard table_file.id="
<<
table_file
.
file_id_
<<
" table_file.size="
<<
table_file
.
size_
;
to_discard_size
-=
table_file
.
size_
;
to_discard_size
-=
table_file
.
size_
;
}
}
...
@@ -626,11 +749,9 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
...
@@ -626,11 +749,9 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
));
));
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
DiscardFiles
(
to_discard_size
);
return
DiscardFiles
(
to_discard_size
);
}
}
...
@@ -644,9 +765,8 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
...
@@ -644,9 +765,8 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
server
::
Metrics
::
GetInstance
().
MetaAccessDurationSecondsHistogramObserve
(
total_time
);
server
::
Metrics
::
GetInstance
().
MetaAccessDurationSecondsHistogramObserve
(
total_time
);
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
ENGINE_LOG_DEBUG
<<
"table_id= "
<<
file_schema
.
table_id_
<<
" file_id="
<<
file_schema
.
file_id_
;
LOG
(
DEBUG
)
<<
"table_id= "
<<
file_schema
.
table_id_
<<
" file_id="
<<
file_schema
.
file_id_
;
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
@@ -669,8 +789,7 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
...
@@ -669,8 +789,7 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
return
Status
::
DBTransactionError
(
"Update files Error"
);
return
Status
::
DBTransactionError
(
"Update files Error"
);
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
@@ -708,8 +827,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
...
@@ -708,8 +827,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
/* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */
/* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
...
@@ -747,14 +865,13 @@ Status DBMetaImpl::CleanUp() {
...
@@ -747,14 +865,13 @@ Status DBMetaImpl::CleanUp() {
/* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */
/* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */
}
}
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
}
}
Status
DBMetaImpl
::
Count
(
const
std
::
string
&
table_id
,
long
&
result
)
{
Status
DBMetaImpl
::
Count
(
const
std
::
string
&
table_id
,
uint64_t
&
result
)
{
try
{
try
{
...
@@ -785,10 +902,10 @@ Status DBMetaImpl::Count(const std::string &table_id, long &result) {
...
@@ -785,10 +902,10 @@ Status DBMetaImpl::Count(const std::string &table_id, long &result) {
}
}
result
/=
table_schema
.
dimension_
;
result
/=
table_schema
.
dimension_
;
result
/=
sizeof
(
float
);
}
catch
(
std
::
exception
&
e
)
{
}
catch
(
std
::
exception
&
e
)
{
LOG
(
DEBUG
)
<<
e
.
what
();
HandleException
(
e
);
throw
e
;
}
}
return
Status
::
OK
();
return
Status
::
OK
();
}
}
...
...
cpp/src/db/DBMetaImpl.h
浏览文件 @
db3f0e9b
...
@@ -20,8 +20,10 @@ public:
...
@@ -20,8 +20,10 @@ public:
DBMetaImpl
(
const
DBMetaOptions
&
options_
);
DBMetaImpl
(
const
DBMetaOptions
&
options_
);
virtual
Status
CreateTable
(
TableSchema
&
table_schema
)
override
;
virtual
Status
CreateTable
(
TableSchema
&
table_schema
)
override
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
)
override
;
virtual
Status
DescribeTable
(
TableSchema
&
group_info_
)
override
;
virtual
Status
DescribeTable
(
TableSchema
&
group_info_
)
override
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
override
;
virtual
Status
AllTables
(
std
::
vector
<
TableSchema
>&
table_schema_array
)
override
;
virtual
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
override
;
virtual
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
override
;
virtual
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
virtual
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
...
@@ -40,11 +42,15 @@ public:
...
@@ -40,11 +42,15 @@ public:
virtual
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
virtual
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
DatePartionedTableFilesSchema
&
files
)
override
;
DatePartionedTableFilesSchema
&
files
)
override
;
virtual
Status
FilesToDelete
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
override
;
virtual
Status
FilesToIndex
(
TableFilesSchema
&
)
override
;
virtual
Status
FilesToIndex
(
TableFilesSchema
&
)
override
;
virtual
Status
Archive
()
override
;
virtual
Status
Archive
()
override
;
virtual
Status
Size
(
long
&
result
)
override
;
virtual
Status
Size
(
uint64_t
&
result
)
override
;
virtual
Status
CleanUp
()
override
;
virtual
Status
CleanUp
()
override
;
...
@@ -52,7 +58,7 @@ public:
...
@@ -52,7 +58,7 @@ public:
virtual
Status
DropAll
()
override
;
virtual
Status
DropAll
()
override
;
virtual
Status
Count
(
const
std
::
string
&
table_id
,
long
&
result
)
override
;
virtual
Status
Count
(
const
std
::
string
&
table_id
,
uint64_t
&
result
)
override
;
virtual
~
DBMetaImpl
();
virtual
~
DBMetaImpl
();
...
...
cpp/src/db/Meta.h
浏览文件 @
db3f0e9b
...
@@ -24,8 +24,10 @@ public:
...
@@ -24,8 +24,10 @@ public:
using
Ptr
=
std
::
shared_ptr
<
Meta
>
;
using
Ptr
=
std
::
shared_ptr
<
Meta
>
;
virtual
Status
CreateTable
(
TableSchema
&
table_schema
)
=
0
;
virtual
Status
CreateTable
(
TableSchema
&
table_schema
)
=
0
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
)
=
0
;
virtual
Status
DescribeTable
(
TableSchema
&
table_schema
)
=
0
;
virtual
Status
DescribeTable
(
TableSchema
&
table_schema
)
=
0
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
=
0
;
virtual
Status
HasTable
(
const
std
::
string
&
table_id
,
bool
&
has_or_not
)
=
0
;
virtual
Status
AllTables
(
std
::
vector
<
TableSchema
>&
table_schema_array
)
=
0
;
virtual
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
=
0
;
virtual
Status
CreateTableFile
(
TableFileSchema
&
file_schema
)
=
0
;
virtual
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
virtual
Status
DropPartitionsByDates
(
const
std
::
string
&
table_id
,
...
@@ -43,7 +45,11 @@ public:
...
@@ -43,7 +45,11 @@ public:
virtual
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
virtual
Status
FilesToMerge
(
const
std
::
string
&
table_id
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
DatePartionedTableFilesSchema
&
files
)
=
0
;
virtual
Status
Size
(
long
&
result
)
=
0
;
virtual
Status
FilesToDelete
(
const
std
::
string
&
table_id
,
const
DatesT
&
partition
,
DatePartionedTableFilesSchema
&
files
)
=
0
;
virtual
Status
Size
(
uint64_t
&
result
)
=
0
;
virtual
Status
Archive
()
=
0
;
virtual
Status
Archive
()
=
0
;
...
@@ -54,7 +60,7 @@ public:
...
@@ -54,7 +60,7 @@ public:
virtual
Status
DropAll
()
=
0
;
virtual
Status
DropAll
()
=
0
;
virtual
Status
Count
(
const
std
::
string
&
table_id
,
long
&
result
)
=
0
;
virtual
Status
Count
(
const
std
::
string
&
table_id
,
uint64_t
&
result
)
=
0
;
static
DateT
GetDate
(
const
std
::
time_t
&
t
,
int
day_delta
=
0
);
static
DateT
GetDate
(
const
std
::
time_t
&
t
,
int
day_delta
=
0
);
static
DateT
GetDate
();
static
DateT
GetDate
();
...
...
cpp/src/sdk/examples/simple/src/ClientTest.cpp
浏览文件 @
db3f0e9b
...
@@ -28,7 +28,7 @@ namespace {
...
@@ -28,7 +28,7 @@ namespace {
std
::
cout
<<
"Table name: "
<<
tb_schema
.
table_name
<<
std
::
endl
;
std
::
cout
<<
"Table name: "
<<
tb_schema
.
table_name
<<
std
::
endl
;
std
::
cout
<<
"Table index type: "
<<
(
int
)
tb_schema
.
index_type
<<
std
::
endl
;
std
::
cout
<<
"Table index type: "
<<
(
int
)
tb_schema
.
index_type
<<
std
::
endl
;
std
::
cout
<<
"Table dimension: "
<<
tb_schema
.
dimension
<<
std
::
endl
;
std
::
cout
<<
"Table dimension: "
<<
tb_schema
.
dimension
<<
std
::
endl
;
std
::
cout
<<
"Table store raw data: "
<<
tb_schema
.
store_raw_vector
<<
std
::
endl
;
std
::
cout
<<
"Table store raw data: "
<<
(
tb_schema
.
store_raw_vector
?
"true"
:
"false"
)
<<
std
::
endl
;
BLOCK_SPLITER
BLOCK_SPLITER
}
}
...
@@ -148,7 +148,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
...
@@ -148,7 +148,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std
::
cout
<<
"ShowTables function call status: "
<<
stat
.
ToString
()
<<
std
::
endl
;
std
::
cout
<<
"ShowTables function call status: "
<<
stat
.
ToString
()
<<
std
::
endl
;
std
::
cout
<<
"All tables: "
<<
std
::
endl
;
std
::
cout
<<
"All tables: "
<<
std
::
endl
;
for
(
auto
&
table
:
tables
)
{
for
(
auto
&
table
:
tables
)
{
std
::
cout
<<
"
\t
"
<<
table
<<
std
::
endl
;
int64_t
row_count
=
0
;
stat
=
conn
->
GetTableRowCount
(
table
,
row_count
);
std
::
cout
<<
"
\t
"
<<
table
<<
"("
<<
row_count
<<
" rows)"
<<
std
::
endl
;
}
}
}
}
...
@@ -192,10 +194,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
...
@@ -192,10 +194,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
PrintSearchResult
(
topk_query_result_array
);
PrintSearchResult
(
topk_query_result_array
);
}
}
//
{//delete table
{
//delete table
//
Status stat = conn->DeleteTable(TABLE_NAME);
Status
stat
=
conn
->
DeleteTable
(
TABLE_NAME
);
//
std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
std
::
cout
<<
"DeleteTable function call status: "
<<
stat
.
ToString
()
<<
std
::
endl
;
//
}
}
{
//server status
{
//server status
std
::
string
status
=
conn
->
ServerStatus
();
std
::
string
status
=
conn
->
ServerStatus
();
...
...
cpp/src/sdk/include/Status.h
浏览文件 @
db3f0e9b
...
@@ -72,7 +72,7 @@ class Status {
...
@@ -72,7 +72,7 @@ class Status {
* @return, the status is assigned.
* @return, the status is assigned.
*
*
*/
*/
inline
Status
&
operator
=
(
const
Status
&
s
);
Status
&
operator
=
(
const
Status
&
s
);
/**
/**
* @brief Status
* @brief Status
...
@@ -93,7 +93,7 @@ class Status {
...
@@ -93,7 +93,7 @@ class Status {
* @return, the status is moved.
* @return, the status is moved.
*
*
*/
*/
inline
Status
&
operator
=
(
Status
&&
s
)
noexcept
;
Status
&
operator
=
(
Status
&&
s
)
noexcept
;
/**
/**
* @brief Status
* @brief Status
...
...
cpp/src/sdk/src/client/ClientProxy.cpp
浏览文件 @
db3f0e9b
...
@@ -77,7 +77,7 @@ ClientProxy::Disconnect() {
...
@@ -77,7 +77,7 @@ ClientProxy::Disconnect() {
std
::
string
std
::
string
ClientProxy
::
ClientVersion
()
const
{
ClientProxy
::
ClientVersion
()
const
{
return
std
::
string
(
"v1.0"
)
;
return
""
;
}
}
Status
Status
...
@@ -221,6 +221,8 @@ ClientProxy::DescribeTable(const std::string &table_name, TableSchema &table_sch
...
@@ -221,6 +221,8 @@ ClientProxy::DescribeTable(const std::string &table_name, TableSchema &table_sch
table_schema
.
table_name
=
thrift_schema
.
table_name
;
table_schema
.
table_name
=
thrift_schema
.
table_name
;
table_schema
.
index_type
=
(
IndexType
)
thrift_schema
.
index_type
;
table_schema
.
index_type
=
(
IndexType
)
thrift_schema
.
index_type
;
table_schema
.
dimension
=
thrift_schema
.
dimension
;
table_schema
.
store_raw_vector
=
thrift_schema
.
store_raw_vector
;
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
return
Status
(
StatusCode
::
UnknownError
,
"failed to describe table: "
+
std
::
string
(
ex
.
what
()));
return
Status
(
StatusCode
::
UnknownError
,
"failed to describe table: "
+
std
::
string
(
ex
.
what
()));
...
...
cpp/src/sdk/src/interface/ConnectionImpl.cpp
浏览文件 @
db3f0e9b
...
@@ -4,6 +4,7 @@
...
@@ -4,6 +4,7 @@
* Proprietary and confidential.
* Proprietary and confidential.
******************************************************************************/
******************************************************************************/
#include "ConnectionImpl.h"
#include "ConnectionImpl.h"
#include "version.h"
namespace
megasearch
{
namespace
megasearch
{
...
@@ -47,7 +48,7 @@ ConnectionImpl::Disconnect() {
...
@@ -47,7 +48,7 @@ ConnectionImpl::Disconnect() {
std
::
string
std
::
string
ConnectionImpl
::
ClientVersion
()
const
{
ConnectionImpl
::
ClientVersion
()
const
{
return
client_proxy_
->
ClientVersion
()
;
return
MEGASEARCH_VERSION
;
}
}
Status
Status
...
...
cpp/src/server/MegasearchTask.cpp
浏览文件 @
db3f0e9b
...
@@ -21,9 +21,6 @@ static const std::string DQL_TASK_GROUP = "dql";
...
@@ -21,9 +21,6 @@ static const std::string DQL_TASK_GROUP = "dql";
static
const
std
::
string
DDL_DML_TASK_GROUP
=
"ddl_dml"
;
static
const
std
::
string
DDL_DML_TASK_GROUP
=
"ddl_dml"
;
static
const
std
::
string
PING_TASK_GROUP
=
"ping"
;
static
const
std
::
string
PING_TASK_GROUP
=
"ping"
;
static
const
std
::
string
VECTOR_UID
=
"uid"
;
static
const
uint64_t
USE_MT
=
5000
;
using
DB_META
=
zilliz
::
vecwise
::
engine
::
meta
::
Meta
;
using
DB_META
=
zilliz
::
vecwise
::
engine
::
meta
::
Meta
;
using
DB_DATE
=
zilliz
::
vecwise
::
engine
::
meta
::
DateT
;
using
DB_DATE
=
zilliz
::
vecwise
::
engine
::
meta
::
DateT
;
...
@@ -76,6 +73,20 @@ namespace {
...
@@ -76,6 +73,20 @@ namespace {
return
map_type
[
type
];
return
map_type
[
type
];
}
}
int
IndexType
(
engine
::
EngineType
type
)
{
static
std
::
map
<
engine
::
EngineType
,
int
>
map_type
=
{
{
engine
::
EngineType
::
INVALID
,
0
},
{
engine
::
EngineType
::
FAISS_IDMAP
,
1
},
{
engine
::
EngineType
::
FAISS_IVFFLAT
,
2
},
};
if
(
map_type
.
find
(
type
)
==
map_type
.
end
())
{
return
0
;
}
return
map_type
[
type
];
}
ServerError
ServerError
ConvertRowRecordToFloatArray
(
const
std
::
vector
<
thrift
::
RowRecord
>&
record_array
,
ConvertRowRecordToFloatArray
(
const
std
::
vector
<
thrift
::
RowRecord
>&
record_array
,
uint64_t
dimension
,
uint64_t
dimension
,
...
@@ -174,16 +185,17 @@ ServerError CreateTableTask::OnExecute() {
...
@@ -174,16 +185,17 @@ ServerError CreateTableTask::OnExecute() {
//step 2: create table
//step 2: create table
engine
::
Status
stat
=
DB
()
->
CreateTable
(
table_info
);
engine
::
Status
stat
=
DB
()
->
CreateTable
(
table_info
);
if
(
!
stat
.
ok
())
{
//table could exist
if
(
!
stat
.
ok
())
{
//table could exist
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_msg_
=
"Engine failed: "
+
stat
.
ToString
();
error_msg_
=
"Engine failed: "
+
stat
.
ToString
();
SERVER_LOG_ERROR
<<
error_msg_
;
SERVER_LOG_ERROR
<<
error_msg_
;
return
SERVER_SUCCESS
;
return
error_code_
;
}
}
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_msg_
=
ex
.
what
();
error_msg_
=
ex
.
what
();
SERVER_LOG_ERROR
<<
error_msg_
;
SERVER_LOG_ERROR
<<
error_msg_
;
return
SERVER_UNEXPECTED_ERROR
;
return
error_code_
;
}
}
rc
.
Record
(
"done"
);
rc
.
Record
(
"done"
);
...
@@ -215,10 +227,13 @@ ServerError DescribeTableTask::OnExecute() {
...
@@ -215,10 +227,13 @@ ServerError DescribeTableTask::OnExecute() {
error_msg_
=
"Engine failed: "
+
stat
.
ToString
();
error_msg_
=
"Engine failed: "
+
stat
.
ToString
();
SERVER_LOG_ERROR
<<
error_msg_
;
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
return
error_code_
;
}
else
{
}
}
schema_
.
table_name
=
table_info
.
table_id_
;
schema_
.
index_type
=
IndexType
((
engine
::
EngineType
)
table_info
.
engine_type_
);
schema_
.
dimension
=
table_info
.
dimension_
;
schema_
.
store_raw_vector
=
table_info
.
store_raw_data_
;
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_msg_
=
ex
.
what
();
error_msg_
=
ex
.
what
();
...
@@ -243,16 +258,53 @@ BaseTaskPtr DeleteTableTask::Create(const std::string& group_id) {
...
@@ -243,16 +258,53 @@ BaseTaskPtr DeleteTableTask::Create(const std::string& group_id) {
}
}
ServerError
DeleteTableTask
::
OnExecute
()
{
ServerError
DeleteTableTask
::
OnExecute
()
{
error_code_
=
SERVER_NOT_IMPLEMENT
;
try
{
error_msg_
=
"delete table not implemented"
;
TimeRecorder
rc
(
"DeleteTableTask"
);
SERVER_LOG_ERROR
<<
error_msg_
;
//step 1: check validation
if
(
table_name_
.
empty
())
{
error_code_
=
SERVER_INVALID_ARGUMENT
;
error_msg_
=
"Table name cannot be empty"
;
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
}
//step 2: check table existence
engine
::
meta
::
TableSchema
table_info
;
table_info
.
table_id_
=
table_name_
;
engine
::
Status
stat
=
DB
()
->
DescribeTable
(
table_info
);
if
(
!
stat
.
ok
())
{
error_code_
=
SERVER_TABLE_NOT_EXIST
;
error_msg_
=
"Engine failed: "
+
stat
.
ToString
();
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
}
rc
.
Record
(
"check validation"
);
return
SERVER_NOT_IMPLEMENT
;
//step 3: delete table
std
::
vector
<
DB_DATE
>
dates
;
stat
=
DB
()
->
DeleteTable
(
table_name_
,
dates
);
if
(
!
stat
.
ok
())
{
SERVER_LOG_ERROR
<<
"Engine failed: "
<<
stat
.
ToString
();
return
SERVER_UNEXPECTED_ERROR
;
}
rc
.
Record
(
"deleta table"
);
rc
.
Elapse
(
"totally cost"
);
}
catch
(
std
::
exception
&
ex
)
{
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_msg_
=
ex
.
what
();
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
}
return
SERVER_SUCCESS
;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowTablesTask
::
ShowTablesTask
(
std
::
vector
<
std
::
string
>&
tables
)
ShowTablesTask
::
ShowTablesTask
(
std
::
vector
<
std
::
string
>&
tables
)
:
BaseTask
(
PING
_TASK_GROUP
),
:
BaseTask
(
DQL
_TASK_GROUP
),
tables_
(
tables
)
{
tables_
(
tables
)
{
}
}
...
@@ -262,6 +314,19 @@ BaseTaskPtr ShowTablesTask::Create(std::vector<std::string>& tables) {
...
@@ -262,6 +314,19 @@ BaseTaskPtr ShowTablesTask::Create(std::vector<std::string>& tables) {
}
}
ServerError
ShowTablesTask
::
OnExecute
()
{
ServerError
ShowTablesTask
::
OnExecute
()
{
std
::
vector
<
engine
::
meta
::
TableSchema
>
schema_array
;
engine
::
Status
stat
=
DB
()
->
AllTables
(
schema_array
);
if
(
!
stat
.
ok
())
{
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_msg_
=
"Engine failed: "
+
stat
.
ToString
();
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
}
tables_
.
clear
();
for
(
auto
&
schema
:
schema_array
)
{
tables_
.
push_back
(
schema
.
table_id_
);
}
return
SERVER_SUCCESS
;
return
SERVER_SUCCESS
;
}
}
...
@@ -468,17 +533,39 @@ BaseTaskPtr GetTableRowCountTask::Create(const std::string& table_name, int64_t&
...
@@ -468,17 +533,39 @@ BaseTaskPtr GetTableRowCountTask::Create(const std::string& table_name, int64_t&
}
}
ServerError
GetTableRowCountTask
::
OnExecute
()
{
ServerError
GetTableRowCountTask
::
OnExecute
()
{
if
(
table_name_
.
empty
())
{
try
{
TimeRecorder
rc
(
"GetTableRowCountTask"
);
//step 1: check validation
if
(
table_name_
.
empty
())
{
error_code_
=
SERVER_INVALID_ARGUMENT
;
error_msg_
=
"Table name cannot be empty"
;
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
}
//step 2: get row count
uint64_t
row_count
=
0
;
engine
::
Status
stat
=
DB
()
->
GetTableRowCount
(
table_name_
,
row_count
);
if
(
!
stat
.
ok
())
{
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_msg_
=
"Engine failed: "
+
stat
.
ToString
();
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
}
row_count_
=
(
int64_t
)
row_count
;
rc
.
Elapse
(
"totally cost"
);
}
catch
(
std
::
exception
&
ex
)
{
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_msg_
=
"Table name cannot be empty"
;
error_msg_
=
ex
.
what
()
;
SERVER_LOG_ERROR
<<
error_msg_
;
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
return
error_code_
;
}
}
error_code_
=
SERVER_NOT_IMPLEMENT
;
return
SERVER_SUCCESS
;
error_msg_
=
"Not implemented"
;
SERVER_LOG_ERROR
<<
error_msg_
;
return
error_code_
;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
...
...
cpp/unittest/db/db_tests.cpp
浏览文件 @
db3f0e9b
...
@@ -64,7 +64,7 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
...
@@ -64,7 +64,7 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
static
const
std
::
string
group_name
=
"test_group"
;
static
const
std
::
string
group_name
=
"test_group"
;
static
const
int
group_dim
=
256
;
static
const
int
group_dim
=
256
;
long
size
;
uint64_t
size
;
engine
::
meta
::
TableSchema
group_info
;
engine
::
meta
::
TableSchema
group_info
;
group_info
.
dimension_
=
group_dim
;
group_info
.
dimension_
=
group_dim
;
...
@@ -149,8 +149,8 @@ TEST_F(DBTest, DB_TEST) {
...
@@ -149,8 +149,8 @@ TEST_F(DBTest, DB_TEST) {
INIT_TIMER
;
INIT_TIMER
;
std
::
stringstream
ss
;
std
::
stringstream
ss
;
long
count
=
0
;
uint64_t
count
=
0
;
long
prev_count
=
-
1
;
uint64_t
prev_count
=
0
;
for
(
auto
j
=
0
;
j
<
10
;
++
j
)
{
for
(
auto
j
=
0
;
j
<
10
;
++
j
)
{
ss
.
str
(
""
);
ss
.
str
(
""
);
...
...
cpp/unittest/metrics/metrics_test.cpp
浏览文件 @
db3f0e9b
...
@@ -72,8 +72,8 @@ TEST_F(DBTest, Metric_Tes) {
...
@@ -72,8 +72,8 @@ TEST_F(DBTest, Metric_Tes) {
INIT_TIMER
;
INIT_TIMER
;
std
::
stringstream
ss
;
std
::
stringstream
ss
;
long
count
=
0
;
uint64_t
count
=
0
;
long
prev_count
=
-
1
;
uint64_t
prev_count
=
0
;
for
(
auto
j
=
0
;
j
<
10
;
++
j
)
{
for
(
auto
j
=
0
;
j
<
10
;
++
j
)
{
ss
.
str
(
""
);
ss
.
str
(
""
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录