提交 c13edb90 编写于 作者: S starlord

MS-578 make sure milvus5.0 doesn't crack 0.3.1 data


Former-commit-id: e3c81573e4a28ac51b564db548dd29fee92e1915
上级 48e296ca
......@@ -21,6 +21,7 @@
#include <mutex>
#include <chrono>
#include <regex>
#include <boost/filesystem.hpp>
namespace zilliz {
......@@ -195,6 +196,41 @@ meta::DateT GetDate() {
return GetDate(std::time(nullptr), 0);
}
// Split a meta backend URI into its components.
//
// URI format: dialect://username:password@host:port/database
//
// @param uri   Text to parse; the whole string has to match the format above.
// @param info  Receives the six captured components when the URI matches.
//              It is left untouched when the URI does not match.
// @return Status::OK() on a match, otherwise a DB_INVALID_META_URI status whose
//         message contains the rejected URI.
//
// Every component is captured with "(.*)", so any of them may come back empty and
// none of them is validated here.
Status ParseMetaUri(const std::string& uri, MetaUriInfo& info) {
    // The pattern never changes and compiling a std::regex is costly, so it is
    // built once on first use instead of on every call. The adjacent literals
    // concatenate to the same pattern the function has always used.
    static const std::regex uri_regex(
        "(.*)" "\\:\\/\\/"   // dialect, then "://"
        "(.*)" "\\:"         // username, then ":"
        "(.*)" "\\@"         // password, then "@"
        "(.*)" "\\:"         // host, then ":"
        "(.*)" "\\/"         // port, then "/"
        "(.*)");             // database name

    std::smatch pieces_match;
    if (!std::regex_match(uri, pieces_match, uri_regex)) {
        return Status(DB_INVALID_META_URI, "Invalid meta uri: " + uri);
    }

    info.dialect_ = pieces_match[1].str();
    info.username_ = pieces_match[2].str();
    info.password_ = pieces_match[3].str();
    info.host_ = pieces_match[4].str();
    info.port_ = pieces_match[5].str();
    info.db_name_ = pieces_match[6].str();

    //TODO: verify host, port...

    return Status::OK();
}
} // namespace utils
} // namespace engine
} // namespace milvus
......
......@@ -44,6 +44,17 @@ meta::DateT GetDate(const std::time_t &t, int day_delta = 0);
meta::DateT GetDate();
meta::DateT GetDateWithDelta(int day_delta);
// Components of a meta backend URI written as
//     dialect://username:password@host:port/database
// Plain aggregate; every member is kept as the raw text taken from the URI.
struct MetaUriInfo {
    std::string dialect_;    // text before "://"
    std::string username_;   // text between "://" and ":"
    std::string password_;   // text between ":" and "@"
    std::string host_;       // text between "@" and ":"
    std::string port_;       // text between ":" and "/", still a string
    std::string db_name_;    // text after "/"
};
// Parse a meta URI of the form dialect://username:password@host:port/database into `info`.
// Returns Status::OK() when `uri` matches that format, otherwise a DB_INVALID_META_URI status.
Status ParseMetaUri(const std::string& uri, MetaUriInfo& info);
} // namespace utils
} // namespace engine
} // namespace milvus
......
......@@ -20,13 +20,14 @@
#include "MySQLMetaImpl.h"
#include "utils/Log.h"
#include "utils/Exception.h"
#include "db/Utils.h"
#include <stdlib.h>
#include <time.h>
#include <sstream>
#include <cstdlib>
#include <string>
#include <regex>
#include <string.h>
namespace zilliz {
namespace milvus {
......@@ -49,38 +50,23 @@ namespace engine {
// Create the meta backend selected by the dialect of metaOptions.backend_uri.
//
// The URI is parsed with utils::ParseMetaUri and its dialect is compared
// case-insensitively against "mysql" and "sqlite".
//
// @param metaOptions  Meta options; backend_uri is parsed here and the whole
//                     object is forwarded to the chosen implementation.
// @param mode         Forwarded to MySQLMetaImpl only.
// @return A MySQLMetaImpl for "mysql", a SqliteMetaImpl for "sqlite".
// @throws InvalidArgumentException when the URI cannot be parsed or its dialect
//         is neither "mysql" nor "sqlite".
meta::MetaPtr MetaFactory::Build(const DBMetaOptions &metaOptions, const int &mode) {
    std::string uri = metaOptions.backend_uri;

    utils::MetaUriInfo uri_info;
    auto status = utils::ParseMetaUri(uri, uri_info);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri;
        throw InvalidArgumentException("Wrong URI format ");
    }

    if (strcasecmp(uri_info.dialect_.c_str(), "mysql") == 0) {
        ENGINE_LOG_INFO << "Using MySQL";
        return std::make_shared<meta::MySQLMetaImpl>(metaOptions, mode);
    } else if (strcasecmp(uri_info.dialect_.c_str(), "sqlite") == 0) {
        ENGINE_LOG_INFO << "Using SQLite";
        return std::make_shared<meta::SqliteMetaImpl>(metaOptions);
    } else {
        ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << uri_info.dialect_;
        throw InvalidArgumentException("URI dialect is not mysql / sqlite");
    }
}
} // namespace engine
......
......@@ -19,21 +19,22 @@
#include "db/IDGenerator.h"
#include "db/Utils.h"
#include "utils/Log.h"
#include "utils/Exception.h"
#include "MetaConsts.h"
#include "metrics/Metrics.h"
#include <unistd.h>
#include <sstream>
#include <iostream>
#include <boost/filesystem.hpp>
#include <chrono>
#include <fstream>
#include <regex>
#include <string>
#include <mutex>
#include <thread>
#include "mysql++/mysql++.h"
#include <string.h>
#include <boost/filesystem.hpp>
#include <mysql++/mysql++.h>
namespace zilliz {
......@@ -56,8 +57,112 @@ Status HandleException(const std::string &desc, const char* what = nullptr) {
}
}
// One column of a meta table: its name, its SQL type and any extra column settings.
class MetaField {
 public:
    MetaField(const std::string& name, const std::string& type, const std::string& setting)
        : name_(name),
          type_(type),
          setting_(setting) {
    }

    // Column name.
    std::string name() const {
        return name_;
    }

    // "<name> <type> <setting>", the column definition used inside CREATE TABLE.
    std::string ToString() const {
        return name_ + " " + type_ + " " + setting_;
    }

    // True when `field` describes the same column as this one.
    //
    // Names must match completely, ignoring case. Comparing only the common prefix
    // would treat "id" and "idx" (or an empty name and anything) as the same column.
    //
    // Types are compared over the shorter of the two type strings, ignoring case:
    // mysql reports extra information in the type, e.g. a field defined as 'BIGINT'
    // is returned as 'bigint(20)', so the '(20)' has to be ignored.
    // The setting text is not part of the comparison.
    bool IsEqual(const MetaField& field) const {
        if (field.name_.length() != name_.length() ||
            strcasecmp(field.name_.c_str(), name_.c_str()) != 0) {
            return false;
        }

        size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length();
        return strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0;
    }

 private:
    std::string name_;
    std::string type_;
    std::string setting_;
};
using MetaFields = std::vector<MetaField>;
class MetaSchema {
public:
MetaSchema(const std::string& name, const MetaFields& fields)
: name_(name),
fields_(fields) {
}
std::string name() const {
return name_;
}
std::string ToString() const {
std::string result;
for(auto& field : fields_) {
if(!result.empty()) {
result += ",";
}
result += field.ToString();
}
return result;
}
//if the outer fields contains all this MetaSchema fields, return true
//otherwise return false
bool IsEqual(const MetaFields& fields) const {
std::vector<std::string> found_field;
for(const auto& this_field : fields_) {
for(const auto& outer_field : fields) {
if(this_field.IsEqual(outer_field)) {
found_field.push_back(this_field.name());
break;
}
}
}
return found_field.size() == fields_.size();
}
private:
std::string name_;
MetaFields fields_;
};
// Expected columns of the META_TABLES table. Each entry is {name, type, setting}.
static const MetaSchema TABLES_SCHEMA(META_TABLES, MetaFields{
    {"id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"},
    {"table_id", "VARCHAR(255)", "UNIQUE NOT NULL"},
    {"state", "INT", "NOT NULL"},
    {"dimension", "SMALLINT", "NOT NULL"},
    {"created_on", "BIGINT", "NOT NULL"},
    {"flag", "BIGINT", "DEFAULT 0 NOT NULL"},
    {"index_file_size", "BIGINT", "DEFAULT 1024 NOT NULL"},
    {"engine_type", "INT", "DEFAULT 1 NOT NULL"},
    {"nlist", "INT", "DEFAULT 16384 NOT NULL"},
    {"metric_type", "INT", "DEFAULT 1 NOT NULL"},
});
// Expected columns of the META_TABLEFILES table. Each entry is {name, type, setting}.
static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, MetaFields{
    {"id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"},
    {"table_id", "VARCHAR(255)", "NOT NULL"},
    {"engine_type", "INT", "DEFAULT 1 NOT NULL"},
    {"file_id", "VARCHAR(255)", "NOT NULL"},
    {"file_type", "INT", "DEFAULT 0 NOT NULL"},
    {"file_size", "BIGINT", "DEFAULT 0 NOT NULL"},
    {"row_count", "BIGINT", "DEFAULT 0 NOT NULL"},
    {"updated_time", "BIGINT", "NOT NULL"},
    {"created_on", "BIGINT", "NOT NULL"},
    {"date", "INT", "DEFAULT -1 NOT NULL"},
});
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
: options_(options_),
mode_(mode) {
......@@ -84,7 +189,56 @@ Status MySQLMetaImpl::NextFileId(std::string &file_id) {
return Status::OK();
}
// Check that meta tables already present in MySQL contain the columns this version expects.
//
// Does nothing when there is no connection pool or no connection can be grabbed.
// Throws Exception(DB_INCOMPATIB_META) when an existing table lacks an expected column.
void MySQLMetaImpl::ValidateMetaSchema() {
    if (mysql_connection_pool_ == nullptr) {
        return;
    }

    ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
    if (connectionPtr == nullptr) {
        return;
    }

    // Returns true when the table described by `expected` has no columns to report
    // (or could not be described), or when it contains every expected column.
    auto is_compatible = [&](const MetaSchema& expected) -> bool {
        Query desc_query = connectionPtr->query();
        desc_query << "DESC " << expected.name() << ";";

        MetaFields current_fields;
        try {
            StoreQueryResult described = desc_query.store();
            for (size_t idx = 0; idx < described.num_rows(); idx++) {
                const Row &column = described[idx];
                std::string field_name, field_type;
                column["Field"].to_string(field_name);
                column["Type"].to_string(field_type);

                current_fields.push_back(MetaField(field_name, field_type, ""));
            }
        } catch (std::exception &e) {
            // Any failure of the DESC query is taken to mean the table is absent.
            ENGINE_LOG_DEBUG << "Meta table '" << expected.name() << "' not exist and will be created";
        }

        // Nothing to compare against: treat as compatible.
        return current_fields.empty() || expected.IsEqual(current_fields);
    };

    // verify Tables
    if (!is_compatible(TABLES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }

    // verify TableFiles
    if (!is_compatible(TABLEFILES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}
Status MySQLMetaImpl::Initialize() {
//step 1: create db root path
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
if (!ret) {
......@@ -96,108 +250,79 @@ Status MySQLMetaImpl::Initialize() {
std::string uri = options_.backend_uri;
std::string dialectRegex = "(.*)";
std::string usernameRegex = "(.*)";
std::string passwordRegex = "(.*)";
std::string hostRegex = "(.*)";
std::string portRegex = "(.*)";
std::string dbNameRegex = "(.*)";
std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
usernameRegex + "\\:" +
passwordRegex + "\\@" +
hostRegex + "\\:" +
portRegex + "\\/" +
dbNameRegex;
std::regex uriRegex(uriRegexStr);
std::smatch pieces_match;
if (std::regex_match(uri, pieces_match, uriRegex)) {
std::string dialect = pieces_match[1].str();
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
if (dialect.find("mysql") == std::string::npos) {
return Status(DB_ERROR, "URI's dialect is not MySQL");
}
std::string username = pieces_match[2].str();
std::string password = pieces_match[3].str();
std::string serverAddress = pieces_match[4].str();
unsigned int port = 0;
if (!pieces_match[5].str().empty()) {
port = std::stoi(pieces_match[5].str());
//step 2: parse and check meta uri
utils::MetaUriInfo uri_info;
auto status = utils::ParseMetaUri(uri, uri_info);
if(!status.ok()) {
std::string msg = "Wrong URI format: " + uri;
ENGINE_LOG_ERROR << msg;
throw Exception(DB_INVALID_META_URI, msg);
}
if (strcasecmp(uri_info.dialect_.c_str(), "mysql") != 0) {
std::string msg = "URI's dialect is not MySQL";
ENGINE_LOG_ERROR << msg;
throw Exception(DB_INVALID_META_URI, msg);
}
//step 3: connect mysql
int thread_hint = std::thread::hardware_concurrency();
int max_pool_size = (thread_hint == 0) ? 8 : thread_hint;
unsigned int port = 0;
if (!uri_info.port_.empty()) {
port = std::stoi(uri_info.port_);
}
mysql_connection_pool_ =
std::make_shared<MySQLConnectionPool>(uri_info.db_name_, uri_info.username_,
uri_info.password_, uri_info.host_, port, max_pool_size);
ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(max_pool_size);
//step 4: validate to avoid open old version schema
ValidateMetaSchema();
//step 5: create meta tables
try {
if (mode_ != DBOptions::MODE::READ_ONLY) {
CleanUp();
}
std::string dbName = pieces_match[6].str();
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
int threadHint = std::thread::hardware_concurrency();
int maxPoolSize = threadHint == 0 ? 8 : threadHint;
mysql_connection_pool_ =
std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
}
ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize);
try {
if (mode_ != DBOptions::MODE::READ_ONLY) {
CleanUp();
if (!connectionPtr->thread_aware()) {
ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it.");
}
Query InitializeQuery = connectionPtr->query();
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
TABLES_SCHEMA.name() << " (" << TABLES_SCHEMA.ToString() + ");";
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to database server");
}
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
if (!InitializeQuery.exec()) {
return HandleException("Initialization Error", InitializeQuery.error());
}
if (!connectionPtr->thread_aware()) {
ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it.");
}
Query InitializeQuery = connectionPtr->query();
InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
TABLEFILES_SCHEMA.name() << " (" << TABLEFILES_SCHEMA.ToString() + ");";
InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
META_TABLES << " " <<
"(id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
"table_id VARCHAR(255) UNIQUE NOT NULL, " <<
"state INT NOT NULL, " <<
"dimension SMALLINT NOT NULL, " <<
"created_on BIGINT NOT NULL, " <<
"flag BIGINT DEFAULT 0 NOT NULL, " <<
"index_file_size BIGINT DEFAULT 1024 NOT NULL, " <<
"engine_type INT DEFAULT 1 NOT NULL, " <<
"nlist INT DEFAULT 16384 NOT NULL, " <<
"metric_type INT DEFAULT 1 NOT NULL);";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
if (!InitializeQuery.exec()) {
return HandleException("Initialization Error", InitializeQuery.error());
}
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
META_TABLEFILES << " " <<
"(id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
"table_id VARCHAR(255) NOT NULL, " <<
"engine_type INT DEFAULT 1 NOT NULL, " <<
"file_id VARCHAR(255) NOT NULL, " <<
"file_type INT DEFAULT 0 NOT NULL, " <<
"file_size BIGINT DEFAULT 0 NOT NULL, " <<
"row_count BIGINT DEFAULT 0 NOT NULL, " <<
"updated_time BIGINT NOT NULL, " <<
"created_on BIGINT NOT NULL, " <<
"date INT DEFAULT -1 NOT NULL);";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
if (!InitializeQuery.exec()) {
return HandleException("Initialization Error", InitializeQuery.error());
}
} //Scoped Connection
if (!InitializeQuery.exec()) {
return HandleException("Initialization Error", InitializeQuery.error());
}
} //Scoped Connection
} catch (std::exception &e) {
return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
}
} else {
ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri;
return Status(DB_ERROR, "Wrong URI format");
} catch (std::exception &e) {
return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
}
return Status::OK();
......@@ -1843,7 +1968,7 @@ Status MySQLMetaImpl::DropAll() {
}
Query dropTableQuery = connectionPtr->query();
dropTableQuery << "DROP TABLE IF EXISTS " << META_TABLES << ", " << META_TABLEFILES << ";";
dropTableQuery << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();
......
......@@ -103,8 +103,11 @@ class MySQLMetaImpl : public Meta {
Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id);
Status DiscardFiles(long long to_discard_size);
void ValidateMetaSchema();
Status Initialize();
private:
const DBMetaOptions options_;
const int mode_;
......
......@@ -84,7 +84,6 @@ inline auto StoragePrototype(const std::string &path) {
using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
: options_(options_) {
......@@ -111,6 +110,23 @@ Status SqliteMetaImpl::NextFileId(std::string &file_id) {
return Status::OK();
}
// Refuse to open a sqlite meta whose tables would have to be dropped and recreated.
//
// Does nothing when ConnectorPtr has not been created yet.
// Throws Exception(DB_INCOMPATIB_META) when the simulated schema sync reports
// dropped_and_recreated for META_TABLES or META_TABLEFILES.
void SqliteMetaImpl::ValidateMetaSchema() {
    if (ConnectorPtr == nullptr) {
        return;
    }

    // Old meta could be recreated since the schema changed; throw an exception
    // if the meta schema is not compatible.
    auto simulated = ConnectorPtr->sync_schema_simulate();

    auto tables_it = simulated.find(META_TABLES);
    if (tables_it != simulated.end()
        && tables_it->second == sqlite_orm::sync_schema_result::dropped_and_recreated) {
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
    }

    auto table_files_it = simulated.find(META_TABLEFILES);
    if (table_files_it != simulated.end()
        && table_files_it->second == sqlite_orm::sync_schema_result::dropped_and_recreated) {
        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
    }
}
Status SqliteMetaImpl::Initialize() {
if (!boost::filesystem::is_directory(options_.path)) {
auto ret = boost::filesystem::create_directory(options_.path);
......@@ -123,16 +139,7 @@ Status SqliteMetaImpl::Initialize() {
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));
//old meta could be recreated since schema changed, throw exception if meta schema is not compatible
auto ret = ConnectorPtr->sync_schema_simulate();
if(ret.find(META_TABLES) != ret.end()
&& sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
throw Exception(DB_INCOMPATIB_META, "Meta schema is created by Milvus old version");
}
if(ret.find(META_TABLEFILES) != ret.end()
&& sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
throw Exception(DB_INCOMPATIB_META, "Meta schema is created by Milvus old version");
}
ValidateMetaSchema();
ConnectorPtr->sync_schema();
ConnectorPtr->open_forever(); // thread safe option
......
......@@ -97,10 +97,12 @@ class SqliteMetaImpl : public Meta {
Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id);
Status DiscardFiles(long to_discard_size);
void ValidateMetaSchema();
Status Initialize();
private:
const DBMetaOptions options_;
std::mutex meta_mutex_;
}; // DBMetaImpl
......
......@@ -87,6 +87,7 @@ constexpr ErrorCode DB_NOT_FOUND = ToDbErrorCode(3);
constexpr ErrorCode DB_ALREADY_EXIST = ToDbErrorCode(4);
constexpr ErrorCode DB_INVALID_PATH = ToDbErrorCode(5);
// Thrown by the ValidateMetaSchema() checks when the existing meta tables are not compatible.
constexpr ErrorCode DB_INCOMPATIB_META = ToDbErrorCode(6);
// Reported when a meta URI does not match dialect://username:password@host:port/database.
constexpr ErrorCode DB_INVALID_META_URI = ToDbErrorCode(7);
//knowhere error code
constexpr ErrorCode KNOWHERE_ERROR = ToKnowhereErrorCode(1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册