From c13edb909e42d36941422c1f1f2740924910b419 Mon Sep 17 00:00:00 2001 From: starlord Date: Fri, 20 Sep 2019 17:35:31 +0800 Subject: [PATCH] MS-578 makesure milvus5.0 dont crack 0.3.1 data Former-commit-id: e3c81573e4a28ac51b564db548dd29fee92e1915 --- cpp/src/db/Utils.cpp | 36 ++++ cpp/src/db/Utils.h | 11 + cpp/src/db/meta/MetaFactory.cpp | 46 ++--- cpp/src/db/meta/MySQLMetaImpl.cpp | 311 ++++++++++++++++++++--------- cpp/src/db/meta/MySQLMetaImpl.h | 3 + cpp/src/db/meta/SqliteMetaImpl.cpp | 29 ++- cpp/src/db/meta/SqliteMetaImpl.h | 4 +- cpp/src/utils/Error.h | 1 + 8 files changed, 306 insertions(+), 135 deletions(-) diff --git a/cpp/src/db/Utils.cpp b/cpp/src/db/Utils.cpp index 26f93563..41ebe618 100644 --- a/cpp/src/db/Utils.cpp +++ b/cpp/src/db/Utils.cpp @@ -21,6 +21,7 @@ #include #include +#include #include namespace zilliz { @@ -195,6 +196,41 @@ meta::DateT GetDate() { return GetDate(std::time(nullptr), 0); } +// URI format: dialect://username:password@host:port/database +Status ParseMetaUri(const std::string& uri, MetaUriInfo& info) { + std::string dialect_regex = "(.*)"; + std::string username_tegex = "(.*)"; + std::string password_regex = "(.*)"; + std::string host_regex = "(.*)"; + std::string port_regex = "(.*)"; + std::string db_name_regex = "(.*)"; + std::string uri_regex_str = + dialect_regex + "\\:\\/\\/" + + username_tegex + "\\:" + + password_regex + "\\@" + + host_regex + "\\:" + + port_regex + "\\/" + + db_name_regex; + + std::regex uri_regex(uri_regex_str); + std::smatch pieces_match; + + if (std::regex_match(uri, pieces_match, uri_regex)) { + info.dialect_ = pieces_match[1].str(); + info.username_ = pieces_match[2].str(); + info.password_ = pieces_match[3].str(); + info.host_ = pieces_match[4].str(); + info.port_ = pieces_match[5].str(); + info.db_name_ = pieces_match[6].str(); + + //TODO: verify host, port... + } else { + return Status(DB_INVALID_META_URI, "Invalid meta uri: " + uri); + } + + return Status::OK(); +} + } // namespace utils } // namespace engine } // namespace milvus diff --git a/cpp/src/db/Utils.h b/cpp/src/db/Utils.h index f27273c6..39bdfbfb 100644 --- a/cpp/src/db/Utils.h +++ b/cpp/src/db/Utils.h @@ -44,6 +44,17 @@ meta::DateT GetDate(const std::time_t &t, int day_delta = 0); meta::DateT GetDate(); meta::DateT GetDateWithDelta(int day_delta); +struct MetaUriInfo { + std::string dialect_; + std::string username_; + std::string password_; + std::string host_; + std::string port_; + std::string db_name_; +}; + +Status ParseMetaUri(const std::string& uri, MetaUriInfo& info); + } // namespace utils } // namespace engine } // namespace milvus diff --git a/cpp/src/db/meta/MetaFactory.cpp b/cpp/src/db/meta/MetaFactory.cpp index 2f498941..34b553ea 100644 --- a/cpp/src/db/meta/MetaFactory.cpp +++ b/cpp/src/db/meta/MetaFactory.cpp @@ -20,13 +20,14 @@ #include "MySQLMetaImpl.h" #include "utils/Log.h" #include "utils/Exception.h" +#include "db/Utils.h" #include #include #include #include #include -#include +#include namespace zilliz { namespace milvus { @@ -49,38 +50,23 @@ namespace engine { meta::MetaPtr MetaFactory::Build(const DBMetaOptions &metaOptions, const int &mode) { std::string uri = metaOptions.backend_uri; - std::string dialectRegex = "(.*)"; - std::string usernameRegex = "(.*)"; - std::string passwordRegex = "(.*)"; - std::string hostRegex = "(.*)"; - std::string portRegex = "(.*)"; - std::string dbNameRegex = "(.*)"; - std::string uriRegexStr = dialectRegex + "\\:\\/\\/" + - usernameRegex + "\\:" + - passwordRegex + "\\@" + - hostRegex + "\\:" + - portRegex + "\\/" + - dbNameRegex; - std::regex uriRegex(uriRegexStr); - std::smatch pieces_match; - - if (std::regex_match(uri, pieces_match, uriRegex)) { - std::string dialect = pieces_match[1].str(); - std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); - if (dialect.find("mysql") != std::string::npos) { - ENGINE_LOG_INFO << "Using MySQL"; - return std::make_shared(metaOptions, mode); - } else if (dialect.find("sqlite") != std::string::npos) { - ENGINE_LOG_INFO << "Using SQLite"; - return std::make_shared(metaOptions); - } else { - ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; - throw InvalidArgumentException("URI dialect is not mysql / sqlite"); - } - } else { + utils::MetaUriInfo uri_info; + auto status = utils::ParseMetaUri(uri, uri_info); + if(!status.ok()) { ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri; throw InvalidArgumentException("Wrong URI format "); } + + if (strcasecmp(uri_info.dialect_.c_str(), "mysql") == 0) { + ENGINE_LOG_INFO << "Using MySQL"; + return std::make_shared(metaOptions, mode); + } else if (strcasecmp(uri_info.dialect_.c_str(), "sqlite") == 0) { + ENGINE_LOG_INFO << "Using SQLite"; + return std::make_shared(metaOptions); + } else { + ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << uri_info.dialect_; + throw InvalidArgumentException("URI dialect is not mysql / sqlite"); + } } } // namespace engine diff --git a/cpp/src/db/meta/MySQLMetaImpl.cpp b/cpp/src/db/meta/MySQLMetaImpl.cpp index 282df32e..a070b19f 100644 --- a/cpp/src/db/meta/MySQLMetaImpl.cpp +++ b/cpp/src/db/meta/MySQLMetaImpl.cpp @@ -19,21 +19,22 @@ #include "db/IDGenerator.h" #include "db/Utils.h" #include "utils/Log.h" +#include "utils/Exception.h" #include "MetaConsts.h" #include "metrics/Metrics.h" #include #include #include -#include #include #include #include #include #include #include - -#include "mysql++/mysql++.h" +#include +#include +#include namespace zilliz { @@ -56,8 +57,112 @@ Status HandleException(const std::string &desc, const char* what = nullptr) { } } +class MetaField { +public: + MetaField(const std::string& name, const std::string& type, const std::string& setting) + : name_(name), + type_(type), + setting_(setting) { + } + + std::string name() const { + return name_; + } + + std::string ToString() const { + return name_ + " " + type_ + " " + setting_; + } + + // mysql field type has additional information. for instance, a filed type is defined as 'BIGINT' + // we get the type from sql is 'bigint(20)', so we need to ignore the '(20)' + bool IsEqual(const MetaField& field) const { + size_t name_len_min = field.name_.length() > name_.length() ? name_.length() : field.name_.length(); + size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length(); + return strncasecmp(field.name_.c_str(), name_.c_str(), name_len_min) == 0 && + strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0; + } + +private: + std::string name_; + std::string type_; + std::string setting_; +}; + +using MetaFields = std::vector; +class MetaSchema { +public: + MetaSchema(const std::string& name, const MetaFields& fields) + : name_(name), + fields_(fields) { + } + + std::string name() const { + return name_; + } + + std::string ToString() const { + std::string result; + for(auto& field : fields_) { + if(!result.empty()) { + result += ","; + } + result += field.ToString(); + } + return result; + } + + //if the outer fields contains all this MetaSchema fields, return true + //otherwise return false + bool IsEqual(const MetaFields& fields) const { + std::vector found_field; + for(const auto& this_field : fields_) { + for(const auto& outer_field : fields) { + if(this_field.IsEqual(outer_field)) { + found_field.push_back(this_field.name()); + break; + } + } + } + + return found_field.size() == fields_.size(); + } + +private: + std::string name_; + MetaFields fields_; +}; + +//Tables schema +static const MetaSchema TABLES_SCHEMA(META_TABLES, { + MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"), + MetaField("table_id", "VARCHAR(255)", "UNIQUE NOT NULL"), + MetaField("state", "INT", "NOT NULL"), + MetaField("dimension", "SMALLINT", "NOT NULL"), + MetaField("created_on", "BIGINT", "NOT NULL"), + MetaField("flag", "BIGINT", "DEFAULT 0 NOT NULL"), + MetaField("index_file_size", "BIGINT", "DEFAULT 1024 NOT NULL"), + MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"), + MetaField("nlist", "INT", "DEFAULT 16384 NOT NULL"), + MetaField("metric_type", "INT", "DEFAULT 1 NOT NULL"), +}); + +//TableFiles schema +static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, { + MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"), + MetaField("table_id", "VARCHAR(255)", "NOT NULL"), + MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"), + MetaField("file_id", "VARCHAR(255)", "NOT NULL"), + MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"), + MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"), + MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"), + MetaField("updated_time", "BIGINT", "NOT NULL"), + MetaField("created_on", "BIGINT", "NOT NULL"), + MetaField("date", "INT", "DEFAULT -1 NOT NULL"), +}); + } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode) : options_(options_), mode_(mode) { @@ -84,7 +189,56 @@ Status MySQLMetaImpl::NextFileId(std::string &file_id) { return Status::OK(); } +void MySQLMetaImpl::ValidateMetaSchema() { + if(nullptr == mysql_connection_pool_) { + return; + } + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + if (connectionPtr == nullptr) { + return; + } + + auto validate_func = [&](const MetaSchema& schema) { + Query query_statement = connectionPtr->query(); + query_statement << "DESC " << schema.name() << ";"; + + MetaFields exist_fields; + + try { + StoreQueryResult res = query_statement.store(); + for (size_t i = 0; i < res.num_rows(); i++) { + const Row &row = res[i]; + std::string name, type; + row["Field"].to_string(name); + row["Type"].to_string(type); + + exist_fields.push_back(MetaField(name, type, "")); + } + } catch (std::exception &e) { + ENGINE_LOG_DEBUG << "Meta table '" << schema.name() << "' not exist and will be created"; + } + + if(exist_fields.empty()) { + return true; + } + + return schema.IsEqual(exist_fields); + }; + + //verify Tables + if (!validate_func(TABLES_SCHEMA)) { + throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version"); + } + + //verufy TableFiles + if (!validate_func(TABLEFILES_SCHEMA)) { + throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version"); + } +} + Status MySQLMetaImpl::Initialize() { + //step 1: create db root path if (!boost::filesystem::is_directory(options_.path)) { auto ret = boost::filesystem::create_directory(options_.path); if (!ret) { @@ -96,108 +250,79 @@ Status MySQLMetaImpl::Initialize() { std::string uri = options_.backend_uri; - std::string dialectRegex = "(.*)"; - std::string usernameRegex = "(.*)"; - std::string passwordRegex = "(.*)"; - std::string hostRegex = "(.*)"; - std::string portRegex = "(.*)"; - std::string dbNameRegex = "(.*)"; - std::string uriRegexStr = dialectRegex + "\\:\\/\\/" + - usernameRegex + "\\:" + - passwordRegex + "\\@" + - hostRegex + "\\:" + - portRegex + "\\/" + - dbNameRegex; - std::regex uriRegex(uriRegexStr); - std::smatch pieces_match; - - if (std::regex_match(uri, pieces_match, uriRegex)) { - std::string dialect = pieces_match[1].str(); - std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); - if (dialect.find("mysql") == std::string::npos) { - return Status(DB_ERROR, "URI's dialect is not MySQL"); - } - std::string username = pieces_match[2].str(); - std::string password = pieces_match[3].str(); - std::string serverAddress = pieces_match[4].str(); - unsigned int port = 0; - if (!pieces_match[5].str().empty()) { - port = std::stoi(pieces_match[5].str()); + //step 2: parse and check meta uri + utils::MetaUriInfo uri_info; + auto status = utils::ParseMetaUri(uri, uri_info); + if(!status.ok()) { + std::string msg = "Wrong URI format: " + uri; + ENGINE_LOG_ERROR << msg; + throw Exception(DB_INVALID_META_URI, msg); + } + + if (strcasecmp(uri_info.dialect_.c_str(), "mysql") != 0) { + std::string msg = "URI's dialect is not MySQL"; + ENGINE_LOG_ERROR << msg; + throw Exception(DB_INVALID_META_URI, msg); + } + + //step 3: connect mysql + int thread_hint = std::thread::hardware_concurrency(); + int max_pool_size = (thread_hint == 0) ? 8 : thread_hint; + unsigned int port = 0; + if (!uri_info.port_.empty()) { + port = std::stoi(uri_info.port_); + } + + mysql_connection_pool_ = + std::make_shared(uri_info.db_name_, uri_info.username_, + uri_info.password_, uri_info.host_, port, max_pool_size); + ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(max_pool_size); + + //step 4: validate to avoid open old version schema + ValidateMetaSchema(); + + //step 5: create meta tables + try { + + if (mode_ != DBOptions::MODE::READ_ONLY) { + CleanUp(); } - std::string dbName = pieces_match[6].str(); + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); - int threadHint = std::thread::hardware_concurrency(); - int maxPoolSize = threadHint == 0 ? 8 : threadHint; - mysql_connection_pool_ = - std::make_shared(dbName, username, password, serverAddress, port, maxPoolSize); + if (connectionPtr == nullptr) { + return Status(DB_ERROR, "Failed to connect to database server"); + } - ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize); - try { - if (mode_ != DBOptions::MODE::READ_ONLY) { - CleanUp(); + if (!connectionPtr->thread_aware()) { + ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it."; + return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it."); } + Query InitializeQuery = connectionPtr->query(); - { - ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + InitializeQuery << "CREATE TABLE IF NOT EXISTS " << + TABLES_SCHEMA.name() << " (" << TABLES_SCHEMA.ToString() + ");"; - if (connectionPtr == nullptr) { - return Status(DB_ERROR, "Failed to connect to database server"); - } + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str(); + if (!InitializeQuery.exec()) { + return HandleException("Initialization Error", InitializeQuery.error()); + } - if (!connectionPtr->thread_aware()) { - ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it."; - return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it."); - } - Query InitializeQuery = connectionPtr->query(); + InitializeQuery << "CREATE TABLE IF NOT EXISTS " << + TABLEFILES_SCHEMA.name() << " (" << TABLEFILES_SCHEMA.ToString() + ");"; - InitializeQuery << "CREATE TABLE IF NOT EXISTS " << - META_TABLES << " " << - "(id BIGINT PRIMARY KEY AUTO_INCREMENT, " << - "table_id VARCHAR(255) UNIQUE NOT NULL, " << - "state INT NOT NULL, " << - "dimension SMALLINT NOT NULL, " << - "created_on BIGINT NOT NULL, " << - "flag BIGINT DEFAULT 0 NOT NULL, " << - "index_file_size BIGINT DEFAULT 1024 NOT NULL, " << - "engine_type INT DEFAULT 1 NOT NULL, " << - "nlist INT DEFAULT 16384 NOT NULL, " << - "metric_type INT DEFAULT 1 NOT NULL);"; - - ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str(); - - if (!InitializeQuery.exec()) { - return HandleException("Initialization Error", InitializeQuery.error()); - } + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str(); - InitializeQuery << "CREATE TABLE IF NOT EXISTS " << - META_TABLEFILES << " " << - "(id BIGINT PRIMARY KEY AUTO_INCREMENT, " << - "table_id VARCHAR(255) NOT NULL, " << - "engine_type INT DEFAULT 1 NOT NULL, " << - "file_id VARCHAR(255) NOT NULL, " << - "file_type INT DEFAULT 0 NOT NULL, " << - "file_size BIGINT DEFAULT 0 NOT NULL, " << - "row_count BIGINT DEFAULT 0 NOT NULL, " << - "updated_time BIGINT NOT NULL, " << - "created_on BIGINT NOT NULL, " << - "date INT DEFAULT -1 NOT NULL);"; - - ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str(); - - if (!InitializeQuery.exec()) { - return HandleException("Initialization Error", InitializeQuery.error()); - } - } //Scoped Connection + if (!InitializeQuery.exec()) { + return HandleException("Initialization Error", InitializeQuery.error()); + } + } //Scoped Connection - } catch (std::exception &e) { - return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what()); - } - } else { - ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri; - return Status(DB_ERROR, "Wrong URI format"); + } catch (std::exception &e) { + return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what()); } return Status::OK(); @@ -1843,7 +1968,7 @@ Status MySQLMetaImpl::DropAll() { } Query dropTableQuery = connectionPtr->query(); - dropTableQuery << "DROP TABLE IF EXISTS " << META_TABLES << ", " << META_TABLEFILES << ";"; + dropTableQuery << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";"; ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str(); diff --git a/cpp/src/db/meta/MySQLMetaImpl.h b/cpp/src/db/meta/MySQLMetaImpl.h index dca109b2..4950b604 100644 --- a/cpp/src/db/meta/MySQLMetaImpl.h +++ b/cpp/src/db/meta/MySQLMetaImpl.h @@ -103,8 +103,11 @@ class MySQLMetaImpl : public Meta { Status NextFileId(std::string &file_id); Status NextTableId(std::string &table_id); Status DiscardFiles(long long to_discard_size); + + void ValidateMetaSchema(); Status Initialize(); +private: const DBMetaOptions options_; const int mode_; diff --git a/cpp/src/db/meta/SqliteMetaImpl.cpp b/cpp/src/db/meta/SqliteMetaImpl.cpp index a28038d0..7131a478 100644 --- a/cpp/src/db/meta/SqliteMetaImpl.cpp +++ b/cpp/src/db/meta/SqliteMetaImpl.cpp @@ -84,7 +84,6 @@ inline auto StoragePrototype(const std::string &path) { using ConnectorT = decltype(StoragePrototype("")); static std::unique_ptr ConnectorPtr; -using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL); SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_) : options_(options_) { @@ -111,6 +110,23 @@ Status SqliteMetaImpl::NextFileId(std::string &file_id) { return Status::OK(); } +void SqliteMetaImpl::ValidateMetaSchema() { + if(ConnectorPtr == nullptr) { + return; + } + + //old meta could be recreated since schema changed, throw exception if meta schema is not compatible + auto ret = ConnectorPtr->sync_schema_simulate(); + if(ret.find(META_TABLES) != ret.end() + && sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) { + throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version"); + } + if(ret.find(META_TABLEFILES) != ret.end() + && sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) { + throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version"); + } +} + Status SqliteMetaImpl::Initialize() { if (!boost::filesystem::is_directory(options_.path)) { auto ret = boost::filesystem::create_directory(options_.path); @@ -123,16 +139,7 @@ Status SqliteMetaImpl::Initialize() { ConnectorPtr = std::make_unique(StoragePrototype(options_.path + "/meta.sqlite")); - //old meta could be recreated since schema changed, throw exception if meta schema is not compatible - auto ret = ConnectorPtr->sync_schema_simulate(); - if(ret.find(META_TABLES) != ret.end() - && sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) { - throw Exception(DB_INCOMPATIB_META, "Meta schema is created by Milvus old version"); - } - if(ret.find(META_TABLEFILES) != ret.end() - && sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) { - throw Exception(DB_INCOMPATIB_META, "Meta schema is created by Milvus old version"); - } + ValidateMetaSchema(); ConnectorPtr->sync_schema(); ConnectorPtr->open_forever(); // thread safe option diff --git a/cpp/src/db/meta/SqliteMetaImpl.h b/cpp/src/db/meta/SqliteMetaImpl.h index b1465dd9..db7a70fe 100644 --- a/cpp/src/db/meta/SqliteMetaImpl.h +++ b/cpp/src/db/meta/SqliteMetaImpl.h @@ -97,10 +97,12 @@ class SqliteMetaImpl : public Meta { Status NextFileId(std::string &file_id); Status NextTableId(std::string &table_id); Status DiscardFiles(long to_discard_size); + + void ValidateMetaSchema(); Status Initialize(); +private: const DBMetaOptions options_; - std::mutex meta_mutex_; }; // DBMetaImpl diff --git a/cpp/src/utils/Error.h b/cpp/src/utils/Error.h index 672aa928..e903dcb2 100644 --- a/cpp/src/utils/Error.h +++ b/cpp/src/utils/Error.h @@ -87,6 +87,7 @@ constexpr ErrorCode DB_NOT_FOUND = ToDbErrorCode(3); constexpr ErrorCode DB_ALREADY_EXIST = ToDbErrorCode(4); constexpr ErrorCode DB_INVALID_PATH = ToDbErrorCode(5); constexpr ErrorCode DB_INCOMPATIB_META = ToDbErrorCode(6); +constexpr ErrorCode DB_INVALID_META_URI = ToDbErrorCode(7); //knowhere error code constexpr ErrorCode KNOWHERE_ERROR = ToKnowhereErrorCode(1); -- GitLab