Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
af0c4e68
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
af0c4e68
编写于
9月 20, 2019
作者:
G
groot
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
MS-578 makesure milvus5.0 dont crack 0.3.1 data
Former-commit-id: 43ebf9e458cb28f670c68e7ca4e18c545732d9cd
上级
2284b729
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
306 addition
and
135 deletion
+306
-135
cpp/src/db/Utils.cpp
cpp/src/db/Utils.cpp
+36
-0
cpp/src/db/Utils.h
cpp/src/db/Utils.h
+11
-0
cpp/src/db/meta/MetaFactory.cpp
cpp/src/db/meta/MetaFactory.cpp
+16
-30
cpp/src/db/meta/MySQLMetaImpl.cpp
cpp/src/db/meta/MySQLMetaImpl.cpp
+218
-93
cpp/src/db/meta/MySQLMetaImpl.h
cpp/src/db/meta/MySQLMetaImpl.h
+3
-0
cpp/src/db/meta/SqliteMetaImpl.cpp
cpp/src/db/meta/SqliteMetaImpl.cpp
+18
-11
cpp/src/db/meta/SqliteMetaImpl.h
cpp/src/db/meta/SqliteMetaImpl.h
+3
-1
cpp/src/utils/Error.h
cpp/src/utils/Error.h
+1
-0
未找到文件。
cpp/src/db/Utils.cpp
浏览文件 @
af0c4e68
...
...
@@ -21,6 +21,7 @@
#include <mutex>
#include <chrono>
#include <regex>
#include <boost/filesystem.hpp>
namespace
zilliz
{
...
...
@@ -195,6 +196,41 @@ meta::DateT GetDate() {
return
GetDate
(
std
::
time
(
nullptr
),
0
);
}
// URI format: dialect://username:password@host:port/database
Status
ParseMetaUri
(
const
std
::
string
&
uri
,
MetaUriInfo
&
info
)
{
std
::
string
dialect_regex
=
"(.*)"
;
std
::
string
username_tegex
=
"(.*)"
;
std
::
string
password_regex
=
"(.*)"
;
std
::
string
host_regex
=
"(.*)"
;
std
::
string
port_regex
=
"(.*)"
;
std
::
string
db_name_regex
=
"(.*)"
;
std
::
string
uri_regex_str
=
dialect_regex
+
"
\\
:
\\
/
\\
/"
+
username_tegex
+
"
\\
:"
+
password_regex
+
"
\\
@"
+
host_regex
+
"
\\
:"
+
port_regex
+
"
\\
/"
+
db_name_regex
;
std
::
regex
uri_regex
(
uri_regex_str
);
std
::
smatch
pieces_match
;
if
(
std
::
regex_match
(
uri
,
pieces_match
,
uri_regex
))
{
info
.
dialect_
=
pieces_match
[
1
].
str
();
info
.
username_
=
pieces_match
[
2
].
str
();
info
.
password_
=
pieces_match
[
3
].
str
();
info
.
host_
=
pieces_match
[
4
].
str
();
info
.
port_
=
pieces_match
[
5
].
str
();
info
.
db_name_
=
pieces_match
[
6
].
str
();
//TODO: verify host, port...
}
else
{
return
Status
(
DB_INVALID_META_URI
,
"Invalid meta uri: "
+
uri
);
}
return
Status
::
OK
();
}
}
// namespace utils
}
// namespace engine
}
// namespace milvus
...
...
cpp/src/db/Utils.h
浏览文件 @
af0c4e68
...
...
@@ -44,6 +44,17 @@ meta::DateT GetDate(const std::time_t &t, int day_delta = 0);
meta
::
DateT
GetDate
();
meta
::
DateT
GetDateWithDelta
(
int
day_delta
);
struct
MetaUriInfo
{
std
::
string
dialect_
;
std
::
string
username_
;
std
::
string
password_
;
std
::
string
host_
;
std
::
string
port_
;
std
::
string
db_name_
;
};
Status
ParseMetaUri
(
const
std
::
string
&
uri
,
MetaUriInfo
&
info
);
}
// namespace utils
}
// namespace engine
}
// namespace milvus
...
...
cpp/src/db/meta/MetaFactory.cpp
浏览文件 @
af0c4e68
...
...
@@ -20,13 +20,14 @@
#include "MySQLMetaImpl.h"
#include "utils/Log.h"
#include "utils/Exception.h"
#include "db/Utils.h"
#include <stdlib.h>
#include <time.h>
#include <sstream>
#include <cstdlib>
#include <string>
#include <
regex
>
#include <
string.h
>
namespace
zilliz
{
namespace
milvus
{
...
...
@@ -49,38 +50,23 @@ namespace engine {
meta
::
MetaPtr
MetaFactory
::
Build
(
const
DBMetaOptions
&
metaOptions
,
const
int
&
mode
)
{
std
::
string
uri
=
metaOptions
.
backend_uri
;
std
::
string
dialectRegex
=
"(.*)"
;
std
::
string
usernameRegex
=
"(.*)"
;
std
::
string
passwordRegex
=
"(.*)"
;
std
::
string
hostRegex
=
"(.*)"
;
std
::
string
portRegex
=
"(.*)"
;
std
::
string
dbNameRegex
=
"(.*)"
;
std
::
string
uriRegexStr
=
dialectRegex
+
"
\\
:
\\
/
\\
/"
+
usernameRegex
+
"
\\
:"
+
passwordRegex
+
"
\\
@"
+
hostRegex
+
"
\\
:"
+
portRegex
+
"
\\
/"
+
dbNameRegex
;
std
::
regex
uriRegex
(
uriRegexStr
);
std
::
smatch
pieces_match
;
if
(
std
::
regex_match
(
uri
,
pieces_match
,
uriRegex
))
{
std
::
string
dialect
=
pieces_match
[
1
].
str
();
std
::
transform
(
dialect
.
begin
(),
dialect
.
end
(),
dialect
.
begin
(),
::
tolower
);
if
(
dialect
.
find
(
"mysql"
)
!=
std
::
string
::
npos
)
{
ENGINE_LOG_INFO
<<
"Using MySQL"
;
return
std
::
make_shared
<
meta
::
MySQLMetaImpl
>
(
metaOptions
,
mode
);
}
else
if
(
dialect
.
find
(
"sqlite"
)
!=
std
::
string
::
npos
)
{
ENGINE_LOG_INFO
<<
"Using SQLite"
;
return
std
::
make_shared
<
meta
::
SqliteMetaImpl
>
(
metaOptions
);
}
else
{
ENGINE_LOG_ERROR
<<
"Invalid dialect in URI: dialect = "
<<
dialect
;
throw
InvalidArgumentException
(
"URI dialect is not mysql / sqlite"
);
}
}
else
{
utils
::
MetaUriInfo
uri_info
;
auto
status
=
utils
::
ParseMetaUri
(
uri
,
uri_info
);
if
(
!
status
.
ok
())
{
ENGINE_LOG_ERROR
<<
"Wrong URI format: URI = "
<<
uri
;
throw
InvalidArgumentException
(
"Wrong URI format "
);
}
if
(
strcasecmp
(
uri_info
.
dialect_
.
c_str
(),
"mysql"
)
==
0
)
{
ENGINE_LOG_INFO
<<
"Using MySQL"
;
return
std
::
make_shared
<
meta
::
MySQLMetaImpl
>
(
metaOptions
,
mode
);
}
else
if
(
strcasecmp
(
uri_info
.
dialect_
.
c_str
(),
"sqlite"
)
==
0
)
{
ENGINE_LOG_INFO
<<
"Using SQLite"
;
return
std
::
make_shared
<
meta
::
SqliteMetaImpl
>
(
metaOptions
);
}
else
{
ENGINE_LOG_ERROR
<<
"Invalid dialect in URI: dialect = "
<<
uri_info
.
dialect_
;
throw
InvalidArgumentException
(
"URI dialect is not mysql / sqlite"
);
}
}
}
// namespace engine
...
...
cpp/src/db/meta/MySQLMetaImpl.cpp
浏览文件 @
af0c4e68
...
...
@@ -19,21 +19,22 @@
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "utils/Log.h"
#include "utils/Exception.h"
#include "MetaConsts.h"
#include "metrics/Metrics.h"
#include <unistd.h>
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <chrono>
#include <fstream>
#include <regex>
#include <string>
#include <mutex>
#include <thread>
#include "mysql++/mysql++.h"
#include <string.h>
#include <boost/filesystem.hpp>
#include <mysql++/mysql++.h>
namespace
zilliz
{
...
...
@@ -56,8 +57,112 @@ Status HandleException(const std::string &desc, const char* what = nullptr) {
}
}
class
MetaField
{
public:
MetaField
(
const
std
::
string
&
name
,
const
std
::
string
&
type
,
const
std
::
string
&
setting
)
:
name_
(
name
),
type_
(
type
),
setting_
(
setting
)
{
}
std
::
string
name
()
const
{
return
name_
;
}
std
::
string
ToString
()
const
{
return
name_
+
" "
+
type_
+
" "
+
setting_
;
}
// mysql field type has additional information. for instance, a filed type is defined as 'BIGINT'
// we get the type from sql is 'bigint(20)', so we need to ignore the '(20)'
bool
IsEqual
(
const
MetaField
&
field
)
const
{
size_t
name_len_min
=
field
.
name_
.
length
()
>
name_
.
length
()
?
name_
.
length
()
:
field
.
name_
.
length
();
size_t
type_len_min
=
field
.
type_
.
length
()
>
type_
.
length
()
?
type_
.
length
()
:
field
.
type_
.
length
();
return
strncasecmp
(
field
.
name_
.
c_str
(),
name_
.
c_str
(),
name_len_min
)
==
0
&&
strncasecmp
(
field
.
type_
.
c_str
(),
type_
.
c_str
(),
type_len_min
)
==
0
;
}
private:
std
::
string
name_
;
std
::
string
type_
;
std
::
string
setting_
;
};
using
MetaFields
=
std
::
vector
<
MetaField
>
;
class
MetaSchema
{
public:
MetaSchema
(
const
std
::
string
&
name
,
const
MetaFields
&
fields
)
:
name_
(
name
),
fields_
(
fields
)
{
}
std
::
string
name
()
const
{
return
name_
;
}
std
::
string
ToString
()
const
{
std
::
string
result
;
for
(
auto
&
field
:
fields_
)
{
if
(
!
result
.
empty
())
{
result
+=
","
;
}
result
+=
field
.
ToString
();
}
return
result
;
}
//if the outer fields contains all this MetaSchema fields, return true
//otherwise return false
bool
IsEqual
(
const
MetaFields
&
fields
)
const
{
std
::
vector
<
std
::
string
>
found_field
;
for
(
const
auto
&
this_field
:
fields_
)
{
for
(
const
auto
&
outer_field
:
fields
)
{
if
(
this_field
.
IsEqual
(
outer_field
))
{
found_field
.
push_back
(
this_field
.
name
());
break
;
}
}
}
return
found_field
.
size
()
==
fields_
.
size
();
}
private:
std
::
string
name_
;
MetaFields
fields_
;
};
//Tables schema
static
const
MetaSchema
TABLES_SCHEMA
(
META_TABLES
,
{
MetaField
(
"id"
,
"BIGINT"
,
"PRIMARY KEY AUTO_INCREMENT"
),
MetaField
(
"table_id"
,
"VARCHAR(255)"
,
"UNIQUE NOT NULL"
),
MetaField
(
"state"
,
"INT"
,
"NOT NULL"
),
MetaField
(
"dimension"
,
"SMALLINT"
,
"NOT NULL"
),
MetaField
(
"created_on"
,
"BIGINT"
,
"NOT NULL"
),
MetaField
(
"flag"
,
"BIGINT"
,
"DEFAULT 0 NOT NULL"
),
MetaField
(
"index_file_size"
,
"BIGINT"
,
"DEFAULT 1024 NOT NULL"
),
MetaField
(
"engine_type"
,
"INT"
,
"DEFAULT 1 NOT NULL"
),
MetaField
(
"nlist"
,
"INT"
,
"DEFAULT 16384 NOT NULL"
),
MetaField
(
"metric_type"
,
"INT"
,
"DEFAULT 1 NOT NULL"
),
});
//TableFiles schema
static
const
MetaSchema
TABLEFILES_SCHEMA
(
META_TABLEFILES
,
{
MetaField
(
"id"
,
"BIGINT"
,
"PRIMARY KEY AUTO_INCREMENT"
),
MetaField
(
"table_id"
,
"VARCHAR(255)"
,
"NOT NULL"
),
MetaField
(
"engine_type"
,
"INT"
,
"DEFAULT 1 NOT NULL"
),
MetaField
(
"file_id"
,
"VARCHAR(255)"
,
"NOT NULL"
),
MetaField
(
"file_type"
,
"INT"
,
"DEFAULT 0 NOT NULL"
),
MetaField
(
"file_size"
,
"BIGINT"
,
"DEFAULT 0 NOT NULL"
),
MetaField
(
"row_count"
,
"BIGINT"
,
"DEFAULT 0 NOT NULL"
),
MetaField
(
"updated_time"
,
"BIGINT"
,
"NOT NULL"
),
MetaField
(
"created_on"
,
"BIGINT"
,
"NOT NULL"
),
MetaField
(
"date"
,
"INT"
,
"DEFAULT -1 NOT NULL"
),
});
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
MySQLMetaImpl
::
MySQLMetaImpl
(
const
DBMetaOptions
&
options_
,
const
int
&
mode
)
:
options_
(
options_
),
mode_
(
mode
)
{
...
...
@@ -84,7 +189,56 @@ Status MySQLMetaImpl::NextFileId(std::string &file_id) {
return
Status
::
OK
();
}
void
MySQLMetaImpl
::
ValidateMetaSchema
()
{
if
(
nullptr
==
mysql_connection_pool_
)
{
return
;
}
ScopedConnection
connectionPtr
(
*
mysql_connection_pool_
,
safe_grab
);
if
(
connectionPtr
==
nullptr
)
{
return
;
}
auto
validate_func
=
[
&
](
const
MetaSchema
&
schema
)
{
Query
query_statement
=
connectionPtr
->
query
();
query_statement
<<
"DESC "
<<
schema
.
name
()
<<
";"
;
MetaFields
exist_fields
;
try
{
StoreQueryResult
res
=
query_statement
.
store
();
for
(
size_t
i
=
0
;
i
<
res
.
num_rows
();
i
++
)
{
const
Row
&
row
=
res
[
i
];
std
::
string
name
,
type
;
row
[
"Field"
].
to_string
(
name
);
row
[
"Type"
].
to_string
(
type
);
exist_fields
.
push_back
(
MetaField
(
name
,
type
,
""
));
}
}
catch
(
std
::
exception
&
e
)
{
ENGINE_LOG_DEBUG
<<
"Meta table '"
<<
schema
.
name
()
<<
"' not exist and will be created"
;
}
if
(
exist_fields
.
empty
())
{
return
true
;
}
return
schema
.
IsEqual
(
exist_fields
);
};
//verify Tables
if
(
!
validate_func
(
TABLES_SCHEMA
))
{
throw
Exception
(
DB_INCOMPATIB_META
,
"Meta Tables schema is created by Milvus old version"
);
}
//verufy TableFiles
if
(
!
validate_func
(
TABLEFILES_SCHEMA
))
{
throw
Exception
(
DB_INCOMPATIB_META
,
"Meta TableFiles schema is created by Milvus old version"
);
}
}
Status
MySQLMetaImpl
::
Initialize
()
{
//step 1: create db root path
if
(
!
boost
::
filesystem
::
is_directory
(
options_
.
path
))
{
auto
ret
=
boost
::
filesystem
::
create_directory
(
options_
.
path
);
if
(
!
ret
)
{
...
...
@@ -96,108 +250,79 @@ Status MySQLMetaImpl::Initialize() {
std
::
string
uri
=
options_
.
backend_uri
;
std
::
string
dialectRegex
=
"(.*)"
;
std
::
string
usernameRegex
=
"(.*)"
;
std
::
string
passwordRegex
=
"(.*)"
;
std
::
string
hostRegex
=
"(.*)"
;
std
::
string
portRegex
=
"(.*)"
;
std
::
string
dbNameRegex
=
"(.*)"
;
std
::
string
uriRegexStr
=
dialectRegex
+
"
\\
:
\\
/
\\
/"
+
usernameRegex
+
"
\\
:"
+
passwordRegex
+
"
\\
@"
+
hostRegex
+
"
\\
:"
+
portRegex
+
"
\\
/"
+
dbNameRegex
;
std
::
regex
uriRegex
(
uriRegexStr
);
std
::
smatch
pieces_match
;
if
(
std
::
regex_match
(
uri
,
pieces_match
,
uriRegex
))
{
std
::
string
dialect
=
pieces_match
[
1
].
str
();
std
::
transform
(
dialect
.
begin
(),
dialect
.
end
(),
dialect
.
begin
(),
::
tolower
);
if
(
dialect
.
find
(
"mysql"
)
==
std
::
string
::
npos
)
{
return
Status
(
DB_ERROR
,
"URI's dialect is not MySQL"
);
}
std
::
string
username
=
pieces_match
[
2
].
str
();
std
::
string
password
=
pieces_match
[
3
].
str
();
std
::
string
serverAddress
=
pieces_match
[
4
].
str
();
unsigned
int
port
=
0
;
if
(
!
pieces_match
[
5
].
str
().
empty
())
{
port
=
std
::
stoi
(
pieces_match
[
5
].
str
());
//step 2: parse and check meta uri
utils
::
MetaUriInfo
uri_info
;
auto
status
=
utils
::
ParseMetaUri
(
uri
,
uri_info
);
if
(
!
status
.
ok
())
{
std
::
string
msg
=
"Wrong URI format: "
+
uri
;
ENGINE_LOG_ERROR
<<
msg
;
throw
Exception
(
DB_INVALID_META_URI
,
msg
);
}
if
(
strcasecmp
(
uri_info
.
dialect_
.
c_str
(),
"mysql"
)
!=
0
)
{
std
::
string
msg
=
"URI's dialect is not MySQL"
;
ENGINE_LOG_ERROR
<<
msg
;
throw
Exception
(
DB_INVALID_META_URI
,
msg
);
}
//step 3: connect mysql
int
thread_hint
=
std
::
thread
::
hardware_concurrency
();
int
max_pool_size
=
(
thread_hint
==
0
)
?
8
:
thread_hint
;
unsigned
int
port
=
0
;
if
(
!
uri_info
.
port_
.
empty
())
{
port
=
std
::
stoi
(
uri_info
.
port_
);
}
mysql_connection_pool_
=
std
::
make_shared
<
MySQLConnectionPool
>
(
uri_info
.
db_name_
,
uri_info
.
username_
,
uri_info
.
password_
,
uri_info
.
host_
,
port
,
max_pool_size
);
ENGINE_LOG_DEBUG
<<
"MySQL connection pool: maximum pool size = "
<<
std
::
to_string
(
max_pool_size
);
//step 4: validate to avoid open old version schema
ValidateMetaSchema
();
//step 5: create meta tables
try
{
if
(
mode_
!=
DBOptions
::
MODE
::
READ_ONLY
)
{
CleanUp
();
}
std
::
string
dbName
=
pieces_match
[
6
].
str
();
{
ScopedConnection
connectionPtr
(
*
mysql_connection_pool_
,
safe_grab
);
int
threadHint
=
std
::
thread
::
hardware_concurrency
();
int
maxPoolSize
=
threadHint
==
0
?
8
:
threadHint
;
mysql_connection_pool_
=
std
::
make_shared
<
MySQLConnectionPool
>
(
dbName
,
username
,
password
,
serverAddress
,
port
,
maxPoolSize
);
if
(
connectionPtr
==
nullptr
)
{
return
Status
(
DB_ERROR
,
"Failed to connect to database server"
);
}
ENGINE_LOG_DEBUG
<<
"MySQL connection pool: maximum pool size = "
<<
std
::
to_string
(
maxPoolSize
);
try
{
if
(
mode_
!=
DBOptions
::
MODE
::
READ_ONLY
)
{
CleanUp
();
if
(
!
connectionPtr
->
thread_aware
())
{
ENGINE_LOG_ERROR
<<
"MySQL++ wasn't built with thread awareness! Can't run without it."
;
return
Status
(
DB_ERROR
,
"MySQL++ wasn't built with thread awareness! Can't run without it."
);
}
Query
InitializeQuery
=
connectionPtr
->
query
();
{
ScopedConnection
connectionPtr
(
*
mysql_connection_pool_
,
safe_grab
)
;
InitializeQuery
<<
"CREATE TABLE IF NOT EXISTS "
<<
TABLES_SCHEMA
.
name
()
<<
" ("
<<
TABLES_SCHEMA
.
ToString
()
+
");"
;
if
(
connectionPtr
==
nullptr
)
{
return
Status
(
DB_ERROR
,
"Failed to connect to database server"
);
}
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::Initialize: "
<<
InitializeQuery
.
str
();
if
(
!
InitializeQuery
.
exec
())
{
return
HandleException
(
"Initialization Error"
,
InitializeQuery
.
error
());
}
if
(
!
connectionPtr
->
thread_aware
())
{
ENGINE_LOG_ERROR
<<
"MySQL++ wasn't built with thread awareness! Can't run without it."
;
return
Status
(
DB_ERROR
,
"MySQL++ wasn't built with thread awareness! Can't run without it."
);
}
Query
InitializeQuery
=
connectionPtr
->
query
();
InitializeQuery
<<
"CREATE TABLE IF NOT EXISTS "
<<
TABLEFILES_SCHEMA
.
name
()
<<
" ("
<<
TABLEFILES_SCHEMA
.
ToString
()
+
");"
;
InitializeQuery
<<
"CREATE TABLE IF NOT EXISTS "
<<
META_TABLES
<<
" "
<<
"(id BIGINT PRIMARY KEY AUTO_INCREMENT, "
<<
"table_id VARCHAR(255) UNIQUE NOT NULL, "
<<
"state INT NOT NULL, "
<<
"dimension SMALLINT NOT NULL, "
<<
"created_on BIGINT NOT NULL, "
<<
"flag BIGINT DEFAULT 0 NOT NULL, "
<<
"index_file_size BIGINT DEFAULT 1024 NOT NULL, "
<<
"engine_type INT DEFAULT 1 NOT NULL, "
<<
"nlist INT DEFAULT 16384 NOT NULL, "
<<
"metric_type INT DEFAULT 1 NOT NULL);"
;
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::Initialize: "
<<
InitializeQuery
.
str
();
if
(
!
InitializeQuery
.
exec
())
{
return
HandleException
(
"Initialization Error"
,
InitializeQuery
.
error
());
}
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::Initialize: "
<<
InitializeQuery
.
str
();
InitializeQuery
<<
"CREATE TABLE IF NOT EXISTS "
<<
META_TABLEFILES
<<
" "
<<
"(id BIGINT PRIMARY KEY AUTO_INCREMENT, "
<<
"table_id VARCHAR(255) NOT NULL, "
<<
"engine_type INT DEFAULT 1 NOT NULL, "
<<
"file_id VARCHAR(255) NOT NULL, "
<<
"file_type INT DEFAULT 0 NOT NULL, "
<<
"file_size BIGINT DEFAULT 0 NOT NULL, "
<<
"row_count BIGINT DEFAULT 0 NOT NULL, "
<<
"updated_time BIGINT NOT NULL, "
<<
"created_on BIGINT NOT NULL, "
<<
"date INT DEFAULT -1 NOT NULL);"
;
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::Initialize: "
<<
InitializeQuery
.
str
();
if
(
!
InitializeQuery
.
exec
())
{
return
HandleException
(
"Initialization Error"
,
InitializeQuery
.
error
());
}
}
//Scoped Connection
if
(
!
InitializeQuery
.
exec
())
{
return
HandleException
(
"Initialization Error"
,
InitializeQuery
.
error
());
}
}
//Scoped Connection
}
catch
(
std
::
exception
&
e
)
{
return
HandleException
(
"GENERAL ERROR DURING INITIALIZATION"
,
e
.
what
());
}
}
else
{
ENGINE_LOG_ERROR
<<
"Wrong URI format. URI = "
<<
uri
;
return
Status
(
DB_ERROR
,
"Wrong URI format"
);
}
catch
(
std
::
exception
&
e
)
{
return
HandleException
(
"GENERAL ERROR DURING INITIALIZATION"
,
e
.
what
());
}
return
Status
::
OK
();
...
...
@@ -1843,7 +1968,7 @@ Status MySQLMetaImpl::DropAll() {
}
Query
dropTableQuery
=
connectionPtr
->
query
();
dropTableQuery
<<
"DROP TABLE IF EXISTS "
<<
META_TABLES
<<
", "
<<
META_TABLEFILES
<<
";"
;
dropTableQuery
<<
"DROP TABLE IF EXISTS "
<<
TABLES_SCHEMA
.
name
()
<<
", "
<<
TABLEFILES_SCHEMA
.
name
()
<<
";"
;
ENGINE_LOG_DEBUG
<<
"MySQLMetaImpl::DropAll: "
<<
dropTableQuery
.
str
();
...
...
cpp/src/db/meta/MySQLMetaImpl.h
浏览文件 @
af0c4e68
...
...
@@ -103,8 +103,11 @@ class MySQLMetaImpl : public Meta {
Status
NextFileId
(
std
::
string
&
file_id
);
Status
NextTableId
(
std
::
string
&
table_id
);
Status
DiscardFiles
(
long
long
to_discard_size
);
void
ValidateMetaSchema
();
Status
Initialize
();
private:
const
DBMetaOptions
options_
;
const
int
mode_
;
...
...
cpp/src/db/meta/SqliteMetaImpl.cpp
浏览文件 @
af0c4e68
...
...
@@ -84,7 +84,6 @@ inline auto StoragePrototype(const std::string &path) {
using
ConnectorT
=
decltype
(
StoragePrototype
(
""
));
static
std
::
unique_ptr
<
ConnectorT
>
ConnectorPtr
;
using
ConditionT
=
decltype
(
c
(
&
TableFileSchema
::
id_
)
==
1UL
);
SqliteMetaImpl
::
SqliteMetaImpl
(
const
DBMetaOptions
&
options_
)
:
options_
(
options_
)
{
...
...
@@ -111,6 +110,23 @@ Status SqliteMetaImpl::NextFileId(std::string &file_id) {
return
Status
::
OK
();
}
void
SqliteMetaImpl
::
ValidateMetaSchema
()
{
if
(
ConnectorPtr
==
nullptr
)
{
return
;
}
//old meta could be recreated since schema changed, throw exception if meta schema is not compatible
auto
ret
=
ConnectorPtr
->
sync_schema_simulate
();
if
(
ret
.
find
(
META_TABLES
)
!=
ret
.
end
()
&&
sqlite_orm
::
sync_schema_result
::
dropped_and_recreated
==
ret
[
META_TABLES
])
{
throw
Exception
(
DB_INCOMPATIB_META
,
"Meta Tables schema is created by Milvus old version"
);
}
if
(
ret
.
find
(
META_TABLEFILES
)
!=
ret
.
end
()
&&
sqlite_orm
::
sync_schema_result
::
dropped_and_recreated
==
ret
[
META_TABLEFILES
])
{
throw
Exception
(
DB_INCOMPATIB_META
,
"Meta TableFiles schema is created by Milvus old version"
);
}
}
Status
SqliteMetaImpl
::
Initialize
()
{
if
(
!
boost
::
filesystem
::
is_directory
(
options_
.
path
))
{
auto
ret
=
boost
::
filesystem
::
create_directory
(
options_
.
path
);
...
...
@@ -123,16 +139,7 @@ Status SqliteMetaImpl::Initialize() {
ConnectorPtr
=
std
::
make_unique
<
ConnectorT
>
(
StoragePrototype
(
options_
.
path
+
"/meta.sqlite"
));
//old meta could be recreated since schema changed, throw exception if meta schema is not compatible
auto
ret
=
ConnectorPtr
->
sync_schema_simulate
();
if
(
ret
.
find
(
META_TABLES
)
!=
ret
.
end
()
&&
sqlite_orm
::
sync_schema_result
::
dropped_and_recreated
==
ret
[
META_TABLES
])
{
throw
Exception
(
DB_INCOMPATIB_META
,
"Meta schema is created by Milvus old version"
);
}
if
(
ret
.
find
(
META_TABLEFILES
)
!=
ret
.
end
()
&&
sqlite_orm
::
sync_schema_result
::
dropped_and_recreated
==
ret
[
META_TABLEFILES
])
{
throw
Exception
(
DB_INCOMPATIB_META
,
"Meta schema is created by Milvus old version"
);
}
ValidateMetaSchema
();
ConnectorPtr
->
sync_schema
();
ConnectorPtr
->
open_forever
();
// thread safe option
...
...
cpp/src/db/meta/SqliteMetaImpl.h
浏览文件 @
af0c4e68
...
...
@@ -97,10 +97,12 @@ class SqliteMetaImpl : public Meta {
Status
NextFileId
(
std
::
string
&
file_id
);
Status
NextTableId
(
std
::
string
&
table_id
);
Status
DiscardFiles
(
long
to_discard_size
);
void
ValidateMetaSchema
();
Status
Initialize
();
private:
const
DBMetaOptions
options_
;
std
::
mutex
meta_mutex_
;
};
// DBMetaImpl
...
...
cpp/src/utils/Error.h
浏览文件 @
af0c4e68
...
...
@@ -87,6 +87,7 @@ constexpr ErrorCode DB_NOT_FOUND = ToDbErrorCode(3);
constexpr
ErrorCode
DB_ALREADY_EXIST
=
ToDbErrorCode
(
4
);
constexpr
ErrorCode
DB_INVALID_PATH
=
ToDbErrorCode
(
5
);
constexpr
ErrorCode
DB_INCOMPATIB_META
=
ToDbErrorCode
(
6
);
constexpr
ErrorCode
DB_INVALID_META_URI
=
ToDbErrorCode
(
7
);
//knowhere error code
constexpr
ErrorCode
KNOWHERE_ERROR
=
ToKnowhereErrorCode
(
1
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录