diff --git a/cpp/cmake/ThirdPartyPackages.cmake b/cpp/cmake/ThirdPartyPackages.cmake index 55b4266387a890db85a8e3d7cb5311294429d029..9aa3f62124a58c4d6f345532c72601a03769a78c 100644 --- a/cpp/cmake/ThirdPartyPackages.cmake +++ b/cpp/cmake/ThirdPartyPackages.cmake @@ -271,7 +271,7 @@ endif() if(DEFINED ENV{MILVUS_MYSQLPP_URL}) set(MYSQLPP_SOURCE_URL "$ENV{MILVUS_MYSQLPP_URL}") else() - set(MYSQLPP_SOURCE_URL "https://github.com/youny626/mysqlpp.git") + set(MYSQLPP_SOURCE_URL "https://tangentsoft.com/mysqlpp/releases/mysql++-${MYSQLPP_VERSION}.tar.gz") endif() if (DEFINED ENV{MILVUS_OPENBLAS_URL}) @@ -1095,18 +1095,18 @@ macro(build_mysqlpp) "LDFLAGS=-pthread") externalproject_add(mysqlpp_ep -# URL -# ${MYSQLPP_SOURCE_URL} - GIT_REPOSITORY + URL ${MYSQLPP_SOURCE_URL} - GIT_TAG - ${MYSQLPP_VERSION} - GIT_SHALLOW - TRUE +# GIT_REPOSITORY +# ${MYSQLPP_SOURCE_URL} +# GIT_TAG +# ${MYSQLPP_VERSION} +# GIT_SHALLOW +# TRUE ${EP_LOG_OPTIONS} CONFIGURE_COMMAND - "./bootstrap" - COMMAND +# "./bootstrap" +# COMMAND "./configure" ${MYSQLPP_CONFIGURE_ARGS} BUILD_COMMAND diff --git a/cpp/conf/server_config.yaml b/cpp/conf/server_config.yaml index 05705bcc4c0c7752c04af5386271ef7195d29eba..9f60f0f2126aaf044a67dab0ef23ee4d5e39ddb0 100644 --- a/cpp/conf/server_config.yaml +++ b/cpp/conf/server_config.yaml @@ -10,7 +10,7 @@ db_config: db_path: /tmp/milvus #URI format: dialect://username:password@host:port/database #All parts except dialect are optional, but you MUST include the delimiters - db_backend_url: sqlite://:@:/ + db_backend_url: mysql://root:1234@:/test index_building_threshold: 1024 #build index file when raw data file size larger than this value, unit: MB metric_config: diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 7c28c83aeb6e21edea4258f379d370a1eddbb966..3a955e0654744506c5c4f6c22959330b20bc8b51 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -183,7 +183,10 @@ endif () install(TARGETS milvus_server DESTINATION bin) -install(FILES ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX} +install(FILES + ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX} + ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3 + ${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3.2.4 DESTINATION bin) #need to copy libmysqlpp.so add_subdirectory(sdk) diff --git a/cpp/src/db/MySQLConnectionPool.h b/cpp/src/db/MySQLConnectionPool.h index 26fc4432ec237de1afd2b0fcd6f4a39a36cef7f1..7d3a1f88a350dff400344ad51caae5bf9e91b1f9 100644 --- a/cpp/src/db/MySQLConnectionPool.h +++ b/cpp/src/db/MySQLConnectionPool.h @@ -3,6 +3,8 @@ #include #include +#include "Log.h" + class MySQLConnectionPool : public mysqlpp::ConnectionPool { public: @@ -43,6 +45,7 @@ public: sleep(1); } + ENGINE_LOG_DEBUG << "conns_in_use_ in grab: " << conns_in_use_ << std::endl; ++conns_in_use_; return mysqlpp::ConnectionPool::grab(); } @@ -50,7 +53,11 @@ public: // Other half of in-use conn count limit void release(const mysqlpp::Connection* pc) override { mysqlpp::ConnectionPool::release(pc); + ENGINE_LOG_DEBUG << "conns_in_use_ in release: " << conns_in_use_ << std::endl; --conns_in_use_; + if (conns_in_use_ < 0) { + ENGINE_LOG_DEBUG << "conns_in_use_ in release < 0: " << conns_in_use_ << std::endl; + } } void set_max_idle_time(int max_idle) { diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp index e0c903795d999c3f2fdd43cca27bfff8b94a0a38..6b733ec5aec63b6dc666870822d1b5131fe8d49b 100644 --- a/cpp/src/db/MySQLMetaImpl.cpp +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -160,25 +160,27 @@ namespace meta { // std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl; try { - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + + CleanUp(); + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); // if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) { // return Status::Error("DB connection failed: ", connectionPtr->error()); // } - if (!connectionPtr->thread_aware()) { - ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it."; - return Status::Error("MySQL++ wasn't built with thread awareness! Can't run without it."); - } - - CleanUp(); - Query InitializeQuery = connectionPtr->query(); + if (!connectionPtr->thread_aware()) { + ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it."; + return Status::Error("MySQL++ wasn't built with thread awareness! Can't run without it."); + } + Query InitializeQuery = connectionPtr->query(); // InitializeQuery << "SET max_allowed_packet=67108864;"; // if (!InitializeQuery.exec()) { // return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); // } -// InitializeQuery << "DROP TABLE IF EXISTS meta, metaFile;"; - InitializeQuery << "CREATE TABLE IF NOT EXISTS meta (" << +// InitializeQuery << "DROP TABLE IF EXISTS Tables, TableFiles;"; + InitializeQuery << "CREATE TABLE IF NOT EXISTS Tables (" << "id BIGINT PRIMARY KEY AUTO_INCREMENT, " << "table_id VARCHAR(255) UNIQUE NOT NULL, " << "state INT NOT NULL, " << @@ -187,23 +189,24 @@ namespace meta { "files_cnt BIGINT DEFAULT 0 NOT NULL, " << "engine_type INT DEFAULT 1 NOT NULL, " << "store_raw_data BOOL DEFAULT false NOT NULL);"; - if (!InitializeQuery.exec()) { - return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); - } + if (!InitializeQuery.exec()) { + return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); + } - InitializeQuery << "CREATE TABLE IF NOT EXISTS metaFile (" << - "id BIGINT PRIMARY KEY AUTO_INCREMENT, " << - "table_id VARCHAR(255) NOT NULL, " << - "engine_type INT DEFAULT 1 NOT NULL, " << - "file_id VARCHAR(255) NOT NULL, " << - "file_type INT DEFAULT 0 NOT NULL, " << - "size BIGINT DEFAULT 0 NOT NULL, " << - "updated_time BIGINT NOT NULL, " << - "created_on BIGINT NOT NULL, " << - "date INT DEFAULT -1 NOT NULL);"; - if (!InitializeQuery.exec()) { - return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); - } + InitializeQuery << "CREATE TABLE IF NOT EXISTS TableFiles (" << + "id BIGINT PRIMARY KEY AUTO_INCREMENT, " << + "table_id VARCHAR(255) NOT NULL, " << + "engine_type INT DEFAULT 1 NOT NULL, " << + "file_id VARCHAR(255) NOT NULL, " << + "file_type INT DEFAULT 0 NOT NULL, " << + "size BIGINT DEFAULT 0 NOT NULL, " << + "updated_time BIGINT NOT NULL, " << + "created_on BIGINT NOT NULL, " << + "date INT DEFAULT -1 NOT NULL);"; + if (!InitializeQuery.exec()) { + return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); + } + } //Scoped Connection // //Consume all results to avoid "Commands out of sync" error // while (InitializeQuery.more_results()) { @@ -247,7 +250,7 @@ namespace meta { // std::lock_guard lock(mysql_mutex); - if (dates.size() == 0) { + if (dates.empty()) { return Status::OK(); } @@ -260,8 +263,6 @@ namespace meta { try { - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - auto yesterday = GetDateWithDelta(-1); for (auto &date : dates) { @@ -270,24 +271,29 @@ namespace meta { } } - Query dropPartitionsByDatesQuery = connectionPtr->query(); - std::stringstream dateListSS; for (auto &date : dates) { - dateListSS << std::to_string(date) << ", "; + dateListSS << std::to_string(date) << ", "; } std::string dateListStr = dateListSS.str(); dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", " - dropPartitionsByDatesQuery << "UPDATE metaFile " << - "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << - "WHERE table_id = " << quote << table_id << " AND " << - "date in (" << dateListStr << ");"; + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - if (!dropPartitionsByDatesQuery.exec()) { - ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES"; - return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", dropPartitionsByDatesQuery.error()); - } + Query dropPartitionsByDatesQuery = connectionPtr->query(); + + dropPartitionsByDatesQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "WHERE table_id = " << quote << table_id << " AND " << + "date in (" << dateListStr << ");"; + + if (!dropPartitionsByDatesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES"; + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", + dropPartitionsByDatesQuery.error()); + } + } //Scoped Connection } catch (const BadQuery& er) { // Handle any query errors @@ -310,58 +316,61 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - - Query createTableQuery = connectionPtr->query(); + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - if (table_schema.table_id_.empty()) { - NextTableId(table_schema.table_id_); - } - else { - createTableQuery << "SELECT state FROM meta " << - "WHERE table_id = " << quote << table_schema.table_id_ << ";"; - StoreQueryResult res = createTableQuery.store(); - assert(res && res.num_rows() <= 1); - if (res.num_rows() == 1) { - int state = res[0]["state"]; - std::string msg = (TableSchema::TO_DELETE == state) ? - "Table already exists and it is in delete state, please wait a second" : "Table already exists"; - return Status::Error(msg); + Query createTableQuery = connectionPtr->query(); + ENGINE_LOG_DEBUG << "Create Table in"; + if (table_schema.table_id_.empty()) { + NextTableId(table_schema.table_id_); + } else { + createTableQuery << "SELECT state FROM Tables " << + "WHERE table_id = " << quote << table_schema.table_id_ << ";"; + ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str(); + StoreQueryResult res = createTableQuery.store(); + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + int state = res[0]["state"]; + std::string msg = (TableSchema::TO_DELETE == state) ? + "Table already exists and it is in delete state, please wait a second" + : "Table already exists"; + ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable: " << msg; + return Status::Error(msg); + } } - } + ENGINE_LOG_DEBUG << "Create Table start"; - table_schema.files_cnt_ = 0; - table_schema.id_ = -1; - table_schema.created_on_ = utils::GetMicroSecTimeStamp(); + table_schema.files_cnt_ = 0; + table_schema.id_ = -1; + table_schema.created_on_ = utils::GetMicroSecTimeStamp(); // auto start_time = METRICS_NOW_TIME; - std::string id = "NULL"; //auto-increment - std::string table_id = table_schema.table_id_; - std::string state = std::to_string(table_schema.state_); - std::string dimension = std::to_string(table_schema.dimension_); - std::string created_on = std::to_string(table_schema.created_on_); - std::string files_cnt = "0"; - std::string engine_type = std::to_string(table_schema.engine_type_); - std::string store_raw_data = table_schema.store_raw_data_ ? "true" : "false"; - - createTableQuery << "INSERT INTO meta VALUES" << - "(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " << - created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data - << ");"; - - if (SimpleResult res = createTableQuery.execute()) { - table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? + std::string id = "NULL"; //auto-increment + std::string table_id = table_schema.table_id_; + std::string state = std::to_string(table_schema.state_); + std::string dimension = std::to_string(table_schema.dimension_); + std::string created_on = std::to_string(table_schema.created_on_); + std::string files_cnt = "0"; + std::string engine_type = std::to_string(table_schema.engine_type_); + std::string store_raw_data = table_schema.store_raw_data_ ? "true" : "false"; + + createTableQuery << "INSERT INTO Tables VALUES" << + "(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " << + created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data << ");"; + ENGINE_LOG_DEBUG << "Create Table : " << createTableQuery.str(); + if (SimpleResult res = createTableQuery.execute()) { + table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? // std::cout << table_schema.id_ << std::endl; - //Consume all results to avoid "Commands out of sync" error + //Consume all results to avoid "Commands out of sync" error // while (createTableQuery.more_results()) { // createTableQuery.store_next(); // } - } - else { - ENGINE_LOG_ERROR << "Add Table Error"; - return Status::DBTransactionError("Add Table Error", createTableQuery.error()); - } + } else { + ENGINE_LOG_ERROR << "Add Table Error"; + return Status::DBTransactionError("Add Table Error", createTableQuery.error()); + } + } //Scoped Connection // auto end_time = METRICS_NOW_TIME; // auto total_time = METRICS_MICROSECONDS(start_time, end_time); @@ -399,19 +408,22 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - //soft delete table - Query deleteTableQuery = connectionPtr->query(); + //soft delete table + Query deleteTableQuery = connectionPtr->query(); // - deleteTableQuery << "UPDATE meta " << - "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " << - "WHERE table_id = " << quote << table_id << ";"; + deleteTableQuery << "UPDATE Tables " << + "SET state = " << std::to_string(TableSchema::TO_DELETE) << " " << + "WHERE table_id = " << quote << table_id << ";"; - if (!deleteTableQuery.exec()) { - ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE"; - return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error()); - } + if (!deleteTableQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE"; + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableQuery.error()); + } + + } //Scoped Connection } catch (const BadQuery& er) { // Handle any query errors ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DELETING TABLE" << ": " << er.what(); @@ -429,21 +441,22 @@ namespace meta { try { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - - //soft delete table files - Query deleteTableFilesQuery = connectionPtr->query(); - // - deleteTableFilesQuery << "UPDATE metaFile " << - "SET state = " << std::to_string(TableSchema::TO_DELETE) << ", " << - "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " - "WHERE table_id = " << quote << table_id << ";"; - - if (!deleteTableFilesQuery.exec()) { - ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES"; - return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableFilesQuery.error()); - } + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + //soft delete table files + Query deleteTableFilesQuery = connectionPtr->query(); + // + deleteTableFilesQuery << "UPDATE TableFiles " << + "SET state = " << std::to_string(TableSchema::TO_DELETE) << ", " << + "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " << + "WHERE table_id = " << quote << table_id << ";"; + + if (!deleteTableFilesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", deleteTableFilesQuery.error()); + } + } //Scoped Connection } catch (const BadQuery& er) { // Handle any query errors ENGINE_LOG_ERROR << "QUERY ERROR WHEN DELETING TABLE FILES" << ": " << er.what(); @@ -465,14 +478,18 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query describeTableQuery = connectionPtr->query(); - describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " << - "FROM meta " << - "WHERE table_id = " << quote << table_schema.table_id_ << " " << - "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; - StoreQueryResult res = describeTableQuery.store(); + Query describeTableQuery = connectionPtr->query(); + describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " << + "FROM Tables " << + "WHERE table_id = " << quote << table_schema.table_id_ << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; + res = describeTableQuery.store(); + } //Scoped Connection assert(res && res.num_rows() <= 1); if (res.num_rows() == 1) { @@ -516,16 +533,20 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query hasTableQuery = connectionPtr->query(); - //since table_id is a unique column we just need to check whether it exists or not - hasTableQuery << "SELECT EXISTS " << - "(SELECT 1 FROM meta " << - "WHERE table_id = " << quote << table_id << " " << - "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << - "AS " << quote << "check" << ";"; - StoreQueryResult res = hasTableQuery.store(); + Query hasTableQuery = connectionPtr->query(); + //since table_id is a unique column we just need to check whether it exists or not + hasTableQuery << "SELECT EXISTS " << + "(SELECT 1 FROM Tables " << + "WHERE table_id = " << quote << table_id << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << + "AS " << quote << "check" << ";"; + res = hasTableQuery.store(); + } //Scoped Connection assert(res && res.num_rows() == 1); int check = res[0]["check"]; @@ -552,13 +573,17 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query allTablesQuery = connectionPtr->query(); - allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << - "FROM meta " << - "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; - StoreQueryResult res = allTablesQuery.store(); + Query allTablesQuery = connectionPtr->query(); + allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << + "FROM Tables " << + "WHERE state <> " << std::to_string(TableSchema::TO_DELETE) << ";"; + res = allTablesQuery.store(); + } //Scoped Connection for (auto& resRow : res) { TableSchema table_schema; @@ -610,8 +635,6 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - NextFileId(file_schema.file_id_); file_schema.file_type_ = TableFileSchema::NEW; file_schema.dimension_ = table_schema.dimension_; @@ -621,7 +644,6 @@ namespace meta { file_schema.engine_type_ = table_schema.engine_type_; GetTableFilePath(file_schema); - Query createTableFileQuery = connectionPtr->query(); std::string id = "NULL"; //auto-increment std::string table_id = file_schema.table_id_; std::string engine_type = std::to_string(file_schema.engine_type_); @@ -632,23 +654,28 @@ namespace meta { std::string created_on = std::to_string(file_schema.created_on_); std::string date = std::to_string(file_schema.date_); - createTableFileQuery << "INSERT INTO metaFile VALUES" << - "(" << id << ", " << quote << table_id << ", " << engine_type << ", " << - quote << file_id << ", " << file_type << ", " << size << ", " << - updated_time << ", " << created_on << ", " << date << ");"; + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - if (SimpleResult res = createTableFileQuery.execute()) { - file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? + Query createTableFileQuery = connectionPtr->query(); - //Consume all results to avoid "Commands out of sync" error + createTableFileQuery << "INSERT INTO TableFiles VALUES" << + "(" << id << ", " << quote << table_id << ", " << engine_type << ", " << + quote << file_id << ", " << file_type << ", " << size << ", " << + updated_time << ", " << created_on << ", " << date << ");"; + + if (SimpleResult res = createTableFileQuery.execute()) { + file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? + + //Consume all results to avoid "Commands out of sync" error // while (createTableFileQuery.more_results()) { // createTableFileQuery.store_next(); // } - } - else { - ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE"; - return Status::DBTransactionError("Add file Error", createTableFileQuery.error()); - } + } else { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE"; + return Status::DBTransactionError("Add file Error", createTableFileQuery.error()); + } + } // Scoped Connection auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); @@ -685,13 +712,17 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query filesToIndexQuery = connectionPtr->query(); - filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << - "FROM metaFile " << - "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";"; - StoreQueryResult res = filesToIndexQuery.store(); + Query filesToIndexQuery = connectionPtr->query(); + filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";"; + res = filesToIndexQuery.store(); + } //Scoped Connection std::map groups; TableFileSchema table_file; @@ -757,43 +788,44 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - StoreQueryResult res; - if (partition.empty()) { + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query filesToSearchQuery = connectionPtr->query(); - filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << - "FROM metaFile " << - "WHERE table_id = " << quote << table_id << " AND " << - "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << - "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << - "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; - res = filesToSearchQuery.store(); + if (partition.empty()) { - } - else { + Query filesToSearchQuery = connectionPtr->query(); + filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + res = filesToSearchQuery.store(); - Query filesToSearchQuery = connectionPtr->query(); + } else { - std::stringstream partitionListSS; - for (auto &date : partition) { - partitionListSS << std::to_string(date) << ", "; - } - std::string partitionListStr = partitionListSS.str(); - partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", " + Query filesToSearchQuery = connectionPtr->query(); - filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << - "FROM metaFile " << + std::stringstream partitionListSS; + for (auto &date : partition) { + partitionListSS << std::to_string(date) << ", "; + } + std::string partitionListStr = partitionListSS.str(); + partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", " + + filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << "WHERE table_id = " << quote << table_id << " AND " << "date IN (" << partitionListStr << ") AND " << "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; - res = filesToSearchQuery.store(); + res = filesToSearchQuery.store(); - } + } + } //Scoped Connection TableSchema table_schema; table_schema.table_id_ = table_id; @@ -857,15 +889,19 @@ namespace meta { try { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + StoreQueryResult res; - Query filesToMergeQuery = connectionPtr->query(); - filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " << - "FROM metaFile " << - "WHERE table_id = " << quote << table_id << " AND " << - "file_type = " << std::to_string(TableFileSchema::RAW) << " " << - "ORDER BY size DESC" << ";"; - StoreQueryResult res = filesToMergeQuery.store(); + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + + Query filesToMergeQuery = connectionPtr->query(); + filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "file_type = " << std::to_string(TableFileSchema::RAW) << " " << + "ORDER BY size DESC" << ";"; + res = filesToMergeQuery.store(); + } //Scoped Connection TableSchema table_schema; table_schema.table_id_ = table_id; @@ -934,14 +970,18 @@ namespace meta { try { - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query getTableFileQuery = connectionPtr->query(); - getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " << - "FROM metaFile " << - "WHERE table_id = " << quote << table_id << " AND " << - "(" << idStr << ");"; - StoreQueryResult res = getTableFileQuery.store(); + Query getTableFileQuery = connectionPtr->query(); + getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(" << idStr << ");"; + res = getTableFileQuery.store(); + } //Scoped Connection assert(res); @@ -1011,7 +1051,7 @@ namespace meta { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); Query archiveQuery = connectionPtr->query(); - archiveQuery << "UPDATE metaFile " << + archiveQuery << "UPDATE TableFiles " << "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << "WHERE created_on < " << std::to_string(now - usecs) << " AND " << "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; @@ -1048,13 +1088,17 @@ namespace meta { result = 0; try { - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query getSizeQuery = connectionPtr->query(); - getSizeQuery << "SELECT SUM(size) AS sum " << - "FROM metaFile " << - "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; - StoreQueryResult res = getSizeQuery.store(); + Query getSizeQuery = connectionPtr->query(); + getSizeQuery << "SELECT SUM(size) AS sum " << + "FROM TableFiles " << + "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; + res = getSizeQuery.store(); + } //Scoped Connection assert(res && res.num_rows() == 1); // if (!res) { @@ -1097,51 +1141,55 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + bool status; + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query discardFilesQuery = connectionPtr->query(); - discardFilesQuery << "SELECT id, size " << - "FROM metaFile " << - "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " << - "ORDER BY id ASC " << - "LIMIT 10;"; + Query discardFilesQuery = connectionPtr->query(); + discardFilesQuery << "SELECT id, size " << + "FROM TableFiles " << + "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "ORDER BY id ASC " << + "LIMIT 10;"; // std::cout << discardFilesQuery.str() << std::endl; - StoreQueryResult res = discardFilesQuery.store(); + StoreQueryResult res = discardFilesQuery.store(); - assert(res); - if (res.num_rows() == 0) { - return Status::OK(); - } + assert(res); + if (res.num_rows() == 0) { + return Status::OK(); + } - TableFileSchema table_file; - std::stringstream idsToDiscardSS; - for (auto& resRow : res) { - if (to_discard_size <= 0) { - break; + TableFileSchema table_file; + std::stringstream idsToDiscardSS; + for (auto &resRow : res) { + if (to_discard_size <= 0) { + break; + } + table_file.id_ = resRow["id"]; + table_file.size_ = resRow["size"]; + idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR "; + ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_ + << " table_file.size=" << table_file.size_; + to_discard_size -= table_file.size_; } - table_file.id_ = resRow["id"]; - table_file.size_ = resRow["size"]; - idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR "; - ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_ - << " table_file.size=" << table_file.size_; - to_discard_size -= table_file.size_; - } - std::string idsToDiscardStr = idsToDiscardSS.str(); - idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR " + std::string idsToDiscardStr = idsToDiscardSS.str(); + idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR " - discardFilesQuery << "UPDATE metaFile " << - "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " << - "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " << - "WHERE " << idsToDiscardStr << ";"; + discardFilesQuery << "UPDATE TableFiles " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << ", " << + "updated_time = " << std::to_string(utils::GetMicroSecTimeStamp()) << " " << + "WHERE " << idsToDiscardStr << ";"; - if (discardFilesQuery.exec()) { - return DiscardFiles(to_discard_size); - } - else { - ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES"; - return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error()); - } + status = discardFilesQuery.exec(); + if (!status) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN DISCARDING FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error()); + } + } //Scoped Connection + + return DiscardFiles(to_discard_size); } catch (const BadQuery& er) { // Handle any query errors @@ -1164,54 +1212,57 @@ namespace meta { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query updateTableFileQuery = connectionPtr->query(); + Query updateTableFileQuery = connectionPtr->query(); - //if the table has been deleted, just mark the table file as TO_DELETE - //clean thread will delete the file later - updateTableFileQuery << "SELECT state FROM meta " << - "WHERE table_id = " << quote << file_schema.table_id_ << ";"; - StoreQueryResult res = updateTableFileQuery.store(); - assert(res && res.num_rows() <= 1); - if (res.num_rows() == 1) { - int state = res[0]["state"]; - if (state == TableSchema::TO_DELETE) { + //if the table has been deleted, just mark the table file as TO_DELETE + //clean thread will delete the file later + updateTableFileQuery << "SELECT state FROM Tables " << + "WHERE table_id = " << quote << file_schema.table_id_ << ";"; + StoreQueryResult res = updateTableFileQuery.store(); + + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + int state = res[0]["state"]; + if (state == TableSchema::TO_DELETE) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + } else { file_schema.file_type_ = TableFileSchema::TO_DELETE; } - } - else { - file_schema.file_type_ = TableFileSchema::TO_DELETE; - } - std::string id = std::to_string(file_schema.id_); - std::string table_id = file_schema.table_id_; - std::string engine_type = std::to_string(file_schema.engine_type_); - std::string file_id = file_schema.file_id_; - std::string file_type = std::to_string(file_schema.file_type_); - std::string size = std::to_string(file_schema.size_); - std::string updated_time = std::to_string(file_schema.updated_time_); - std::string created_on = std::to_string(file_schema.created_on_); - std::string date = std::to_string(file_schema.date_); + std::string id = std::to_string(file_schema.id_); + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); - updateTableFileQuery << "UPDATE metaFile " << - "SET table_id = " << quote << table_id << ", " << - "engine_type = " << engine_type << ", " << - "file_id = " << quote << file_id << ", " << - "file_type = " << file_type << ", " << - "size = " << size << ", " << - "updated_time = " << updated_time << ", " << - "created_on = " << created_on << ", " << - "date = " << date << " " << - "WHERE id = " << id << ";"; + updateTableFileQuery << "UPDATE TableFiles " << + "SET table_id = " << quote << table_id << ", " << + "engine_type = " << engine_type << ", " << + "file_id = " << quote << file_id << ", " << + "file_type = " << file_type << ", " << + "size = " << size << ", " << + "updated_time = " << updated_time << ", " << + "created_on = " << created_on << ", " << + "date = " << date << " " << + "WHERE id = " << id << ";"; // std::cout << updateTableFileQuery.str() << std::endl; - if (!updateTableFileQuery.exec()) { - ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; - ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE"; - return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error()); - } + if (!updateTableFileQuery.exec()) { + ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILE"; + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", + updateTableFileQuery.error()); + } + } //Scoped Connection } catch (const BadQuery& er) { // Handle any query errors @@ -1234,63 +1285,65 @@ namespace meta { try { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - - Query updateTableFilesQuery = connectionPtr->query(); - - std::map has_tables; - for (auto &file_schema : files) { + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - if(has_tables.find(file_schema.table_id_) != has_tables.end()) { - continue; - } + Query updateTableFilesQuery = connectionPtr->query(); - updateTableFilesQuery << "SELECT EXISTS " << - "(SELECT 1 FROM meta " << - "WHERE table_id = " << quote << file_schema.table_id_ << " " << - "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << - "AS " << quote << "check" << ";"; - StoreQueryResult res = updateTableFilesQuery.store(); + std::map has_tables; + for (auto &file_schema : files) { - assert(res && res.num_rows() == 1); - int check = res[0]["check"]; - has_tables[file_schema.table_id_] = (check == 1); - } + if (has_tables.find(file_schema.table_id_) != has_tables.end()) { + continue; + } - for (auto& file_schema : files) { + updateTableFilesQuery << "SELECT EXISTS " << + "(SELECT 1 FROM Tables " << + "WHERE table_id = " << quote << file_schema.table_id_ << " " << + "AND state <> " << std::to_string(TableSchema::TO_DELETE) << ") " << + "AS " << quote << "check" << ";"; + StoreQueryResult res = updateTableFilesQuery.store(); - if(!has_tables[file_schema.table_id_]) { - file_schema.file_type_ = TableFileSchema::TO_DELETE; + assert(res && res.num_rows() == 1); + int check = res[0]["check"]; + has_tables[file_schema.table_id_] = (check == 1); } - file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); - std::string id = std::to_string(file_schema.id_); - std::string table_id = file_schema.table_id_; - std::string engine_type = std::to_string(file_schema.engine_type_); - std::string file_id = file_schema.file_id_; - std::string file_type = std::to_string(file_schema.file_type_); - std::string size = std::to_string(file_schema.size_); - std::string updated_time = std::to_string(file_schema.updated_time_); - std::string created_on = std::to_string(file_schema.created_on_); - std::string date = std::to_string(file_schema.date_); - - updateTableFilesQuery << "UPDATE metaFile " << - "SET table_id = " << quote << table_id << ", " << - "engine_type = " << engine_type << ", " << - "file_id = " << quote << file_id << ", " << - "file_type = " << file_type << ", " << - "size = " << size << ", " << - "updated_time = " << updated_time << ", " << - "created_on = " << created_on << ", " << - "date = " << date << " " << - "WHERE id = " << id << ";"; + for (auto &file_schema : files) { - } - - if (!updateTableFilesQuery.exec()) { - ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES"; - return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error()); - } + if (!has_tables[file_schema.table_id_]) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); + + std::string id = std::to_string(file_schema.id_); + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + updateTableFilesQuery << "UPDATE TableFiles " << + "SET table_id = " << quote << table_id << ", " << + "engine_type = " << engine_type << ", " << + "file_id = " << quote << file_id << ", " << + "file_type = " << file_type << ", " << + "size = " << size << ", " << + "updated_time = " << updated_time << ", " << + "created_on = " << created_on << ", " << + "date = " << date << " " << + "WHERE id = " << id << ";"; + + if (!updateTableFilesQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE FILES"; + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", + updateTableFilesQuery.error()); + } + } + } //Scoped Connection } catch (const BadQuery& er) { // Handle any query errors @@ -1314,54 +1367,57 @@ namespace meta { try { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " << - "FROM metaFile " << - "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " << - "updated_time < " << std::to_string(now - seconds * US_PS) << ";"; - StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " << + "FROM TableFiles " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " << + "updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); - assert(res); + assert(res); - TableFileSchema table_file; - std::vector idsToDelete; + TableFileSchema table_file; + std::vector idsToDelete; - for (auto& resRow : res) { + for (auto &resRow : res) { - table_file.id_ = resRow["id"]; //implicit conversion + table_file.id_ = resRow["id"]; //implicit conversion - std::string table_id; - resRow["table_id"].to_string(table_id); - table_file.table_id_ = table_id; + std::string table_id; + resRow["table_id"].to_string(table_id); + table_file.table_id_ = table_id; - std::string file_id; - resRow["file_id"].to_string(file_id); - table_file.file_id_ = file_id; + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; - table_file.date_ = resRow["date"]; + table_file.date_ = resRow["date"]; - GetTableFilePath(table_file); + GetTableFilePath(table_file); - ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " << table_file.location_ << std::endl; - boost::filesystem::remove(table_file.location_); + ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " + << table_file.location_ << std::endl; + boost::filesystem::remove(table_file.location_); - idsToDelete.emplace_back(std::to_string(table_file.id_)); - } + idsToDelete.emplace_back(std::to_string(table_file.id_)); + } - std::stringstream idsToDeleteSS; - for (auto& id : idsToDelete) { - idsToDeleteSS << "id = " << id << " OR "; - } - std::string idsToDeleteStr = idsToDeleteSS.str(); - idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " - cleanUpFilesWithTTLQuery << "DELETE FROM metaFile WHERE " << - idsToDeleteStr << ";"; - if (!cleanUpFilesWithTTLQuery.exec()) { - ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL"; - return Status::DBTransactionError("CleanUpFilesWithTTL Error", cleanUpFilesWithTTLQuery.error()); - } + std::stringstream idsToDeleteSS; + for (auto &id : idsToDelete) { + idsToDeleteSS << "id = " << id << " OR "; + } + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpFilesWithTTLQuery << "DELETE FROM TableFiles WHERE " << + idsToDeleteStr << ";"; + if (!cleanUpFilesWithTTLQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL"; + return Status::DBTransactionError("CleanUpFilesWithTTL Error", cleanUpFilesWithTTLQuery.error()); + } + } //Scoped Connection } catch (const BadQuery& er) { // Handle any query errors @@ -1376,37 +1432,39 @@ namespace meta { try { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id " << - "FROM meta " << - "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; - StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); - assert(res); + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id " << + "FROM Tables " << + "WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; + StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + assert(res); // std::cout << res.num_rows() << std::endl; - std::stringstream idsToDeleteSS; - for (auto& resRow : res) { - size_t id = resRow["id"]; - std::string table_id; - resRow["table_id"].to_string(table_id); + std::stringstream idsToDeleteSS; + for (auto &resRow : res) { + size_t id = resRow["id"]; + std::string table_id; + resRow["table_id"].to_string(table_id); - auto table_path = GetTablePath(table_id); + auto table_path = GetTablePath(table_id); - ENGINE_LOG_DEBUG << "Remove table folder: " << table_path; - boost::filesystem::remove_all(table_path); - - idsToDeleteSS << "id = " << std::to_string(id) << " OR "; - } - std::string idsToDeleteStr = idsToDeleteSS.str(); - idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " - cleanUpFilesWithTTLQuery << "DELETE FROM meta WHERE " << - idsToDeleteStr << ";"; - if (!cleanUpFilesWithTTLQuery.exec()) { - ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL"; - return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", cleanUpFilesWithTTLQuery.error()); - } + ENGINE_LOG_DEBUG << "Remove table folder: " << table_path; + boost::filesystem::remove_all(table_path); + idsToDeleteSS << "id = " << std::to_string(id) << " OR "; + } + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpFilesWithTTLQuery << "DELETE FROM Tables WHERE " << + idsToDeleteStr << ";"; + if (!cleanUpFilesWithTTLQuery.exec()) { + ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES WITH TTL"; + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", + cleanUpFilesWithTTLQuery.error()); + } + } //Scoped Connection } catch (const BadQuery& er) { // Handle any query errors @@ -1430,7 +1488,7 @@ namespace meta { ENGINE_LOG_DEBUG << "Remove table file type as NEW"; Query cleanUpQuery = connectionPtr->query(); - cleanUpQuery << "DELETE FROM metaFile WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";"; + cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";"; if (!cleanUpQuery.exec()) { ENGINE_LOG_ERROR << "QUERY ERROR WHEN CLEANING UP FILES"; @@ -1457,17 +1515,6 @@ namespace meta { try { MetricCollector metric; - ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); - - Query countQuery = connectionPtr->query(); - countQuery << "SELECT size " << - "FROM metaFile " << - "WHERE table_id = " << quote << table_id << " AND " << - "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << - "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << - "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; - StoreQueryResult res = countQuery.store(); - TableSchema table_schema; table_schema.table_id_ = table_id; auto status = DescribeTable(table_schema); @@ -1476,6 +1523,21 @@ namespace meta { return status; } + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + + Query countQuery = connectionPtr->query(); + countQuery << "SELECT size " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + res = countQuery.store(); + } //Scoped Connection + result = 0; for (auto &resRow : res) { size_t size = resRow["size"]; @@ -1510,7 +1572,7 @@ namespace meta { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); Query dropTableQuery = connectionPtr->query(); - dropTableQuery << "DROP TABLE IF EXISTS meta, metaFile;"; + dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;"; if (dropTableQuery.exec()) { return Status::OK(); } diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 8e63a4b5c9a9407929c3532f1cc3f73d4fc2a3f8..311760948daa31f617a197852084b66bbe25290f 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -7,7 +7,7 @@ GTEST_VERSION=1.8.1 JSONCONS_VERSION=0.126.0 LAPACK_VERSION=v3.8.0 LZ4_VERSION=v1.9.1 -MYSQLPP_VERSION=zilliz +MYSQLPP_VERSION=3.2.4 OPENBLAS_VERSION=v0.3.6 PROMETHEUS_VERSION=v0.7.0 ROCKSDB_VERSION=v6.0.2