diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 98e820358dc05fb9396faca044422a6cc1544301..d165738aa78133914a6df258c2a47cf2b1434657 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -46,7 +46,9 @@ Please mark all change in change log and use the ticket from JIRA. - MS-85 - add NetIO metric - MS-96 - add new query interface for specified files - MS-97 - Add S3 SDK for MinIO Storage +- MS-105 - Add MySQL - MS-130 - Add prometheus_test + ## Task - MS-74 - Change README.md in cpp - MS-88 - Add support for arm architecture diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 0506dd896334801be21368f6fcd3412ddb413b50..116f30026d9247d53732589f6b08696066f7d970 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -113,20 +113,13 @@ link_directories(${MILVUS_BINARY_DIR}) set(MILVUS_ENGINE_INCLUDE ${PROJECT_SOURCE_DIR}/include) set(MILVUS_ENGINE_SRC ${PROJECT_SOURCE_DIR}/src) -#set(MILVUS_THIRD_PARTY ${CMAKE_CURRENT_SOURCE_DIR}/third_party) -#set(MILVUS_THIRD_PARTY_BUILD ${CMAKE_CURRENT_SOURCE_DIR}/third_party/build) add_compile_definitions(PROFILER=${PROFILER}) include_directories(${MILVUS_ENGINE_INCLUDE}) include_directories(${MILVUS_ENGINE_SRC}) -#include_directories(${MILVUS_THIRD_PARTY_BUILD}/include) link_directories(${CMAKE_CURRRENT_BINARY_DIR}) -#link_directories(${MILVUS_THIRD_PARTY_BUILD}/lib) -#link_directories(${MILVUS_THIRD_PARTY_BUILD}/lib64) -#execute_process(COMMAND bash build.sh -# WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party) add_subdirectory(src) diff --git a/cpp/README.md b/cpp/README.md index 69f7f8c0e0ce8df7560c8d2cf3a7a57695943a01..c1cd381442c25cb9296f2ebbc606b16507503f72 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -1,13 +1,22 @@ ### Compilation #### Step 1: install necessery tools + Install MySQL + centos7 : - yum install gfortran flex bison + yum install gfortran qt4 flex bison mysql-devel ubuntu16.04 : - sudo apt-get install gfortran flex bison + sudo apt-get install gfortran qt4-qmake flex bison 
libmysqlclient-dev + +If `libmysqlclient_r.so` does not exist after installing MySQL Development Files, you need to create a symbolic link: + +``` +sudo ln -s /path/to/libmysqlclient.so /path/to/libmysqlclient_r.so +``` #### Step 2: build(output to cmake_build folder) + cmake_build/src/milvus_server is the server cmake_build/src/libmilvus_engine.a is the static library diff --git a/cpp/cmake/DefineOptions.cmake b/cpp/cmake/DefineOptions.cmake index d95e7c7ed19081273c13513b2c81e1875b6fea4a..147663d0db7a444d20fbf24278ca0a4a69718a34 100644 --- a/cpp/cmake/DefineOptions.cmake +++ b/cpp/cmake/DefineOptions.cmake @@ -93,6 +93,8 @@ define_option(MILVUS_WITH_SQLITE "Build with SQLite library" ON) define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON) +define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON) + define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" ON) define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON) diff --git a/cpp/cmake/ThirdPartyPackages.cmake b/cpp/cmake/ThirdPartyPackages.cmake index cb5f3532fe413e72bc5f0bfba1f0139fb1221fa0..9aa3f62124a58c4d6f345532c72601a03769a78c 100644 --- a/cpp/cmake/ThirdPartyPackages.cmake +++ b/cpp/cmake/ThirdPartyPackages.cmake @@ -26,6 +26,7 @@ set(MILVUS_THIRDPARTY_DEPENDENCIES JSONCONS LAPACK Lz4 + MySQLPP OpenBLAS Prometheus RocksDB @@ -56,12 +57,14 @@ macro(build_dependency DEPENDENCY_NAME) build_easyloggingpp() elseif("${DEPENDENCY_NAME}" STREQUAL "FAISS") build_faiss() + elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest") + build_gtest() elseif("${DEPENDENCY_NAME}" STREQUAL "LAPACK") build_lapack() elseif("${DEPENDENCY_NAME}" STREQUAL "Lz4") build_lz4() - elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest") - build_gtest() + elseif ("${DEPENDENCY_NAME}" STREQUAL "MySQLPP") + build_mysqlpp() elseif ("${DEPENDENCY_NAME}" STREQUAL "JSONCONS") build_jsoncons() elseif ("${DEPENDENCY_NAME}" STREQUAL "OpenBLAS") @@ -265,6 +268,12 @@ else() set(LZ4_SOURCE_URL 
"https://github.com/lz4/lz4/archive/${LZ4_VERSION}.tar.gz") endif() +if(DEFINED ENV{MILVUS_MYSQLPP_URL}) + set(MYSQLPP_SOURCE_URL "$ENV{MILVUS_MYSQLPP_URL}") +else() + set(MYSQLPP_SOURCE_URL "https://tangentsoft.com/mysqlpp/releases/mysql++-${MYSQLPP_VERSION}.tar.gz") +endif() + if (DEFINED ENV{MILVUS_OPENBLAS_URL}) set(OPENBLAS_SOURCE_URL "$ENV{MILVUS_OPENBLAS_URL}") else () @@ -829,8 +838,8 @@ macro(build_faiss) # ${MAKE} ${MAKE_BUILD_ARGS} BUILD_COMMAND ${MAKE} ${MAKE_BUILD_ARGS} all - COMMAND - cd gpu && make ${MAKE_BUILD_ARGS} + COMMAND + cd gpu && ${MAKE} ${MAKE_BUILD_ARGS} BUILD_IN_SOURCE 1 # INSTALL_DIR @@ -1068,6 +1077,65 @@ if(MILVUS_WITH_LZ4) include_directories(SYSTEM ${LZ4_INCLUDE_DIR}) endif() +# ---------------------------------------------------------------------- +# MySQL++ + +macro(build_mysqlpp) + message(STATUS "Building MySQL++-${MYSQLPP_VERSION} from source") + set(MYSQLPP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep") + set(MYSQLPP_INCLUDE_DIR "${MYSQLPP_PREFIX}/include") + set(MYSQLPP_SHARED_LIB + "${MYSQLPP_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}") + + set(MYSQLPP_CONFIGURE_ARGS + "--prefix=${MYSQLPP_PREFIX}" + "--enable-thread-check" + "CFLAGS=${EP_C_FLAGS}" + "CXXFLAGS=${EP_CXX_FLAGS}" + "LDFLAGS=-pthread") + + externalproject_add(mysqlpp_ep + URL + ${MYSQLPP_SOURCE_URL} +# GIT_REPOSITORY +# ${MYSQLPP_SOURCE_URL} +# GIT_TAG +# ${MYSQLPP_VERSION} +# GIT_SHALLOW +# TRUE + ${EP_LOG_OPTIONS} + CONFIGURE_COMMAND +# "./bootstrap" +# COMMAND + "./configure" + ${MYSQLPP_CONFIGURE_ARGS} + BUILD_COMMAND + ${MAKE} ${MAKE_BUILD_ARGS} + BUILD_IN_SOURCE + 1 + BUILD_BYPRODUCTS + ${MYSQLPP_SHARED_LIB}) + + file(MAKE_DIRECTORY "${MYSQLPP_INCLUDE_DIR}") + add_library(mysqlpp SHARED IMPORTED) + set_target_properties( + mysqlpp + PROPERTIES + IMPORTED_LOCATION "${MYSQLPP_SHARED_LIB}" + INTERFACE_INCLUDE_DIRECTORIES "${MYSQLPP_INCLUDE_DIR}") + + add_dependencies(mysqlpp mysqlpp_ep) + +endmacro() 
+
+if(MILVUS_WITH_MYSQLPP)
+
+    resolve_dependency(MySQLPP)
+    get_target_property(MYSQLPP_INCLUDE_DIR mysqlpp INTERFACE_INCLUDE_DIRECTORIES)
+    include_directories(SYSTEM "${MYSQLPP_INCLUDE_DIR}")
+    link_directories(${MYSQLPP_PREFIX}/lib)  # link_directories() has no SYSTEM keyword; it was being added as a directory
+endif()
+
 # ----------------------------------------------------------------------
 # Prometheus
 
diff --git a/cpp/coverage.sh b/cpp/coverage.sh
index 3415e752bdc3879e533ab1a1084a3e1ccc7c1610..54cf4807037d914b767eae6334b0aefa488b5587 100755
--- a/cpp/coverage.sh
+++ b/cpp/coverage.sh
@@ -20,15 +20,25 @@ if [ $? -ne 0 ]; then
 fi
 
 for test in `ls ${DIR_UNITTEST}`; do
-    echo $test
-    case ${test} in
+    case ${test} in
+        db_test)
+            # set run args for db_test
+            # NOTE(review): password and private IP are hard-coded; move them to env/config before merging
+            args="mysql://root:Fantast1c@192.168.1.194:3306/test"
+            ;;
         *_test)
-            # run unittest
-            ./${DIR_UNITTEST}/${test}
-            if [ $? -ne 0 ]; then
-                echo ${DIR_UNITTEST}/${test} "run failed"
-            fi
+            args=""
+            ;;
+        *)
+            # not a unit-test binary: skip it, as the loop did before this change
+            continue
+            ;;
     esac
+    # run unittest (pass the argument only when one is set, never an empty "")
+    ./${DIR_UNITTEST}/${test} ${args:+"${args}"}
+    if [ $? -ne 0 ]; then
+        echo ${DIR_UNITTEST}/${test} "run failed"
+    fi
 done
 
 # gen test converage
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index e00420b2d1be8fb98204fca87aa9af651a3ad3c9..6c1bd1277141562548012968f80c16042890d75b 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -62,6 +62,7 @@ set(s3_client_files
 include_directories(/usr/include)
 include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include")
 include_directories(thrift/gen-cpp)
+include_directories(/usr/include/mysql)
 
 set(third_party_libs
         easyloggingpp
@@ -83,6 +84,7 @@ set(third_party_libs
         snappy
         zlib
         zstd
+        mysqlpp
         ${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so
         )
 if (MEGASEARCH_WITH_ARROW STREQUAL "ON")
@@ -181,4 +183,10 @@ endif ()
 
 install(TARGETS milvus_server DESTINATION bin)
 
+install(FILES
+        ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}
+
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3 + ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3.2.4 + DESTINATION bin/lib) #need to copy libmysqlpp.so + add_subdirectory(sdk) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index ebc836a97875775014549242e81fed67fe0938ec..a1bcc7ff444e198322290d0cf005a47792af86af 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -7,11 +7,13 @@ #include "DBMetaImpl.h" #include "Log.h" #include "EngineFactory.h" +#include "Factories.h" #include "metrics/Metrics.h" #include "scheduler/TaskScheduler.h" #include "scheduler/context/SearchContext.h" #include "scheduler/context/DeleteContext.h" #include "utils/TimeRecorder.h" +#include "MetaConsts.h" #include #include @@ -132,11 +134,14 @@ void CalcScore(uint64_t vector_count, DBImpl::DBImpl(const Options& options) : options_(options), shutting_down_(false), - meta_ptr_(new meta::DBMetaImpl(options_.meta)), - mem_mgr_(new MemManager(meta_ptr_, options_)), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { - StartTimerTasks(); + meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode); + mem_mgr_ = std::make_shared(meta_ptr_, options_); + // mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_)); + if (options.mode != Options::MODE::READ_ONLY) { + StartTimerTasks(); + } } Status DBImpl::CreateTable(meta::TableSchema& table_schema) { @@ -465,9 +470,14 @@ void DBImpl::StartMetricTask() { } void DBImpl::StartCompactionTask() { +// static int count = 0; +// count++; +// std::cout << "StartCompactionTask: " << count << std::endl; +// std::cout << "c: " << count++ << std::endl; static uint64_t compact_clock_tick = 0; compact_clock_tick++; if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) { +// std::cout << "c r: " << count++ << std::endl; return; } @@ -574,6 +584,10 @@ Status 
DBImpl::BackgroundMergeFiles(const std::string& table_id) { } void DBImpl::BackgroundCompaction(std::set table_ids) { +// static int b_count = 0; +// b_count++; +// std::cout << "BackgroundCompaction: " << b_count << std::endl; + Status status; for (auto table_id : table_ids) { status = BackgroundMergeFiles(table_id); @@ -584,7 +598,13 @@ void DBImpl::BackgroundCompaction(std::set table_ids) { } meta_ptr_->Archive(); - meta_ptr_->CleanUpFilesWithTTL(1); + + int ttl = 1; + if (options_.mode == Options::MODE::CLUSTER) { + ttl = meta::D_SEC; +// ENGINE_LOG_DEBUG << "Server mode is cluster. Clean up files with ttl = " << std::to_string(ttl) << "seconds."; + } + meta_ptr_->CleanUpFilesWithTTL(ttl); } void DBImpl::StartBuildIndexTask() { diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index d834d6cff28e112464b2229a0055834391435ecf..8c56c863e7f57d0dcb0ba1489252dd60058a868e 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -183,6 +183,7 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id, } Status DBMetaImpl::CreateTable(TableSchema &table_schema) { + try { MetricCollector metric; diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index 99a2918b855853abd9357ae7ee72192eca53ee69..4b24bd3a1c4895f468b993fe1e308518f307a243 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -3,16 +3,18 @@ // Unauthorized copying of this file, via any medium is strictly prohibited. // Proprietary and confidential. 
//////////////////////////////////////////////////////////////////////////////// +#include #include "Factories.h" #include "DBImpl.h" -#include #include #include #include #include #include #include +#include +#include "Exception.h" namespace zilliz { namespace milvus { @@ -26,6 +28,7 @@ DBMetaOptions DBMetaOptionsFactory::Build(const std::string& path) { ss << "/tmp/" << rand(); p = ss.str(); } + DBMetaOptions meta; meta.path = p; return meta; @@ -43,6 +46,48 @@ std::shared_ptr DBMetaImplFactory::Build() { return std::shared_ptr(new meta::DBMetaImpl(options)); } +std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOptions, + const int& mode) { + + std::string uri = metaOptions.backend_uri; + + std::string dialectRegex = "(.*)"; + std::string usernameRegex = "(.*)"; + std::string passwordRegex = "(.*)"; + std::string hostRegex = "(.*)"; + std::string portRegex = "(.*)"; + std::string dbNameRegex = "(.*)"; + std::string uriRegexStr = dialectRegex + "\\:\\/\\/" + + usernameRegex + "\\:" + + passwordRegex + "\\@" + + hostRegex + "\\:" + + portRegex + "\\/" + + dbNameRegex; + std::regex uriRegex(uriRegexStr); + std::smatch pieces_match; + + if (std::regex_match(uri, pieces_match, uriRegex)) { + std::string dialect = pieces_match[1].str(); + std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); + if (dialect.find("mysql") != std::string::npos) { + ENGINE_LOG_INFO << "Using MySQL"; + return std::make_shared(meta::MySQLMetaImpl(metaOptions, mode)); + } + else if (dialect.find("sqlite") != std::string::npos) { + ENGINE_LOG_INFO << "Using SQLite"; + return std::make_shared(meta::DBMetaImpl(metaOptions)); + } + else { + ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; + throw InvalidArgumentException("URI dialect is not mysql / sqlite"); + } + } + else { + ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri; + throw InvalidArgumentException("Wrong URI format "); + } +} + std::shared_ptr DBFactory::Build() { auto 
options = OptionsFactory::Build(); auto db = DBFactory::Build(options); diff --git a/cpp/src/db/Factories.h b/cpp/src/db/Factories.h index 46d3e1bbc048e202d6739c04cb7adb4ad9c7bd82..889922b17a59cb9bb96df73971a0952abe6079e7 100644 --- a/cpp/src/db/Factories.h +++ b/cpp/src/db/Factories.h @@ -7,6 +7,7 @@ #include "DB.h" #include "DBMetaImpl.h" +#include "MySQLMetaImpl.h" #include "Options.h" #include "ExecutionEngine.h" @@ -27,6 +28,7 @@ struct OptionsFactory { struct DBMetaImplFactory { static std::shared_ptr Build(); + static std::shared_ptr Build(const DBMetaOptions& metaOptions, const int& mode); }; struct DBFactory { diff --git a/cpp/src/db/MySQLConnectionPool.h b/cpp/src/db/MySQLConnectionPool.h new file mode 100644 index 0000000000000000000000000000000000000000..8992ba274c7bc48c1374bf2b6e88e7804aa98b6a --- /dev/null +++ b/cpp/src/db/MySQLConnectionPool.h @@ -0,0 +1,109 @@ +#include "mysql++/mysql++.h" + +#include +#include +#include + +#include "Log.h" + +class MySQLConnectionPool : public mysqlpp::ConnectionPool { + +public: + // The object's only constructor + MySQLConnectionPool(std::string dbName, + std::string userName, + std::string passWord, + std::string serverIp, + int port = 0, + int maxPoolSize = 8) : + db_(dbName), + user_(userName), + password_(passWord), + server_(serverIp), + port_(port), + max_pool_size_(maxPoolSize) + { + + conns_in_use_ = 0; + + max_idle_time_ = 10; //10 seconds + } + + // The destructor. We _must_ call ConnectionPool::clear() here, + // because our superclass can't do it for us. + ~MySQLConnectionPool() override { + clear(); + } + + // Do a simple form of in-use connection limiting: wait to return + // a connection until there are a reasonably low number in use + // already. Can't do this in create() because we're interested in + // connections actually in use, not those created. Also note that + // we keep our own count; ConnectionPool::size() isn't the same! 
+    mysqlpp::Connection* grab() override {
+        while (conns_in_use_ >= max_pool_size_) {  // '>=' so the configured cap itself is never exceeded
+            sleep(1);
+        }
+        mysqlpp::Connection* conn = mysqlpp::ConnectionPool::grab();  // if this throws, nothing has been counted yet
+        ++conns_in_use_;  // count only connections actually handed out, so a failed connect cannot leak a slot
+        return conn;
+    }
+
+    // Other half of in-use conn count limit
+    void release(const mysqlpp::Connection* pc) override {
+        mysqlpp::ConnectionPool::release(pc);
+
+        if (conns_in_use_ <= 0) {
+            ENGINE_LOG_WARNING << "MySQLConnectionPool::release: conns_in_use_ is not positive. conns_in_use_ = " << conns_in_use_ << std::endl;
+        }
+        else {
+            --conns_in_use_;
+        }
+    }
+
+    int getConnectionsInUse() {
+        return conns_in_use_;
+    }
+
+    void set_max_idle_time(int max_idle) {
+        max_idle_time_ = max_idle;
+    }
+
+protected:
+
+    // Superclass overrides
+    mysqlpp::Connection* create() override {
+        // Create connection using the parameters we were passed upon
+        // creation. Empty strings are passed as null pointers.
+        mysqlpp::Connection* conn = new mysqlpp::Connection();  // NOTE(review): leaked if connect() throws - confirm and guard
+        conn->set_option(new mysqlpp::ReconnectOption(true));
+        conn->connect(db_.empty() ? nullptr : db_.c_str(),
+                      server_.empty() ? nullptr : server_.c_str(),
+                      user_.empty() ? nullptr : user_.c_str(),
+                      password_.empty() ? nullptr : password_.c_str(),
+                      port_);
+        return conn;
+    }
+
+    void destroy(mysqlpp::Connection* cp) override {
+        // Our superclass can't know how we created the Connection, so
+        // it delegates destruction to us, to be safe.
+ delete cp; + } + + unsigned int max_idle_time() override { + return max_idle_time_; + } + +private: + // Number of connections currently in use + std::atomic conns_in_use_; + + // Our connection parameters + std::string db_, user_, password_, server_; + int port_; + + int max_pool_size_; + + unsigned int max_idle_time_; +}; \ No newline at end of file diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0b0cc01e5d2ab1f52848bff8f301185bedb0ef00 --- /dev/null +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -0,0 +1,1872 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "MySQLMetaImpl.h" +#include "IDGenerator.h" +#include "Utils.h" +#include "Log.h" +#include "MetaConsts.h" +#include "Factories.h" +#include "metrics/Metrics.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mysql++/mysql++.h" + +namespace zilliz { +namespace milvus { +namespace engine { +namespace meta { + + using namespace mysqlpp; + +// static std::unique_ptr connectionPtr(new Connection()); +// std::recursive_mutex mysql_mutex; +// +// std::unique_ptr& MySQLMetaImpl::getConnectionPtr() { +//// static std::recursive_mutex connectionMutex_; +// std::lock_guard lock(connectionMutex_); +// return connectionPtr; +// } + + namespace { + + Status HandleException(const std::string& desc, std::exception &e) { + ENGINE_LOG_ERROR << desc << ": " << e.what(); + return Status::DBTransactionError(desc, e.what()); + } + + class MetricCollector { + public: + MetricCollector() { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + start_time_ = METRICS_NOW_TIME; + } + + 
~MetricCollector() { + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time_, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + } + + private: + using TIME_POINT = std::chrono::system_clock::time_point; + TIME_POINT start_time_; + }; + + } + + std::string MySQLMetaImpl::GetTablePath(const std::string &table_id) { + return options_.path + "/tables/" + table_id; + } + + std::string MySQLMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) { + std::stringstream ss; + ss << GetTablePath(table_id) << "/" << date; + return ss.str(); + } + + void MySQLMetaImpl::GetTableFilePath(TableFileSchema &group_file) { + if (group_file.date_ == EmptyDate) { + group_file.date_ = Meta::GetDate(); + } + std::stringstream ss; + ss << GetTableDatePartitionPath(group_file.table_id_, group_file.date_) + << "/" << group_file.file_id_; + group_file.location_ = ss.str(); + } + + Status MySQLMetaImpl::NextTableId(std::string &table_id) { + std::stringstream ss; + SimpleIDGenerator g; + ss << g.GetNextIDNumber(); + table_id = ss.str(); + return Status::OK(); + } + + Status MySQLMetaImpl::NextFileId(std::string &file_id) { + std::stringstream ss; + SimpleIDGenerator g; + ss << g.GetNextIDNumber(); + file_id = ss.str(); + return Status::OK(); + } + + MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int& mode) + : options_(options_), + mode_(mode) { + Initialize(); + } + + Status MySQLMetaImpl::Initialize() { + +// std::lock_guard lock(mysql_mutex); + + if (!boost::filesystem::is_directory(options_.path)) { + auto ret = boost::filesystem::create_directory(options_.path); + if (!ret) { + ENGINE_LOG_ERROR << "Failed to create db directory " << options_.path; + return Status::DBTransactionError("Failed to create db directory", options_.path); + } + } + + std::string uri = options_.backend_uri; + + std::string dialectRegex = "(.*)"; + std::string usernameRegex = "(.*)"; + 
std::string passwordRegex = "(.*)";
+        std::string hostRegex = "(.*)";
+        std::string portRegex = "(.*)";
+        std::string dbNameRegex = "(.*)";
+        std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
+                                  usernameRegex + "\\:" +
+                                  passwordRegex + "\\@" +
+                                  hostRegex + "\\:" +
+                                  portRegex + "\\/" +
+                                  dbNameRegex;
+        std::regex uriRegex(uriRegexStr);
+        std::smatch pieces_match;
+
+        if (std::regex_match(uri, pieces_match, uriRegex)) {
+            std::string dialect = pieces_match[1].str();
+            std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
+            if (dialect.find("mysql") == std::string::npos) {
+                return Status::Error("URI's dialect is not MySQL");
+            }
+            // Copy each capture into an owning std::string. sub_match::str() returns
+            // a temporary, so keeping only its c_str() pointer (as this code used to)
+            // leaves the pointer dangling before the connection pool is constructed.
+            const std::string username = pieces_match[2].str();
+            const std::string password = pieces_match[3].str();
+            const std::string serverAddress = pieces_match[4].str();
+            unsigned int port = 0;  // 0 is passed through when the URI carries no port
+            if (!pieces_match[5].str().empty()) {
+                port = std::stoi(pieces_match[5].str());  // NOTE(review): throws on a non-numeric port, outside the try below
+            }
+            const std::string dbName = pieces_match[6].str();
+            int threadHint = std::thread::hardware_concurrency();
+            int maxPoolSize = threadHint == 0 ?
8 : threadHint; + mysql_connection_pool_ = std::make_shared(dbName, username, password, serverAddress, port, maxPoolSize); +// std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl; + ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize); + try { + + CleanUp(); + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: connections in use = " << mysql_connection_pool_->getConnectionsInUse(); +// if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) { +// return Status::Error("DB connection failed: ", connectionPtr->error()); +// } + if (!connectionPtr->thread_aware()) { + ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it."; + return Status::Error("MySQL++ wasn't built with thread awareness! Can't run without it."); + } + Query InitializeQuery = connectionPtr->query(); + +// InitializeQuery << "SET max_allowed_packet=67108864;"; +// if (!InitializeQuery.exec()) { +// return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); +// } + +// InitializeQuery << "DROP TABLE IF EXISTS Tables, TableFiles;"; + InitializeQuery << "CREATE TABLE IF NOT EXISTS Tables (" << + "id BIGINT PRIMARY KEY AUTO_INCREMENT, " << + "table_id VARCHAR(255) UNIQUE NOT NULL, " << + "state INT NOT NULL, " << + "dimension SMALLINT NOT NULL, " << + "created_on BIGINT NOT NULL, " << + "files_cnt BIGINT DEFAULT 0 NOT NULL, " << + "engine_type INT DEFAULT 1 NOT NULL, " << + "store_raw_data BOOL DEFAULT false NOT NULL);"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str(); + } + + if (!InitializeQuery.exec()) { + return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); + } + + InitializeQuery << "CREATE TABLE IF NOT EXISTS TableFiles (" << + "id BIGINT PRIMARY KEY AUTO_INCREMENT, " 
<< + "table_id VARCHAR(255) NOT NULL, " << + "engine_type INT DEFAULT 1 NOT NULL, " << + "file_id VARCHAR(255) NOT NULL, " << + "file_type INT DEFAULT 0 NOT NULL, " << + "size BIGINT DEFAULT 0 NOT NULL, " << + "updated_time BIGINT NOT NULL, " << + "created_on BIGINT NOT NULL, " << + "date INT DEFAULT -1 NOT NULL);"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str(); + } + + if (!InitializeQuery.exec()) { + return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); + } + } //Scoped Connection + +// //Consume all results to avoid "Commands out of sync" error +// while (InitializeQuery.more_results()) { +// InitializeQuery.store_next(); +// } + return Status::OK(); + +// if (InitializeQuery.exec()) { +// std::cout << "XXXXXXXXXXXXXXXXXXXXXXXXX" << std::endl; +// while (InitializeQuery.more_results()) { +// InitializeQuery.store_next(); +// } +// return Status::OK(); +// } else { +// return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); +// } + } catch (const ConnectionFailed& er) { + ENGINE_LOG_ERROR << "Failed to connect to database server" << ": " << er.what(); + return Status::DBTransactionError("Failed to connect to database server", er.what()); + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR DURING INITIALIZATION" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR DURING INITIALIZATION", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR DURING INITIALIZATION" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR DURING INITIALIZATION", er.what()); + } catch (std::exception &e) { + return HandleException("Encounter exception during initialization", e); + } + } + else { + ENGINE_LOG_ERROR << "Wrong URI format. 
URI = " << uri; + return Status::Error("Wrong URI format"); + } + } + +// PXU TODO: Temp solution. Will fix later + Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id, + const DatesT &dates) { + +// std::lock_guard lock(mysql_mutex); + + if (dates.empty()) { + return Status::OK(); + } + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + try { + + auto yesterday = GetDateWithDelta(-1); + + for (auto &date : dates) { + if (date >= yesterday) { + return Status::Error("Could not delete partitions within 2 days"); + } + } + + std::stringstream dateListSS; + for (auto &date : dates) { + dateListSS << std::to_string(date) << ", "; + } + std::string dateListStr = dateListSS.str(); + dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", " + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropPartitionsByDates connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query dropPartitionsByDatesQuery = connectionPtr->query(); + + dropPartitionsByDatesQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "WHERE table_id = " << quote << table_id << " AND " << + "date in (" << dateListStr << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropPartitionsByDates: " << dropPartitionsByDatesQuery.str(); + } + + if (!dropPartitionsByDatesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES"; + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", + dropPartitionsByDatesQuery.error()); + } + } //Scoped Connection + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY 
DATES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", er.what()); + } + return Status::OK(); + } + + Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) { + +// std::lock_guard lock(mysql_mutex); + +// server::Metrics::GetInstance().MetaAccessTotalIncrement(); + try { + + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query createTableQuery = connectionPtr->query(); +// ENGINE_LOG_DEBUG << "Create Table in"; + if (table_schema.table_id_.empty()) { + NextTableId(table_schema.table_id_); + } else { + createTableQuery << "SELECT state FROM Tables " << + "WHERE table_id = " << quote << table_schema.table_id_ << ";"; +// ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str(); + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str(); + } + + StoreQueryResult res = createTableQuery.store(); + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + int state = res[0]["state"]; + if (TableSchema::TO_DELETE == state) { + return Status::Error("Table already exists and it is in delete state, please wait a second"); + } + else { + return Status::OK();//table already exists, no error + } + } + } +// ENGINE_LOG_DEBUG << "Create Table start"; + + table_schema.files_cnt_ = 0; + table_schema.id_ = -1; + table_schema.created_on_ = utils::GetMicroSecTimeStamp(); + +// auto start_time = METRICS_NOW_TIME; + + std::string id = 
"NULL"; //auto-increment + std::string table_id = table_schema.table_id_; + std::string state = std::to_string(table_schema.state_); + std::string dimension = std::to_string(table_schema.dimension_); + std::string created_on = std::to_string(table_schema.created_on_); + std::string files_cnt = "0"; + std::string engine_type = std::to_string(table_schema.engine_type_); + std::string store_raw_data = table_schema.store_raw_data_ ? "true" : "false"; + + createTableQuery << "INSERT INTO Tables VALUES" << + "(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " << + created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data << ");"; +// ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str(); + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str(); + } + + if (SimpleResult res = createTableQuery.execute()) { + table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? 
+// std::cout << table_schema.id_ << std::endl; + //Consume all results to avoid "Commands out of sync" error +// while (createTableQuery.more_results()) { +// createTableQuery.store_next(); +// } + } else { + ENGINE_LOG_ERROR << "Add Table Error"; + return Status::DBTransactionError("Add Table Error", createTableQuery.error()); + } + } //Scoped Connection + +// auto end_time = METRICS_NOW_TIME; +// auto total_time = METRICS_MICROSECONDS(start_time, end_time); +// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + auto table_path = GetTablePath(table_schema.table_id_); + table_schema.location_ = table_path; + if (!boost::filesystem::is_directory(table_path)) { + auto ret = boost::filesystem::create_directories(table_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + return Status::Error("Failed to create table path"); + } + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN ADDING TABLE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what()); + } catch (std::exception &e) { + return HandleException("Encounter exception when create table", e); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::DeleteTable(const std::string& table_id) { + +// std::lock_guard lock(mysql_mutex); + + try { + + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTable connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + //soft delete table + Query deleteTableQuery = 
connectionPtr->query(); +// + deleteTableQuery << "UPDATE Tables " << + "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " << + "WHERE table_id = " << quote << table_id << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DeleteTable: " << deleteTableQuery.str(); + } + + if (!deleteTableQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE"; + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error()); + } + + } //Scoped Connection + + + if (mode_ != Options::MODE::SINGLE) { + DeleteTableFiles(table_id); + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::DeleteTableFiles(const std::string& table_id) { + try { + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + //soft delete table files + Query deleteTableFilesQuery = connectionPtr->query(); + // + deleteTableFilesQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " << + "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " << + "WHERE table_id = " << quote << table_id << " AND " << + "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << 
"MySQLMetaImpl::DeleteTableFiles: " << deleteTableFilesQuery.str(); + } + + if (!deleteTableFilesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableFilesQuery.error()); + } + } //Scoped Connection + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE FILES", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { + +// std::lock_guard lock(mysql_mutex); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DescribeTable connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query describeTableQuery = connectionPtr->query(); + describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " << + "FROM Tables " << + "WHERE table_id = " << quote << table_schema.table_id_ << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTable: " << describeTableQuery.str(); + } + + res = describeTableQuery.store(); + } //Scoped Connection + + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + const Row& resRow = res[0]; + + table_schema.id_ = resRow["id"]; //implicit conversion + + table_schema.dimension_ = resRow["dimension"]; + + table_schema.files_cnt_ 
= resRow["files_cnt"]; + + table_schema.engine_type_ = resRow["engine_type"]; + + int store_raw_data = resRow["store_raw_data"]; + table_schema.store_raw_data_ = (store_raw_data == 1); + } + else { + return Status::NotFound("Table " + table_schema.table_id_ + " not found"); + } + + auto table_path = GetTablePath(table_schema.table_id_); + table_schema.location_ = table_path; + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBING TABLE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBING TABLE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING TABLE", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { + +// std::lock_guard lock(mysql_mutex); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::HasTable connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query hasTableQuery = connectionPtr->query(); + //since table_id is a unique column we just need to check whether it exists or not + hasTableQuery << "SELECT EXISTS " << + "(SELECT 1 FROM Tables " << + "WHERE table_id = " << quote << table_id << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << + "AS " << quote << "check" << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasTable: " << hasTableQuery.str(); + } + + res = hasTableQuery.store(); + } //Scoped Connection + + assert(res && res.num_rows() == 1); + int check = res[0]["check"]; + has_or_not = (check == 1); + + } catch (const 
BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF TABLE EXISTS" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF TABLE EXISTS", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CHECKING IF TABLE EXISTS" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::AllTables(std::vector& table_schema_array) { + +// std::lock_guard lock(mysql_mutex); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::AllTables connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query allTablesQuery = connectionPtr->query(); + allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << + "FROM Tables " << + "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::AllTables: " << allTablesQuery.str(); + } + + res = allTablesQuery.store(); + } //Scoped Connection + + for (auto& resRow : res) { + TableSchema table_schema; + + table_schema.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_schema.table_id_ = table_id; + + table_schema.dimension_ = resRow["dimension"]; + + table_schema.files_cnt_ = resRow["files_cnt"]; + + table_schema.engine_type_ = resRow["engine_type"]; + + int store_raw_data = resRow["store_raw_data"]; + table_schema.store_raw_data_ = (store_raw_data == 1); + + table_schema_array.emplace_back(table_schema); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR 
<< "QUERY ERROR WHEN DESCRIBING ALL TABLES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING ALL TABLES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBING ALL TABLES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING ALL TABLES", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) { + +// std::lock_guard lock(mysql_mutex); + + if (file_schema.date_ == EmptyDate) { + file_schema.date_ = Meta::GetDate(); + } + TableSchema table_schema; + table_schema.table_id_ = file_schema.table_id_; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + try { + + MetricCollector metric; + + NextFileId(file_schema.file_id_); + file_schema.file_type_ = TableFileSchema::NEW; + file_schema.dimension_ = table_schema.dimension_; + file_schema.size_ = 0; + file_schema.created_on_ = utils::GetMicroSecTimeStamp(); + file_schema.updated_time_ = file_schema.created_on_; + file_schema.engine_type_ = table_schema.engine_type_; + GetTableFilePath(file_schema); + + std::string id = "NULL"; //auto-increment + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTableFile connection in use = " << 
mysql_connection_pool_->getConnectionsInUse(); +// } + + Query createTableFileQuery = connectionPtr->query(); + + createTableFileQuery << "INSERT INTO TableFiles VALUES" << + "(" << id << ", " << quote << table_id << ", " << engine_type << ", " << + quote << file_id << ", " << file_type << ", " << size << ", " << + updated_time << ", " << created_on << ", " << date << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTableFile: " << createTableFileQuery.str(); + } + + if (SimpleResult res = createTableFileQuery.execute()) { + file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? + + //Consume all results to avoid "Commands out of sync" error +// while (createTableFileQuery.more_results()) { +// createTableFileQuery.store_next(); +// } + } else { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE"; + return Status::DBTransactionError("Add file Error", createTableFileQuery.error()); + } + } // Scoped Connection + + auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); + + if (!boost::filesystem::is_directory(partition_path)) { + auto ret = boost::filesystem::create_directory(partition_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error"; + return Status::DBTransactionError("Failed to create partition directory"); + } + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE FILE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN ADDING TABLE FILE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what()); + } catch (std::exception& ex) { + return HandleException("Encounter exception when create table file", ex); + } + + return Status::OK(); 
+ } + + Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) { + +// std::lock_guard lock(mysql_mutex); + + files.clear(); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToIndex connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query filesToIndexQuery = connectionPtr->query(); + filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToIndex: " << filesToIndexQuery.str(); + } + + res = filesToIndexQuery.store(); + } //Scoped Connection + + std::map groups; + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_file.table_id_ = table_id; + + table_file.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + auto groupItr = groups.find(table_file.table_id_); + if (groupItr == groups.end()) { + TableSchema table_schema; + table_schema.table_id_ = table_file.table_id_; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + groups[table_file.table_id_] = table_schema; +// std::cout << table_schema.dimension_ << std::endl; + } + table_file.dimension_ = groups[table_file.table_id_].dimension_; + + GetTableFilePath(table_file); + + files.push_back(table_file); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY 
ERROR WHEN FINDING TABLE FILES TO INDEX" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO INDEX", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, + const DatesT &partition, + DatePartionedTableFilesSchema &files) { + +// std::lock_guard lock(mysql_mutex); + + files.clear(); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToSearch connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + if (partition.empty()) { + + Query filesToSearchQuery = connectionPtr->query(); + filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str(); + } + + res = filesToSearchQuery.store(); + + } else { + + Query filesToSearchQuery = connectionPtr->query(); + + std::stringstream partitionListSS; + for (auto &date : partition) { + partitionListSS << std::to_string(date) << ", "; + } + std::string partitionListStr = partitionListSS.str(); + partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", " + + 
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "date IN (" << partitionListStr << ") AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str(); + } + + res = filesToSearchQuery.store(); + + } + } //Scoped Connection + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id_str; + resRow["table_id"].to_string(table_id_str); + table_file.table_id_ = table_id_str; + + table_file.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + table_file.dimension_ = table_schema.dimension_; + + GetTableFilePath(table_file); + + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + + files[table_file.date_].push_back(table_file); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what(); + 
return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, + DatePartionedTableFilesSchema &files) { + +// std::lock_guard lock(mysql_mutex); + + files.clear(); + + try { + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToMerge connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query filesToMergeQuery = connectionPtr->query(); + filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "file_type = " << std::to_string(TableFileSchema::RAW) << " " << + "ORDER BY size DESC" << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToMerge: " << filesToMergeQuery.str(); + } + + res = filesToMergeQuery.store(); + } //Scoped Connection + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id_str; + resRow["table_id"].to_string(table_id_str); + table_file.table_id_ = table_id_str; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + table_file.dimension_ = table_schema.dimension_; + + GetTableFilePath(table_file); + + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + + 
files[table_file.date_].push_back(table_file); + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO MERGE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO MERGE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::GetTableFiles(const std::string& table_id, + const std::vector& ids, + TableFilesSchema& table_files) { + +// std::lock_guard lock(mysql_mutex); + + if (ids.empty()) { + return Status::OK(); + } + + std::stringstream idSS; + for (auto& id : ids) { + idSS << "id = " << std::to_string(id) << " OR "; + } + std::string idStr = idSS.str(); + idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR " + + try { + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::GetTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query getTableFileQuery = connectionPtr->query(); + getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(" << idStr << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str(); + } + + res = getTableFileQuery.store(); + } //Scoped Connection + + assert(res); + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + for (auto& resRow : res) { + + TableFileSchema 
file_schema; + + file_schema.table_id_ = table_id; + + file_schema.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + file_schema.file_id_ = file_id; + + file_schema.file_type_ = resRow["file_type"]; + + file_schema.size_ = resRow["size"]; + + file_schema.date_ = resRow["date"]; + + file_schema.dimension_ = table_schema.dimension_; + + GetTableFilePath(file_schema); + + table_files.emplace_back(file_schema); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING TABLE FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING TABLE FILES", er.what()); + } + + return Status::OK(); + } + +// PXU TODO: Support Swap + Status MySQLMetaImpl::Archive() { + +// std::lock_guard lock(mysql_mutex); + + auto &criterias = options_.archive_conf.GetCriterias(); + if (criterias.empty()) { + return Status::OK(); + } + + for (auto& kv : criterias) { + auto &criteria = kv.first; + auto &limit = kv.second; + if (criteria == "days") { + size_t usecs = limit * D_SEC * US_PS; + long now = utils::GetMicroSecTimeStamp(); + + try { + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Archive connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query archiveQuery = connectionPtr->query(); + archiveQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "WHERE created_on < " << std::to_string(now - usecs) << " AND " << + "file_type <> " << 
std::to_string(TableFileSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Archive: " << archiveQuery.str(); + } + + if (!archiveQuery.exec()) { + return Status::DBTransactionError("QUERY ERROR DURING ARCHIVE", archiveQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DURING ARCHIVE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DURING ARCHIVE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DURING ARCHIVE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DURING ARCHIVE", er.what()); + } + } + if (criteria == "disk") { + uint64_t sum = 0; + Size(sum); + + auto to_delete = (sum - limit * G); + DiscardFiles(to_delete); + } + } + + return Status::OK(); + } + + Status MySQLMetaImpl::Size(uint64_t &result) { + +// std::lock_guard lock(mysql_mutex); + + result = 0; + try { + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Size connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query getSizeQuery = connectionPtr->query(); + getSizeQuery << "SELECT IFNULL(SUM(size),0) AS sum " << + "FROM TableFiles " << + "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Size: " << getSizeQuery.str(); + } + + res = getSizeQuery.store(); + } //Scoped Connection + + assert(res && res.num_rows() == 1); +// if (!res) { +//// std::cout << "result is NULL" << std::endl; +// return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", getSizeQuery.error()); +// } + if (res.empty()) { + result = 0; +// std::cout << "result = 0" << std::endl; + } + else 
{ + result = res[0]["sum"]; +// std::cout << "result = " << std::to_string(result) << std::endl; + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING SIZE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING SIZE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING SIZE", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) { + +// std::lock_guard lock(mysql_mutex); + + if (to_discard_size <= 0) { +// std::cout << "in" << std::endl; + return Status::OK(); + } + ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size; + + try { + + MetricCollector metric; + + bool status; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DiscardFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query discardFilesQuery = connectionPtr->query(); + discardFilesQuery << "SELECT id, size " << + "FROM TableFiles " << + "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "ORDER BY id ASC " << + "LIMIT 10;"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str(); + } + + // std::cout << discardFilesQuery.str() << std::endl; + StoreQueryResult res = discardFilesQuery.store(); + + assert(res); + if (res.num_rows() == 0) { + return Status::OK(); + } + + TableFileSchema table_file; + std::stringstream idsToDiscardSS; + for (auto &resRow : res) { + if (to_discard_size <= 0) { + break; + } + table_file.id_ = resRow["id"]; + table_file.size_ = resRow["size"]; + idsToDiscardSS << "id = " << 
std::to_string(table_file.id_) << " OR "; + ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_ + << " table_file.size=" << table_file.size_; + to_discard_size -= table_file.size_; + } + + std::string idsToDiscardStr = idsToDiscardSS.str(); + idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR " + + discardFilesQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " << + "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " << + "WHERE " << idsToDiscardStr << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str(); + } + + status = discardFilesQuery.exec(); + if (!status) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error()); + } + } //Scoped Connection + + return DiscardFiles(to_discard_size); + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DISCARDING FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DISCARDING FILES", er.what()); + } + } + + //ZR: this function assumes all fields in file_schema have value + Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { + +// std::lock_guard lock(mysql_mutex); + + file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); + try { + + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFile connection in use = " << 
mysql_connection_pool_->getConnectionsInUse(); +// } + + Query updateTableFileQuery = connectionPtr->query(); + + //if the table has been deleted, just mark the table file as TO_DELETE + //clean thread will delete the file later + updateTableFileQuery << "SELECT state FROM Tables " << + "WHERE table_id = " << quote << file_schema.table_id_ << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str(); + } + + StoreQueryResult res = updateTableFileQuery.store(); + + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + int state = res[0]["state"]; + if (state == TableSchema::TO_DELETE) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + } else { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + + std::string id = std::to_string(file_schema.id_); + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + updateTableFileQuery << "UPDATE TableFiles " << + "SET table_id = " << quote << table_id << ", " << + "engine_type = " << engine_type << ", " << + "file_id = " << quote << file_id << ", " << + "file_type = " << file_type << ", " << + "size = " << size << ", " << + "updated_time = " << updated_time << ", " << + "created_on = " << created_on << ", " << + "date = " << date << " " << + "WHERE id = " << id << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFile: " << updateTableFileQuery.str(); + } + + // std::cout << updateTableFileQuery.str() << std::endl; + + if (!updateTableFileQuery.exec()) { + ENGINE_LOG_DEBUG << 
"table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE"; + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", + updateTableFileQuery.error()); + } + } //Scoped Connection + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILE", er.what()); + } + return Status::OK(); + } + + Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) { + +// std::lock_guard lock(mysql_mutex); + + try { + MetricCollector metric; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFiles connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query updateTableFilesQuery = connectionPtr->query(); + + std::map has_tables; + for (auto &file_schema : files) { + + if (has_tables.find(file_schema.table_id_) != has_tables.end()) { + continue; + } + + updateTableFilesQuery << "SELECT EXISTS " << + "(SELECT 1 FROM Tables " << + "WHERE table_id = " << quote << file_schema.table_id_ << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << + "AS " << quote << "check" << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str(); 
+ } + + StoreQueryResult res = updateTableFilesQuery.store(); + + assert(res && res.num_rows() == 1); + int check = res[0]["check"]; + has_tables[file_schema.table_id_] = (check == 1); + } + + for (auto &file_schema : files) { + + if (!has_tables[file_schema.table_id_]) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); + + std::string id = std::to_string(file_schema.id_); + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + updateTableFilesQuery << "UPDATE TableFiles " << + "SET table_id = " << quote << table_id << ", " << + "engine_type = " << engine_type << ", " << + "file_id = " << quote << file_id << ", " << + "file_type = " << file_type << ", " << + "size = " << size << ", " << + "updated_time = " << updated_time << ", " << + "created_on = " << created_on << ", " << + "date = " << date << " " << + "WHERE id = " << id << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFiles: " << updateTableFilesQuery.str(); + } + + if (!updateTableFilesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", + updateTableFilesQuery.error()); + } + } + } //Scoped Connection + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", er.what()); + } catch (const Exception& er) { + // 
Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILES", er.what()); + } + return Status::OK(); + } + + Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { +// static int b_count = 0; +// b_count++; +// std::cout << "CleanUpFilesWithTTL: " << b_count << std::endl; +// std::lock_guard lock(mysql_mutex); + + auto now = utils::GetMicroSecTimeStamp(); + try { + MetricCollector metric; + + { + +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use before creating ScopedConnection = " +// << mysql_connection_pool_->getConnectionsInUse(); + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use after creating ScopedConnection = " +// << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " << + "FROM TableFiles " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " << + "updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + } + + StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + + assert(res); + + TableFileSchema table_file; + std::vector idsToDelete; + + for (auto &resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_file.table_id_ = table_id; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.date_ = resRow["date"]; + + 
GetTableFilePath(table_file); + + ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " + << table_file.location_ << std::endl; + boost::filesystem::remove(table_file.location_); + + idsToDelete.emplace_back(std::to_string(table_file.id_)); + } + + if (!idsToDelete.empty()) { + + std::stringstream idsToDeleteSS; + for (auto &id : idsToDelete) { + idsToDeleteSS << "id = " << id << " OR "; + } + + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpFilesWithTTLQuery << "DELETE FROM TableFiles WHERE " << + idsToDeleteStr << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + } + + if (!cleanUpFilesWithTTLQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL"; + return Status::DBTransactionError("CleanUpFilesWithTTL Error", + cleanUpFilesWithTTLQuery.error()); + } + } + } //Scoped Connection + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } + + try { + MetricCollector metric; + + { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use before creating ScopedConnection = " +// << mysql_connection_pool_->getConnectionsInUse(); + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean 
tables: connection in use after creating ScopedConnection = " +// << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id " << + "FROM Tables " << + "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + } + + StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + assert(res); +// std::cout << res.num_rows() << std::endl; + + if (!res.empty()) { + + std::stringstream idsToDeleteSS; + for (auto &resRow : res) { + size_t id = resRow["id"]; + std::string table_id; + resRow["table_id"].to_string(table_id); + + auto table_path = GetTablePath(table_id); + + ENGINE_LOG_DEBUG << "Remove table folder: " << table_path; + boost::filesystem::remove_all(table_path); + + idsToDeleteSS << "id = " << std::to_string(id) << " OR "; + } + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpFilesWithTTLQuery << "DELETE FROM Tables WHERE " << + idsToDeleteStr << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + } + + if (!cleanUpFilesWithTTLQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL"; + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", + cleanUpFilesWithTTLQuery.error()); + } + } + } //Scoped Connection + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP 
FILES WITH TTL" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::CleanUp() { + +// std::lock_guard lock(mysql_mutex); + + try { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUp: connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + ENGINE_LOG_DEBUG << "Remove table file type as NEW"; + Query cleanUpQuery = connectionPtr->query(); + cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str(); + } + + if (!cleanUpQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES"; + return Status::DBTransactionError("Clean up Error", cleanUpQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CLEANING UP FILES" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES", er.what()); + } + + return Status::OK(); + } + + Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) { + +// std::lock_guard lock(mysql_mutex); + + try { + MetricCollector metric; + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + + if (!status.ok()) { + return status; + } + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if 
(mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Count: connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query countQuery = connectionPtr->query(); + countQuery << "SELECT size " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::Count: " << countQuery.str(); + } + + res = countQuery.store(); + } //Scoped Connection + + result = 0; + for (auto &resRow : res) { + size_t size = resRow["size"]; + result += size; + } + + assert(table_schema.dimension_ != 0); + result /= table_schema.dimension_; + result /= sizeof(float); + + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN RETRIEVING COUNT" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING COUNT", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING COUNT" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING COUNT", er.what()); + } + return Status::OK(); + } + + Status MySQLMetaImpl::DropAll() { + +// std::lock_guard lock(mysql_mutex); + + if (boost::filesystem::is_directory(options_.path)) { + boost::filesystem::remove_all(options_.path); + } + try { + + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + +// if (mysql_connection_pool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropAll: connection in use = " << mysql_connection_pool_->getConnectionsInUse(); +// } + + Query dropTableQuery = connectionPtr->query(); + dropTableQuery << "DROP TABLE IF EXISTS Tables, 
TableFiles;"; + + if (options_.sql_echo) { + ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str(); + } + + if (dropTableQuery.exec()) { + return Status::OK(); + } + else { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING TABLE"; + return Status::DBTransactionError("DROP TABLE ERROR", dropTableQuery.error()); + } + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING TABLE" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING TABLE" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING TABLE", er.what()); + } + return Status::OK(); + } + + MySQLMetaImpl::~MySQLMetaImpl() { +// std::lock_guard lock(mysql_mutex); + CleanUp(); + } + +} // namespace meta +} // namespace engine +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/db/MySQLMetaImpl.h b/cpp/src/db/MySQLMetaImpl.h new file mode 100644 index 0000000000000000000000000000000000000000..9ff8254b6006d6cc404dd77ae398d90c6963be97 --- /dev/null +++ b/cpp/src/db/MySQLMetaImpl.h @@ -0,0 +1,91 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. 
+ ******************************************************************************/ +#pragma once + +#include "Meta.h" +#include "Options.h" +#include "MySQLConnectionPool.h" + +#include "mysql++/mysql++.h" +#include + +namespace zilliz { +namespace milvus { +namespace engine { +namespace meta { + +// auto StoragePrototype(const std::string& path); + using namespace mysqlpp; + + class MySQLMetaImpl : public Meta { + public: + MySQLMetaImpl(const DBMetaOptions& options_, const int& mode); + + virtual Status CreateTable(TableSchema& table_schema) override; + virtual Status DescribeTable(TableSchema& group_info_) override; + virtual Status HasTable(const std::string& table_id, bool& has_or_not) override; + virtual Status AllTables(std::vector& table_schema_array) override; + + virtual Status DeleteTable(const std::string& table_id) override; + virtual Status DeleteTableFiles(const std::string& table_id) override; + + virtual Status CreateTableFile(TableFileSchema& file_schema) override; + virtual Status DropPartitionsByDates(const std::string& table_id, + const DatesT& dates) override; + + virtual Status GetTableFiles(const std::string& table_id, + const std::vector& ids, + TableFilesSchema& table_files) override; + + virtual Status UpdateTableFile(TableFileSchema& file_schema) override; + + virtual Status UpdateTableFiles(TableFilesSchema& files) override; + + virtual Status FilesToSearch(const std::string& table_id, + const DatesT& partition, + DatePartionedTableFilesSchema& files) override; + + virtual Status FilesToMerge(const std::string& table_id, + DatePartionedTableFilesSchema& files) override; + + virtual Status FilesToIndex(TableFilesSchema&) override; + + virtual Status Archive() override; + + virtual Status Size(uint64_t& result) override; + + virtual Status CleanUp() override; + + virtual Status CleanUpFilesWithTTL(uint16_t seconds) override; + + virtual Status DropAll() override; + + virtual Status Count(const std::string& table_id, uint64_t& result) 
override; + + virtual ~MySQLMetaImpl(); + + private: + Status NextFileId(std::string& file_id); + Status NextTableId(std::string& table_id); + Status DiscardFiles(long long to_discard_size); + std::string GetTablePath(const std::string& table_id); + std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date); + void GetTableFilePath(TableFileSchema& group_file); + Status Initialize(); + + const DBMetaOptions options_; + const int mode_; + + std::shared_ptr mysql_connection_pool_; + bool safe_grab = false; + +// std::mutex connectionMutex_; + }; // DBMetaImpl + +} // namespace meta +} // namespace engine +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index 919d21709c638e38af512803c7396b730c24bc05..609e3ca24520e65f89122083fea0779689288049 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -45,15 +45,23 @@ struct DBMetaOptions { std::string path; std::string backend_uri; ArchiveConf archive_conf = ArchiveConf("delete"); + bool sql_echo = false; }; // DBMetaOptions - struct Options { + + typedef enum { + SINGLE, + CLUSTER, + READ_ONLY + } MODE; + Options(); uint16_t memory_sync_interval = 1; //unit: second uint16_t merge_trigger_number = 2; size_t index_trigger_size = ONE_GB; //unit: byte DBMetaOptions meta; + int mode = MODE::SINGLE; }; // Options diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index 7892a57f2b933befb343f4207cf17bb5d080b9fd..a3db0bf1103822d5d0babb5d5894719ff2b2d905 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -23,6 +23,33 @@ DBWrapper::DBWrapper() { if(index_size > 0) {//ensure larger than zero, unit is MB opt.index_trigger_size = (size_t)index_size * engine::ONE_MB; } + std::string sql_echo = config.GetValue(CONFIG_DB_SQL_ECHO, "off"); + if (sql_echo == "on") { + opt.meta.sql_echo = true; + } + else if (sql_echo == "off") { + opt.meta.sql_echo = false; + } + else { + std::cout << "ERROR: sql_echo 
specified in db_config is not one of ['on', 'off']" << std::endl; + kill(0, SIGUSR1); + } + + ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER); + std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single"); + if (mode == "single") { + opt.mode = zilliz::milvus::engine::Options::MODE::SINGLE; + } + else if (mode == "cluster") { + opt.mode = zilliz::milvus::engine::Options::MODE::CLUSTER; + } + else if (mode == "read_only") { + opt.mode = zilliz::milvus::engine::Options::MODE::READ_ONLY; + } + else { + std::cout << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']" << std::endl; + kill(0, SIGUSR1); + } //set archive config engine::ArchiveConf::CriteriaT criterial; diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index abf898bacf9305f77af8a45149a88108fac63b3f..768430f02378e81267f6ea12e44d434cdd1ac566 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -19,6 +19,7 @@ static const std::string CONFIG_SERVER_ADDRESS = "address"; static const std::string CONFIG_SERVER_PORT = "port"; static const std::string CONFIG_SERVER_PROTOCOL = "transfer_protocol"; static const std::string CONFIG_SERVER_MODE = "server_mode"; +static const std::string CONFIG_CLUSTER_MODE = "mode"; static const std::string CONFIG_DB = "db_config"; static const std::string CONFIG_DB_URL = "db_backend_url"; @@ -26,6 +27,7 @@ static const std::string CONFIG_DB_PATH = "db_path"; static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold"; static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold"; static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold"; +static const std::string CONFIG_DB_SQL_ECHO = "sql_echo"; static const std::string CONFIG_LOG = "log_config"; diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 
269d81a498b17bf9f515b44c81cfd1caedf00ec6..311760948daa31f617a197852084b66bbe25290f 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -7,6 +7,7 @@ GTEST_VERSION=1.8.1 JSONCONS_VERSION=0.126.0 LAPACK_VERSION=v3.8.0 LZ4_VERSION=v1.9.1 +MYSQLPP_VERSION=3.2.4 OPENBLAS_VERSION=v0.3.6 PROMETHEUS_VERSION=v0.7.0 ROCKSDB_VERSION=v6.0.2 diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index b868d0f1f1ae5b3081231b4f41bc476e8a31502d..38046617ae2a5d5c8505d14b2fb341441f4bb116 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -5,15 +5,11 @@ #------------------------------------------------------------------------------- link_directories( "${CMAKE_BINARY_DIR}/lib" - "${GTEST_PREFIX}/lib/" - ) aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) -message(STATUS "GTEST LIB: ${GTEST_PREFIX}/lib") - set(unittest_srcs ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp) #${EASYLOGGINGPP_INCLUDE_DIR}/easylogging++.cc) diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 8427639ab95f11d5ec3cb95371fa925e4395c451..5bae9190f5d2002678fa21cf754cac797bf5a435 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -21,10 +21,10 @@ set(db_scheduler_srcs include_directories(/usr/local/cuda/include) link_directories("/usr/local/cuda/lib64") - +include_directories(/usr/include/mysql) set(db_test_src - ${unittest_srcs} + #${unittest_srcs} ${config_files} ${cache_srcs} ${db_srcs} @@ -44,8 +44,9 @@ set(db_libs boost_system boost_filesystem lz4 + mysqlpp ) target_link_libraries(db_test ${db_libs} ${unittest_libs}) -install(TARGETS db_test DESTINATION bin) \ No newline at end of file +install(TARGETS db_test DESTINATION bin) diff --git a/cpp/unittest/db/MySQLMetaImpl_test.cpp b/cpp/unittest/db/MySQLMetaImpl_test.cpp new file mode 100644 index 
0000000000000000000000000000000000000000..93e0ce9b28ed198fbbe11351b84a41af241274f1 --- /dev/null +++ b/cpp/unittest/db/MySQLMetaImpl_test.cpp @@ -0,0 +1,465 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. +//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include +#include +#include + +#include "utils.h" +#include "db/MySQLMetaImpl.h" +#include "db/Factories.h" +#include "db/Utils.h" +#include "db/MetaConsts.h" + +#include "mysql++/mysql++.h" + +#include + +using namespace zilliz::milvus::engine; + +//TEST_F(MySQLTest, InitializeTest) { +// DBMetaOptions options; +// //dialect+driver://username:password@host:port/database +// options.backend_uri = "mysql://root:1234@:/test"; +// meta::MySQLMetaImpl impl(options); +// auto status = impl.Initialize(); +// std::cout << status.ToString() << std::endl; +// ASSERT_TRUE(status.ok()); +//} + +TEST_F(MySQLTest, core) { +// DBMetaOptions options; +// //dialect+driver://username:password@host:port/database +// options.backend_uri = "mysql://root:1234@:/test"; +// options.path = "/tmp/vecwise_test"; + int mode = Options::MODE::SINGLE; + meta::MySQLMetaImpl impl(getDBMetaOptions(), mode); +// auto status = impl.Initialize(); +// ASSERT_TRUE(status.ok()); + + meta::TableSchema schema1; + schema1.table_id_ = "test1"; + schema1.dimension_ = 123; + + auto status = impl.CreateTable(schema1); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + meta::TableSchema schema2; + schema2.table_id_ = "test2"; + schema2.dimension_ = 321; + status = impl.CreateTable(schema2); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + status = impl.CreateTable(schema2); +// std::cout << status.ToString() << std::endl; +// 
ASSERT_THROW(impl.CreateTable(schema), mysqlpp::BadQuery); + ASSERT_TRUE(status.ok()); + + status = impl.DeleteTable(schema2.table_id_); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + size_t id1 = schema1.id_; + long created_on1 = schema1.created_on_; + status = impl.DescribeTable(schema1); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(schema1.id_, id1); + ASSERT_EQ(schema1.table_id_, "test1"); + ASSERT_EQ(schema1.created_on_, created_on1); + ASSERT_EQ(schema1.files_cnt_, 0); + ASSERT_EQ(schema1.engine_type_, 1); + ASSERT_EQ(schema1.store_raw_data_, false); + + bool check; + status = impl.HasTable("test1", check); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(check, true); + + std::vector table_schema_array; + status = impl.AllTables(table_schema_array); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_schema_array.size(), 1); + meta::TableSchema resultSchema = table_schema_array[0]; + ASSERT_EQ(resultSchema.id_, id1); + ASSERT_EQ(resultSchema.table_id_, "test1"); + ASSERT_EQ(resultSchema.dimension_, 123); + ASSERT_EQ(resultSchema.files_cnt_, 0); + ASSERT_EQ(resultSchema.engine_type_, 1); + ASSERT_EQ(resultSchema.store_raw_data_, false); + + meta::TableFileSchema tableFileSchema; + tableFileSchema.table_id_ = "test1"; + + status = impl.CreateTableFile(tableFileSchema); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + tableFileSchema.file_type_ = meta::TableFileSchema::TO_INDEX; + status = impl.UpdateTableFile(tableFileSchema); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + meta::TableFilesSchema filesToIndex; + status = impl.FilesToIndex(filesToIndex); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToIndex.size(), 1); + meta::TableFileSchema fileToIndex = filesToIndex[0]; + ASSERT_EQ(fileToIndex.table_id_, "test1"); + ASSERT_EQ(fileToIndex.dimension_, 123); + +// meta::TableFilesSchema filesToIndex; +// status = impl.FilesToIndex(filesToIndex); +// ASSERT_TRUE(status.ok()); +// 
ASSERT_EQ(filesToIndex.size(), 0); + + meta::DatesT partition; + partition.push_back(tableFileSchema.date_); + meta::DatePartionedTableFilesSchema filesToSearch; + status = impl.FilesToSearch(tableFileSchema.table_id_, partition, filesToSearch); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToSearch.size(), 1); + ASSERT_EQ(filesToSearch[tableFileSchema.date_].size(), 1); + meta::TableFileSchema fileToSearch = filesToSearch[tableFileSchema.date_][0]; + ASSERT_EQ(fileToSearch.table_id_, "test1"); + ASSERT_EQ(fileToSearch.dimension_, 123); + + tableFileSchema.file_type_ = meta::TableFileSchema::RAW; + status = impl.UpdateTableFile(tableFileSchema); + ASSERT_TRUE(status.ok()); + + meta::DatePartionedTableFilesSchema filesToMerge; + status = impl.FilesToMerge(tableFileSchema.table_id_, filesToMerge); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToMerge.size(), 1); + ASSERT_EQ(filesToMerge[tableFileSchema.date_].size(), 1); + meta::TableFileSchema fileToMerge = filesToMerge[tableFileSchema.date_][0]; + ASSERT_EQ(fileToMerge.table_id_, "test1"); + ASSERT_EQ(fileToMerge.dimension_, 123); + + meta::TableFilesSchema resultTableFilesSchema; + std::vector ids; + ids.push_back(tableFileSchema.id_); + status = impl.GetTableFiles(tableFileSchema.table_id_, ids, resultTableFilesSchema); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(resultTableFilesSchema.size(), 1); + meta::TableFileSchema resultTableFileSchema = resultTableFilesSchema[0]; +// ASSERT_EQ(resultTableFileSchema.id_, tableFileSchema.id_); + ASSERT_EQ(resultTableFileSchema.table_id_, tableFileSchema.table_id_); + ASSERT_EQ(resultTableFileSchema.file_id_, tableFileSchema.file_id_); + ASSERT_EQ(resultTableFileSchema.file_type_, tableFileSchema.file_type_); + ASSERT_EQ(resultTableFileSchema.size_, tableFileSchema.size_); + ASSERT_EQ(resultTableFileSchema.date_, tableFileSchema.date_); + ASSERT_EQ(resultTableFileSchema.engine_type_, tableFileSchema.engine_type_); + 
ASSERT_EQ(resultTableFileSchema.dimension_, tableFileSchema.dimension_); + + tableFileSchema.size_ = 234; + meta::TableSchema schema3; + schema3.table_id_ = "test3"; + schema3.dimension_ = 321; + status = impl.CreateTable(schema3); + ASSERT_TRUE(status.ok()); + meta::TableFileSchema tableFileSchema2; + tableFileSchema2.table_id_ = "test3"; + tableFileSchema2.size_ = 345; + status = impl.CreateTableFile(tableFileSchema2); + ASSERT_TRUE(status.ok()); + meta::TableFilesSchema filesToUpdate; + filesToUpdate.emplace_back(tableFileSchema); + filesToUpdate.emplace_back(tableFileSchema2); + status = impl.UpdateTableFile(tableFileSchema); + ASSERT_TRUE(status.ok()); + + uint64_t resultSize; + status = impl.Size(resultSize); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + ASSERT_EQ(resultSize, tableFileSchema.size_ + tableFileSchema2.size_); + + uint64_t countResult; + status = impl.Count(tableFileSchema.table_id_, countResult); + ASSERT_TRUE(status.ok()); + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); + +} + +TEST_F(MySQLTest, GROUP_TEST) { + + int mode = Options::MODE::SINGLE; + meta::MySQLMetaImpl impl(getDBMetaOptions(), mode); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + group.table_id_ = table_id; + auto status = impl.CreateTable(group); + ASSERT_TRUE(status.ok()); + + auto gid = group.id_; + group.id_ = -1; + status = impl.DescribeTable(group); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(group.id_, gid); + ASSERT_EQ(group.table_id_, table_id); + + group.table_id_ = "not_found"; + status = impl.DescribeTable(group); + ASSERT_TRUE(!status.ok()); + + group.table_id_ = table_id; + status = impl.CreateTable(group); + ASSERT_TRUE(status.ok()); + + group.table_id_ = ""; + status = impl.CreateTable(group); + ASSERT_TRUE(status.ok()); + + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(MySQLTest, table_file_TEST) { + + int mode = Options::MODE::SINGLE; + meta::MySQLMetaImpl 
impl(getDBMetaOptions(), mode); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + group.table_id_ = table_id; + group.dimension_ = 256; + auto status = impl.CreateTable(group); + + meta::TableFileSchema table_file; + table_file.table_id_ = group.table_id_; + status = impl.CreateTableFile(table_file); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW); + + uint64_t cnt = 0; + status = impl.Count(table_id, cnt); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(cnt, 0UL); + + auto file_id = table_file.file_id_; + + auto new_file_type = meta::TableFileSchema::INDEX; + table_file.file_type_ = new_file_type; + + status = impl.UpdateTableFile(table_file); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.file_type_, new_file_type); + + meta::DatesT dates; + dates.push_back(meta::Meta::GetDate()); + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_FALSE(status.ok()); + + dates.clear(); + for (auto i=2; i < 10; ++i) { + dates.push_back(meta::Meta::GetDateWithDelta(-1*i)); + } + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_TRUE(status.ok()); + + table_file.date_ = meta::Meta::GetDateWithDelta(-2); + status = impl.UpdateTableFile(table_file); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.date_, meta::Meta::GetDateWithDelta(-2)); + ASSERT_FALSE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE); + + dates.clear(); + dates.push_back(table_file.date_); + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_TRUE(status.ok()); + + std::vector ids = {table_file.id_}; + meta::TableFilesSchema files; + status = impl.GetTableFiles(table_file.table_id_, ids, files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(files.size(), 1UL); + ASSERT_TRUE(files[0].file_type_ == meta::TableFileSchema::TO_DELETE); + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(MySQLTest, ARCHIVE_TEST_DAYS) { + 
srand(time(0)); + DBMetaOptions options = getDBMetaOptions(); + int days_num = rand() % 100; + std::stringstream ss; + ss << "days:" << days_num; + options.archive_conf = ArchiveConf("delete", ss.str()); + int mode = Options::MODE::SINGLE; + meta::MySQLMetaImpl impl(options, mode); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + group.table_id_ = table_id; + auto status = impl.CreateTable(group); + + meta::TableFilesSchema files; + meta::TableFileSchema table_file; + table_file.table_id_ = group.table_id_; + + auto cnt = 100; + long ts = utils::GetMicroSecTimeStamp(); + std::vector days; + std::vector ids; + for (auto i=0; i ids; + for (auto i=0; i& vectors) { - vectors.clear(); - vectors.resize(n*TABLE_DIM); - float* data = vectors.data(); - for(int i = 0; i < n; i++) { - for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); - data[TABLE_DIM * i] += i / 2000.; + void BuildVectors(int64_t n, std::vector& vectors) { + vectors.clear(); + vectors.resize(n*TABLE_DIM); + float* data = vectors.data(); + for(int i = 0; i < n; i++) { + for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); + data[TABLE_DIM * i] += i / 2000.; + } } -} } @@ -292,3 +293,251 @@ TEST_F(DBTest2, DELETE_TEST) { ASSERT_TRUE(stat.ok()); ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_)); }; + +TEST_F(MySQLDBTest, DB_TEST) { + + auto options = GetOptions(); + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + engine::IDNumbers vector_ids; + engine::IDNumbers target_ids; + + int64_t nb = 50; + std::vector xb; + BuildVectors(nb, xb); + + int64_t qb = 5; + std::vector qxb; + BuildVectors(qb, qxb); + + std::thread 
search([&]() { + engine::QueryResults results; + int k = 10; + std::this_thread::sleep_for(std::chrono::seconds(2)); + + INIT_TIMER; + std::stringstream ss; + uint64_t count = 0; + uint64_t prev_count = 0; + + for (auto j=0; j<10; ++j) { + ss.str(""); + db_->Size(count); + prev_count = count; + + START_TIMER; + stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results); + ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; + STOP_TIMER(ss.str()); + + ASSERT_STATS(stat); + for (auto k=0; k= prev_count); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + }); + + int loop = 100000; + + for (auto i=0; iInsertVectors(TABLE_NAME, qb, qxb.data(), target_ids); + ASSERT_EQ(target_ids.size(), qb); + } else { + db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + } + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + search.join(); + + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; +}; + +TEST_F(MySQLDBTest, SEARCH_TEST) { + auto options = GetOptions(); + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + // prepare raw data + size_t nb = 250000; + size_t nq = 10; + size_t k = 5; + std::vector xb(nb*TABLE_DIM); + std::vector xq(nq*TABLE_DIM); + std::vector ids(nb); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_real_distribution<> dis_xt(-1.0, 1.0); + for (size_t i = 0; i < nb*TABLE_DIM; i++) { + xb[i] = dis_xt(gen); + if (i < nb){ + ids[i] = i; + } + } + for (size_t i = 0; i < nq*TABLE_DIM; i++) { + xq[i] = dis_xt(gen); + } + + // result data + //std::vector nns_gt(k*nq); + std::vector nns(k*nq); // nns = nearst neg search + 
//std::vector dis_gt(k*nq); + std::vector dis(k*nq); + + // insert data + const int batch_size = 100; + for (int j = 0; j < nb / batch_size; ++j) { + stat = db_->InsertVectors(TABLE_NAME, batch_size, xb.data()+batch_size*j*TABLE_DIM, ids); + if (j == 200){ sleep(1);} + ASSERT_STATS(stat); + } + + sleep(2); // wait until build index finish + + engine::QueryResults results; + stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results); + ASSERT_STATS(stat); + + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; + + // TODO(linxj): add groundTruth assert +}; + +TEST_F(MySQLDBTest, ARHIVE_DISK_CHECK) { + + auto options = GetOptions(); + options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1"); + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + std::vector table_schema_array; + stat = db_->AllTables(table_schema_array); + ASSERT_STATS(stat); + bool bfound = false; + for(auto& schema : table_schema_array) { + if(schema.table_id_ == TABLE_NAME) { + bfound = true; + break; + } + } + ASSERT_TRUE(bfound); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + engine::IDNumbers vector_ids; + engine::IDNumbers target_ids; + + uint64_t size; + db_->Size(size); + + int64_t nb = 10; + std::vector xb; + BuildVectors(nb, xb); + + int loop = 100000; + for (auto i=0; iInsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + db_->Size(size); + LOG(DEBUG) << "size=" << size; + ASSERT_LE(size, 1 * engine::meta::G); + + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; +}; + 
+TEST_F(MySQLDBTest, DELETE_TEST) { + + auto options = GetOptions(); + options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1"); + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); +// std::cout << stat.ToString() << std::endl; + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + +// std::cout << "location: " << table_info_get.location_ << std::endl; + ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_)); + + engine::IDNumbers vector_ids; + + uint64_t size; + db_->Size(size); + + int64_t nb = 100000; + std::vector xb; + BuildVectors(nb, xb); + + int loop = 20; + for (auto i=0; iInsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + std::vector dates; + stat = db_->DeleteTable(TABLE_NAME, dates); +// std::cout << "5 sec start" << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(5)); +// std::cout << "5 sec finish" << std::endl; + ASSERT_TRUE(stat.ok()); +// ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_)); + + delete db_; + + auto dummyDB = engine::DBFactory::Build(options); + dummyDB->DropAll(); + delete dummyDB; +}; diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index 0de876b39c97f5e9d0815615d36a8ca4d47c5962..6b1fc1e407d7eea883cc5b7e2a79e1ac9486a2f1 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -11,9 +11,29 @@ #include "utils.h" #include "db/Factories.h" +#include "db/Options.h" + +INITIALIZE_EASYLOGGINGPP using namespace zilliz::milvus; +static std::string uri; + +class DBTestEnvironment : public ::testing::Environment { +public: + +// explicit DBTestEnvironment(std::string uri) : uri_(uri) {} + + static std::string getURI() { + return uri; + } + + void SetUp() override { + getURI(); + 
} + +}; + void ASSERT_STATS(engine::Status& stat) { ASSERT_TRUE(stat.ok()); if(!stat.ok()) { @@ -21,6 +41,7 @@ void ASSERT_STATS(engine::Status& stat) { } } + void DBTest::InitLog() { el::Configurations defaultConf; defaultConf.setToDefault(); @@ -32,6 +53,7 @@ void DBTest::InitLog() { engine::Options DBTest::GetOptions() { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; return options; } @@ -50,6 +72,7 @@ engine::Options DBTest2::GetOptions() { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; options.meta.archive_conf = engine::ArchiveConf("delete", "disk:1"); + options.meta.backend_uri = "sqlite://:@:/"; return options; } @@ -61,3 +84,29 @@ void MetaTest::SetUp() { void MetaTest::TearDown() { impl_->DropAll(); } + +zilliz::milvus::engine::DBMetaOptions MySQLTest::getDBMetaOptions() { +// std::string path = "/tmp/milvus_test"; +// engine::DBMetaOptions options = engine::DBMetaOptionsFactory::Build(path); + zilliz::milvus::engine::DBMetaOptions options; + options.path = "/tmp/milvus_test"; + options.backend_uri = DBTestEnvironment::getURI(); + return options; +} + +zilliz::milvus::engine::Options MySQLDBTest::GetOptions() { + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = DBTestEnvironment::getURI(); + return options; +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + if (argc > 1) { + uri = argv[1]; + } +// std::cout << uri << std::endl; + ::testing::AddGlobalTestEnvironment(new DBTestEnvironment); + return RUN_ALL_TESTS(); +} diff --git a/cpp/unittest/db/utils.h b/cpp/unittest/db/utils.h index cf0ff360f1b4785f8ed08875f8f80efe8927b9f5..361c24b4becf2a48b5df1f2f9fbe3b62f5b1d767 100644 --- a/cpp/unittest/db/utils.h +++ b/cpp/unittest/db/utils.h @@ -8,9 +8,11 @@ #include #include +//#include #include "db/DB.h" #include "db/DBMetaImpl.h" 
+#include "db/MySQLMetaImpl.h" #define TIMING @@ -28,9 +30,28 @@ #define STOP_TIMER(name) #endif - void ASSERT_STATS(zilliz::milvus::engine::Status& stat); +//class TestEnv : public ::testing::Environment { +//public: +// +// static std::string getURI() { +// if (const char* uri = std::getenv("MILVUS_DBMETA_URI")) { +// return uri; +// } +// else { +// return ""; +// } +// } +// +// void SetUp() override { +// getURI(); +// } +// +//}; +// +//::testing::Environment* const test_env = +// ::testing::AddGlobalTestEnvironment(new TestEnv); class DBTest : public ::testing::Test { protected: @@ -55,3 +76,14 @@ protected: virtual void SetUp() override; virtual void TearDown() override; }; + +class MySQLTest : public ::testing::Test { +protected: +// std::shared_ptr impl_; + zilliz::milvus::engine::DBMetaOptions getDBMetaOptions(); +}; + +class MySQLDBTest : public ::testing::Test { +protected: + zilliz::milvus::engine::Options GetOptions(); +}; diff --git a/cpp/unittest/server/appendix/server_config.yaml b/cpp/unittest/server/appendix/server_config.yaml index 90194619400afa148508a9a03401e252cdeba84c..c937c578331530aa7b774d94ff17e19260dc16c2 100644 --- a/cpp/unittest/server/appendix/server_config.yaml +++ b/cpp/unittest/server/appendix/server_config.yaml @@ -2,11 +2,14 @@ server_config: address: 0.0.0.0 port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534 gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1 - mode: single # milvus deployment type: single, cluster + mode: single # milvus deployment type: single, cluster, read_only db_config: db_path: /tmp/milvus # milvus data storage path - db_backend_url: http://127.0.0.1 # meta database uri + #URI format: dialect://username:password@host:port/database + #All parts except dialect are optional, but you MUST include the delimiters + #Currently supports mysql or sqlite + db_backend_url: mysql://root:1234@:/test # meta database uri index_building_threshold: 1024 # index building 
trigger threshold, default: 1024, unit: MB archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB archive_days_threshold: 30 # files older than x days will be archived, unit: day