From 75a9b4fbd9f069319cdd99104b028b09a18d0778 Mon Sep 17 00:00:00 2001 From: zhiru Date: Thu, 20 Jun 2019 10:02:02 +0800 Subject: [PATCH] finished MySQLMetaImpl; need to test with DBImpl Former-commit-id: f3a9848f5f2d7a8b15945ae824827371316d0b79 --- cpp/src/db/DBMetaImpl.cpp | 2 +- cpp/src/db/MetaTypes.h | 2 +- cpp/src/db/MySQLMetaImpl.cpp | 1726 ++++++++++++++---------- cpp/src/db/MySQLMetaImpl.h | 3 +- cpp/unittest/CMakeLists.txt | 4 +- cpp/unittest/db/MySQLMetaImpl_test.cpp | 427 +++++- cpp/unittest/db/utils.h | 4 +- 7 files changed, 1433 insertions(+), 735 deletions(-) diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 26185b84..ba9fe8d8 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -164,7 +164,7 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id, Status DBMetaImpl::CreateTable(TableSchema &table_schema) { server::Metrics::GetInstance().MetaAccessTotalIncrement(); - if (table_schema.table_id_ == "") { + if (table_schema.table_id_.empty()) { NextTableId(table_schema.table_id_); } table_schema.files_cnt_ = 0; diff --git a/cpp/src/db/MetaTypes.h b/cpp/src/db/MetaTypes.h index a284d14a..c54a974b 100644 --- a/cpp/src/db/MetaTypes.h +++ b/cpp/src/db/MetaTypes.h @@ -24,7 +24,7 @@ struct TableSchema { size_t id_; std::string table_id_; size_t files_cnt_ = 0; - uint16_t dimension_; + uint16_t dimension_ = 0; std::string location_; long created_on_; int engine_type_ = (int)EngineType::FAISS_IDMAP; diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp index 5dacb3f3..21093004 100644 --- a/cpp/src/db/MySQLMetaImpl.cpp +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -29,8 +29,7 @@ namespace meta { using namespace mysqlpp; -// static std::unique_ptr connectionPtr(new Connection()); - static Connection* connectionPtr = new Connection(); + static std::unique_ptr connectionPtr(new Connection()); namespace { @@ -79,23 +78,19 @@ namespace meta { MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_) : options_(options_) { - //Initialize(); + Initialize(); } Status MySQLMetaImpl::Initialize() { -// if (!boost::filesystem::is_directory(options_.path)) { -// auto ret = boost::filesystem::create_directory(options_.path); -// if (!ret) { -// ENGINE_LOG_ERROR << "Create directory " << options_.path << " Error"; -// } -// assert(ret); -// } - -// ConnectorPtr = std::make_unique(StoragePrototype(options_.path + "/meta.sqlite")); -// -// ConnectorPtr->sync_schema(); -// ConnectorPtr->open_forever(); // thread safe option -// ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log + + std::string path = options_.path; + if (!boost::filesystem::is_directory(path)) { + auto ret = boost::filesystem::create_directory(path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << path << " Error"; + } + assert(ret); + } std::string uri = options_.backend_uri; @@ -129,7 +124,7 @@ namespace meta { } const char* dbName = pieces_match[6].str().c_str(); //std::cout << dbName << " " << serverAddress << " " << username << " " << password << " " << port << std::endl; - connectionPtr->set_option(new MultiStatementsOption(true)); +// connectionPtr->set_option(new MultiStatementsOption(true)); try { if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) { @@ -137,42 +132,54 @@ namespace meta { } CleanUp(); - Query InitializeQuery = connectionPtr->query(); - InitializeQuery << "DROP TABLE IF EXISTS meta, metaFile;"; - InitializeQuery << "CREATE TABLE meta (" << - "id BIGINT AUTO INCREMENT PRIMARY KEY, " << - "table_id VARCHAR(255) UNIQUE, " << - "dimension SMALLINT, " << - "created_on BIGINT, " << - "files_cnt BIGINT DEFAULT 0, " << - "engine_type INT DEFAULT 1, " << - "store_raw_data BOOL DEFAULT false);"; - InitializeQuery << "CREATE TABLE metaFile (" << - "id BIGINT AUTO INCREMENT PRIMARY KEY, " << - "table_id VARCHAR(255), " << - "engine_type INT DEFAULT 1, " << - "file_id VARCHAR(255), " << - "file_type INT DEFAULT 0, " << - "size BIGINT DEFAULT 0, " << - "updated_time BIGINT, " << - "created_on BIGINT, " << - "date INT DEFAULT -1);"; - - if (InitializeQuery.exec()) { - return Status::OK(); - } else { - return Status::DBTransactionError("Initialization Error: ", InitializeQuery.error()); +// InitializeQuery << "DROP TABLE IF EXISTS meta, metaFile;"; + InitializeQuery << "CREATE TABLE IF NOT EXISTS meta (" << + "id BIGINT PRIMARY KEY AUTO_INCREMENT, " << + "table_id VARCHAR(255) UNIQUE NOT NULL, " << + "dimension SMALLINT NOT NULL, " << + "created_on BIGINT NOT NULL, " << + "files_cnt BIGINT DEFAULT 0 NOT NULL, " << + "engine_type INT DEFAULT 1 NOT NULL, " << + "store_raw_data BOOL DEFAULT false NOT NULL);"; + if (!InitializeQuery.exec()) { + return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); } + + InitializeQuery << "CREATE TABLE IF NOT EXISTS metaFile (" << + "id BIGINT PRIMARY KEY AUTO_INCREMENT, " << + "table_id VARCHAR(255) NOT NULL, " << + "engine_type INT DEFAULT 1 NOT NULL, " << + "file_id VARCHAR(255) NOT NULL, " << + "file_type INT DEFAULT 0 NOT NULL, " << + "size BIGINT DEFAULT 0 NOT NULL, " << + "updated_time BIGINT NOT NULL, " << + "created_on BIGINT NOT NULL, " << + "date INT DEFAULT -1 NOT NULL);"; + if (!InitializeQuery.exec()) { + return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); + } + + return Status::OK(); + +// if (InitializeQuery.exec()) { +// std::cout << "XXXXXXXXXXXXXXXXXXXXXXXXX" << std::endl; +// while (InitializeQuery.more_results()) { +// InitializeQuery.store_next(); +// } +// return Status::OK(); +// } else { +// return Status::DBTransactionError("Initialization Error", InitializeQuery.error()); +// } } catch (const ConnectionFailed& er) { - return Status::DBTransactionError("Failed to connect to MySQL server: ", er.what()); + return Status::DBTransactionError("Failed to connect to database server", er.what()); } catch (const BadQuery& er) { // Handle any query errors - return Status::DBTransactionError("QUERY ERROR DURING INITIALIZATION: ", er.what()); + return Status::DBTransactionError("QUERY ERROR DURING INITIALIZATION", er.what()); } catch (const Exception& er) { // Catch-all for any other MySQL++ exceptions - return Status::DBTransactionError("GENERAL ERROR DURING INITIALIZATION: ", er.what()); + return Status::DBTransactionError("GENERAL ERROR DURING INITIALIZATION", er.what()); } } else { @@ -183,272 +190,404 @@ namespace meta { // PXU TODO: Temp solution. Will fix later Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id, const DatesT &dates) { -// if (dates.size() == 0) { -// return Status::OK(); -// } -// -// TableSchema table_schema; -// table_schema.table_id_ = table_id; -// auto status = DescribeTable(table_schema); -// if (!status.ok()) { -// return status; -// } -// -// auto yesterday = GetDateWithDelta(-1); -// -// for (auto &date : dates) { -// if (date >= yesterday) { -// return Status::Error("Could not delete partitions with 2 days"); -// } -// } -// -// try { -// ConnectorPtr->update_all( -// set( -// c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE -// ), -// where( -// c(&TableFileSchema::table_id_) == table_id and -// in(&TableFileSchema::date_, dates) -// )); -// } catch (std::exception &e) { -// HandleException(e); -// } + if (dates.size() == 0) { + return Status::OK(); + } + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + auto yesterday = GetDateWithDelta(-1); + + for (auto &date : dates) { + if (date >= yesterday) { + return Status::Error("Could not delete partitions within 2 days"); + } + } + + try { + + Query dropPartitionsByDatesQuery = connectionPtr->query(); + + std::stringstream dateListSS; + for (auto &date : dates) { + dateListSS << std::to_string(date) << ", "; + } + std::string dateListStr = dateListSS.str(); + dateListStr = dateListStr.substr(0, dateListStr.size() - 2); //remove the last ", " + + dropPartitionsByDatesQuery << "UPDATE metaFile " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "WHERE table_id = " << quote << table_id << " AND " << + "date in (" << dateListStr << ");"; + + if (!dropPartitionsByDatesQuery.exec()) { + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", dropPartitionsByDatesQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// if (table_schema.table_id_.empty()) { -// NextTableId(table_schema.table_id_); -// } -// table_schema.files_cnt_ = 0; -// table_schema.id_ = -1; -// table_schema.created_on_ = utils::GetMicroSecTimeStamp(); -// auto start_time = METRICS_NOW_TIME; -// { -// try { -// Query addTableQuery = connectionPtr->query(); -// -// } catch (...) { -// return Status::DBTransactionError("Add Table Error"); -// } -// } -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// -// auto table_path = GetTablePath(table_schema.table_id_); -// table_schema.location_ = table_path; -// if (!boost::filesystem::is_directory(table_path)) { -// auto ret = boost::filesystem::create_directories(table_path); -// if (!ret) { -// ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; -// } -// assert(ret); -// } + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + if (table_schema.table_id_.empty()) { + NextTableId(table_schema.table_id_); + } + table_schema.files_cnt_ = 0; + table_schema.id_ = -1; + table_schema.created_on_ = utils::GetMicroSecTimeStamp(); + auto start_time = METRICS_NOW_TIME; + { + try { + Query createTableQuery = connectionPtr->query(); + std::string id = "NULL"; //auto-increment + std::string table_id = table_schema.table_id_; + std::string dimension = std::to_string(table_schema.dimension_); + std::string created_on = std::to_string(table_schema.created_on_); + std::string files_cnt = "0"; + std::string engine_type = std::to_string(table_schema.engine_type_); + std::string store_raw_data = table_schema.store_raw_data_ ? "true" : "false"; + createTableQuery << "INSERT INTO meta VALUES" << + "(" << id << ", " << quote << table_id << ", " << dimension << ", " << + created_on << ", " << files_cnt << ", " << engine_type << ", " << store_raw_data + << ");"; + if (SimpleResult res = createTableQuery.execute()) { + table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? +// std::cout << table_schema.id_ << std::endl; + } + else { + return Status::DBTransactionError("Add Table Error", createTableQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what()); + } + } + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + auto table_path = GetTablePath(table_schema.table_id_); + table_schema.location_ = table_path; + if (!boost::filesystem::is_directory(table_path)) { + auto ret = boost::filesystem::create_directories(table_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << table_path << " Error"; + } + assert(ret); + } return Status::OK(); } Status MySQLMetaImpl::DeleteTable(const std::string& table_id) { -// try { -// //drop the table from meta -// auto tables = ConnectorPtr->select(columns(&TableSchema::id_), -// where(c(&TableSchema::table_id_) == table_id)); -// for (auto &table : tables) { -// ConnectorPtr->remove(std::get<0>(table)); -// } -// } catch (std::exception &e) { -// HandleException(e); -// } - - return Status::OK(); + try { + //drop the table from meta + Query deleteTableQuery = connectionPtr->query(); + deleteTableQuery << "DELETE FROM meta WHERE table_id = " << quote << table_id << ";"; + if (deleteTableQuery.exec()) { + return Status::OK(); + } + else { + return Status::DBTransactionError("Delete Table Error", deleteTableQuery.error()); + } + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN DELETING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN DELETING TABLE", er.what()); + } } Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) { -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// auto groups = ConnectorPtr->select(columns(&TableSchema::id_, -// &TableSchema::table_id_, -// &TableSchema::files_cnt_, -// &TableSchema::dimension_, -// &TableSchema::engine_type_, -// &TableSchema::store_raw_data_), -// where(c(&TableSchema::table_id_) == table_schema.table_id_)); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// assert(groups.size() <= 1); -// if (groups.size() == 1) { -// table_schema.id_ = std::get<0>(groups[0]); -// table_schema.files_cnt_ = std::get<2>(groups[0]); -// table_schema.dimension_ = std::get<3>(groups[0]); -// table_schema.engine_type_ = std::get<4>(groups[0]); -// table_schema.store_raw_data_ = std::get<5>(groups[0]); -// } else { -// return Status::NotFound("Table " + table_schema.table_id_ + " not found"); -// } -// -// auto table_path = GetTablePath(table_schema.table_id_); -// table_schema.location_ = table_path; -// -// } catch (std::exception &e) { -// HandleException(e); -// } + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query describeTableQuery = connectionPtr->query(); + describeTableQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << + "FROM meta " << + "WHERE table_id = " << quote << table_schema.table_id_ << ";"; + StoreQueryResult res = describeTableQuery.store(); + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + const Row& resRow = res[0]; + +// std::string id; +// resRow["id"].to_string(id); +// table_schema.id_ = std::stoul(id); + table_schema.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_schema.table_id_ = table_id; + +// std::string created_on; +// resRow["created_on"].to_string(created_on); +// table_schema.created_on_ = std::stol(created_on); + table_schema.dimension_ = resRow["dimension"]; + +// std::string files_cnt; +// resRow["files_cnt"].to_string(files_cnt); +// table_schema.files_cnt_ = std::stoul(files_cnt); + table_schema.files_cnt_ = resRow["files_cnt"]; + +// std::string engine_type; +// resRow["engine_type"].to_string(engine_type); +// table_schema.engine_type_ = std::stoi(engine_type); + table_schema.engine_type_ = resRow["engine_type"]; + + table_schema.store_raw_data_ = (resRow["store_raw_data"].compare("true") == 0); + } + else { + return Status::NotFound("Table " + table_schema.table_id_ + " not found"); + } + + auto table_path = GetTablePath(table_schema.table_id_); + table_schema.location_ = table_path; + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING TABLE", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// -// auto tables = ConnectorPtr->select(columns(&TableSchema::id_), -// where(c(&TableSchema::table_id_) == table_id)); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// assert(tables.size() <= 1); -// if (tables.size() == 1) { -// has_or_not = true; -// } else { -// has_or_not = false; -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query hasTableQuery = connectionPtr->query(); + //since table_id is a unique column we just need to check whether it exists or not + hasTableQuery << "SELECT EXISTS (SELECT 1 FROM meta WHERE table_id = " << quote << table_id << ") " + << "AS " << quote << "check" << ";"; + StoreQueryResult res = hasTableQuery.store(); + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + assert(res && res.num_rows() == 1); + int check = res[0]["check"]; + has_or_not = (check == 1); + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF TABLE EXISTS", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", er.what()); + } + return Status::OK(); } Status MySQLMetaImpl::AllTables(std::vector& table_schema_array) { -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// auto selected = ConnectorPtr->select(columns(&TableSchema::id_, -// &TableSchema::table_id_, -// &TableSchema::files_cnt_, -// &TableSchema::dimension_, -// &TableSchema::engine_type_, -// &TableSchema::store_raw_data_)); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// for (auto &table : selected) { -// TableSchema schema; -// schema.id_ = std::get<0>(table); -// schema.table_id_ = std::get<1>(table); -// schema.files_cnt_ = std::get<2>(table); -// schema.dimension_ = std::get<3>(table); -// schema.engine_type_ = std::get<4>(table); -// schema.store_raw_data_ = std::get<5>(table); -// -// table_schema_array.emplace_back(schema); -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query allTablesQuery = connectionPtr->query(); + allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << + "FROM meta;"; + StoreQueryResult res = allTablesQuery.store(); + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + for (auto& resRow : res) { + TableSchema table_schema; + + table_schema.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_schema.table_id_ = table_id; + + table_schema.dimension_ = resRow["dimension"]; + + table_schema.files_cnt_ = resRow["files_cnt"]; + + table_schema.engine_type_ = resRow["engine_type"]; + + table_schema.store_raw_data_ = (resRow["store_raw_data"].compare("true") == 0); + + table_schema_array.emplace_back(table_schema); + } + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBING ALL TABLES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBING ALL TABLES", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) { -// if (file_schema.date_ == EmptyDate) { -// file_schema.date_ = Meta::GetDate(); -// } -// TableSchema table_schema; -// table_schema.table_id_ = file_schema.table_id_; -// auto status = DescribeTable(table_schema); -// if (!status.ok()) { -// return status; -// } -// -// NextFileId(file_schema.file_id_); -// file_schema.file_type_ = TableFileSchema::NEW; -// file_schema.dimension_ = table_schema.dimension_; -// file_schema.size_ = 0; -// file_schema.created_on_ = utils::GetMicroSecTimeStamp(); -// file_schema.updated_time_ = file_schema.created_on_; -// file_schema.engine_type_ = table_schema.engine_type_; -// GetTableFilePath(file_schema); -// -// { -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// auto id = ConnectorPtr->insert(file_schema); -// file_schema.id_ = id; -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// } catch (...) { -// return Status::DBTransactionError("Add file Error"); -// } -// } -// -// auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); -// -// if (!boost::filesystem::is_directory(partition_path)) { -// auto ret = boost::filesystem::create_directory(partition_path); -// if (!ret) { -// ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error"; -// } -// assert(ret); -// } + if (file_schema.date_ == EmptyDate) { + file_schema.date_ = Meta::GetDate(); + } + TableSchema table_schema; + table_schema.table_id_ = file_schema.table_id_; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + NextFileId(file_schema.file_id_); + file_schema.file_type_ = TableFileSchema::NEW; + file_schema.dimension_ = table_schema.dimension_; + file_schema.size_ = 0; + file_schema.created_on_ = utils::GetMicroSecTimeStamp(); + file_schema.updated_time_ = file_schema.created_on_; + file_schema.engine_type_ = table_schema.engine_type_; + GetTableFilePath(file_schema); + + { + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query createTableFileQuery = connectionPtr->query(); + std::string id = "NULL"; //auto-increment + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + createTableFileQuery << "INSERT INTO metaFile VALUES" << + "(" << id << ", " << quote << table_id << ", " << engine_type << ", " << + quote << file_id << ", " << file_type << ", " << size << ", " << + updated_time << ", " << created_on << ", " << date << ");"; + + if (SimpleResult res = createTableFileQuery.execute()) { + file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()? + } + else { + return Status::DBTransactionError("Add file Error", createTableFileQuery.error()); + } + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN ADDING TABLE FILE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what()); + } + } + + auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_); + + if (!boost::filesystem::is_directory(partition_path)) { + auto ret = boost::filesystem::create_directory(partition_path); + if (!ret) { + ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error"; + } + assert(ret); + } return Status::OK(); } Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) { -// files.clear(); -// -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::file_type_, -// &TableFileSchema::size_, -// &TableFileSchema::date_, -// &TableFileSchema::engine_type_), -// where(c(&TableFileSchema::file_type_) -// == (int) TableFileSchema::TO_INDEX)); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// -// std::map groups; -// TableFileSchema table_file; -// -// for (auto &file : selected) { -// table_file.id_ = std::get<0>(file); -// table_file.table_id_ = std::get<1>(file); -// table_file.file_id_ = std::get<2>(file); -// table_file.file_type_ = std::get<3>(file); -// table_file.size_ = std::get<4>(file); -// table_file.date_ = std::get<5>(file); -// table_file.engine_type_ = std::get<6>(file); -// -// GetTableFilePath(table_file); -// auto groupItr = groups.find(table_file.table_id_); -// if (groupItr == groups.end()) { -// TableSchema table_schema; -// table_schema.table_id_ = table_file.table_id_; -// auto status = DescribeTable(table_schema); -// if (!status.ok()) { -// return status; -// } -// groups[table_file.table_id_] = table_schema; -// } -// table_file.dimension_ = groups[table_file.table_id_].dimension_; -// files.push_back(table_file); -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + files.clear(); + + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query filesToIndexQuery = connectionPtr->query(); + filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM metaFile " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ";"; + StoreQueryResult res = filesToIndexQuery.store(); + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + std::map groups; + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_file.table_id_ = table_id; + + table_file.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + auto groupItr = groups.find(table_file.table_id_); + if (groupItr == groups.end()) { + TableSchema table_schema; + table_schema.table_id_ = table_file.table_id_; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + groups[table_file.table_id_] = table_schema; +// std::cout << table_schema.dimension_ << std::endl; + } + table_file.dimension_ = groups[table_file.table_id_].dimension_; + + GetTableFilePath(table_file); + + files.push_back(table_file); + } + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO INDEX", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", er.what()); + } return Status::OK(); } @@ -456,440 +595,521 @@ namespace meta { Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) { -// files.clear(); -// -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// if (partition.empty()) { -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::file_type_, -// &TableFileSchema::size_, -// &TableFileSchema::date_, -// &TableFileSchema::engine_type_), -// where(c(&TableFileSchema::table_id_) == table_id and -// (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or -// c(&TableFileSchema::file_type_) -// == (int) TableFileSchema::TO_INDEX or -// c(&TableFileSchema::file_type_) -// == (int) TableFileSchema::INDEX))); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// TableSchema table_schema; -// table_schema.table_id_ = table_id; -// auto status = DescribeTable(table_schema); -// if (!status.ok()) { -// return status; -// } -// -// TableFileSchema table_file; -// -// for (auto &file : selected) { -// table_file.id_ = std::get<0>(file); -// table_file.table_id_ = std::get<1>(file); -// table_file.file_id_ = std::get<2>(file); -// table_file.file_type_ = std::get<3>(file); -// table_file.size_ = std::get<4>(file); -// table_file.date_ = std::get<5>(file); -// table_file.engine_type_ = std::get<6>(file); -// table_file.dimension_ = table_schema.dimension_; -// GetTableFilePath(table_file); -// auto dateItr = files.find(table_file.date_); -// if (dateItr == files.end()) { -// files[table_file.date_] = TableFilesSchema(); -// } -// files[table_file.date_].push_back(table_file); -// } -// } -// else { -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::file_type_, -// &TableFileSchema::size_, -// &TableFileSchema::date_), -// where(c(&TableFileSchema::table_id_) == table_id and -// in(&TableFileSchema::date_, partition) and -// (c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or -// c(&TableFileSchema::file_type_) -// == (int) TableFileSchema::TO_INDEX or -// c(&TableFileSchema::file_type_) -// == (int) TableFileSchema::INDEX))); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// TableSchema table_schema; -// table_schema.table_id_ = table_id; -// auto status = DescribeTable(table_schema); -// if (!status.ok()) { -// return status; -// } -// -// TableFileSchema table_file; -// -// for (auto &file : selected) { -// table_file.id_ = std::get<0>(file); -// table_file.table_id_ = std::get<1>(file); -// table_file.file_id_ = std::get<2>(file); -// table_file.file_type_ = std::get<3>(file); -// table_file.size_ = std::get<4>(file); -// table_file.date_ = std::get<5>(file); -// table_file.dimension_ = table_schema.dimension_; -// GetTableFilePath(table_file); -// auto dateItr = files.find(table_file.date_); -// if (dateItr == files.end()) { -// files[table_file.date_] = TableFilesSchema(); -// } -// files[table_file.date_].push_back(table_file); -// } -// -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + files.clear(); + + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + StoreQueryResult res; + + if (partition.empty()) { + + Query filesToSearchQuery = connectionPtr->query(); + filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM metaFile " << + "WHERE table_id = " << quote << table_id << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + res = filesToSearchQuery.store(); + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + } + else { + + Query filesToSearchQuery = connectionPtr->query(); + + std::stringstream partitionListSS; + for (auto &date : partition) { + partitionListSS << std::to_string(date) << ", "; + } + std::string partitionListStr = partitionListSS.str(); + partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", " + + filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM metaFile " << + "WHERE table_id = " << quote << table_id << " AND " << + "date IN (" << partitionListStr << ") AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + res = filesToSearchQuery.store(); + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + } + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id_str; + resRow["table_id"].to_string(table_id_str); + table_file.table_id_ = table_id_str; + + table_file.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + table_file.dimension_ = table_schema.dimension_; + + GetTableFilePath(table_file); + + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + + files[table_file.date_].push_back(table_file); + } + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) { -// files.clear(); -// -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::file_type_, -// &TableFileSchema::size_, -// &TableFileSchema::date_), -// where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW and -// c(&TableFileSchema::table_id_) == table_id)); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// TableSchema table_schema; -// table_schema.table_id_ = table_id; -// auto status = DescribeTable(table_schema); -// -// if (!status.ok()) { -// return status; -// } -// -// TableFileSchema table_file; -// for (auto &file : selected) { -// table_file.id_ = std::get<0>(file); -// table_file.table_id_ = std::get<1>(file); -// table_file.file_id_ = std::get<2>(file); -// table_file.file_type_ = std::get<3>(file); -// table_file.size_ = std::get<4>(file); -// table_file.date_ = std::get<5>(file); -// table_file.dimension_ = table_schema.dimension_; -// GetTableFilePath(table_file); -// auto dateItr = files.find(table_file.date_); -// if (dateItr == files.end()) { -// files[table_file.date_] = TableFilesSchema(); -// } -// files[table_file.date_].push_back(table_file); -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + files.clear(); + + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query filesToMergeQuery = connectionPtr->query(); + filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " << + "FROM metaFile " << + "WHERE table_id = " << quote << table_id << " AND " << + "file_type = " << std::to_string(TableFileSchema::RAW) << ";"; + StoreQueryResult res = filesToMergeQuery.store(); + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id_str; + resRow["table_id"].to_string(table_id_str); + table_file.table_id_ = table_id_str; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + table_file.dimension_ = table_schema.dimension_; + + GetTableFilePath(table_file); + + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + + files[table_file.date_].push_back(table_file); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO MERGE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", er.what()); + } return Status::OK(); } + //ZR: TODO: this function is pending to be removed, so not gonna implemented for now Status MySQLMetaImpl::FilesToDelete(const std::string& table_id, const DatesT& partition, DatePartionedTableFilesSchema& files) { -// auto now = utils::GetMicroSecTimeStamp(); -// try { -// if(partition.empty()) { -// //step 1: get table files by dates -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::size_, -// &TableFileSchema::date_), -// where(c(&TableFileSchema::file_type_) != -// (int) TableFileSchema::TO_DELETE -// and c(&TableFileSchema::table_id_) == table_id)); -// -// //step 2: erase table files from meta -// for (auto &file : selected) { -// TableFileSchema table_file; -// table_file.id_ = std::get<0>(file); -// table_file.table_id_ = std::get<1>(file); -// table_file.file_id_ = std::get<2>(file); -// table_file.size_ = std::get<3>(file); -// table_file.date_ = std::get<4>(file); -// GetTableFilePath(table_file); -// auto dateItr = files.find(table_file.date_); -// if (dateItr == files.end()) { -// files[table_file.date_] = TableFilesSchema(); -// } -// files[table_file.date_].push_back(table_file); -// -// ConnectorPtr->remove(std::get<0>(file)); -// } -// -// } else { -// //step 1: get all table files -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::size_, -// &TableFileSchema::date_), -// where(c(&TableFileSchema::file_type_) != -// (int) TableFileSchema::TO_DELETE -// and in(&TableFileSchema::date_, partition) -// and c(&TableFileSchema::table_id_) == table_id)); -// -// //step 2: erase table files from meta -// for (auto &file : selected) { -// TableFileSchema table_file; -// table_file.id_ = std::get<0>(file); -// table_file.table_id_ = std::get<1>(file); -// table_file.file_id_ = std::get<2>(file); -// table_file.size_ = std::get<3>(file); -// table_file.date_ = std::get<4>(file); -// GetTableFilePath(table_file); -// auto dateItr = files.find(table_file.date_); -// if (dateItr == files.end()) { -// files[table_file.date_] = TableFilesSchema(); -// } -// files[table_file.date_].push_back(table_file); -// -// ConnectorPtr->remove(std::get<0>(file)); -// } -// } -// -// } catch (std::exception &e) { -// HandleException(e); -// } return Status::OK(); } Status MySQLMetaImpl::GetTableFile(TableFileSchema &file_schema) { -// try { -// auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::file_type_, -// &TableFileSchema::size_, -// &TableFileSchema::date_), -// where(c(&TableFileSchema::file_id_) == file_schema.file_id_ and -// c(&TableFileSchema::table_id_) == file_schema.table_id_ -// )); -// assert(files.size() <= 1); -// if (files.size() == 1) { -// file_schema.id_ = std::get<0>(files[0]); -// file_schema.table_id_ = std::get<1>(files[0]); -// file_schema.file_id_ = std::get<2>(files[0]); -// file_schema.file_type_ = std::get<3>(files[0]); -// file_schema.size_ = std::get<4>(files[0]); -// file_schema.date_ = std::get<5>(files[0]); -// } else { -// return Status::NotFound("Table:" + file_schema.table_id_ + -// " File:" + file_schema.file_id_ + " not found"); -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + try { + + Query getTableFileQuery = connectionPtr->query(); + getTableFileQuery << "SELECT id, table_id, file_id, file_type, size, date " << + "FROM metaFile " << + "WHERE file_id = " << quote << file_schema.file_id_ << " AND " << + "table_id = " << quote << file_schema.table_id_ << ";"; + StoreQueryResult res = getTableFileQuery.store(); + + assert(res && res.num_rows() <= 1); + if (res.num_rows() == 1) { + + const Row& resRow = res[0]; + + file_schema.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + file_schema.table_id_ = table_id; + + std::string file_id; + resRow["file_id"].to_string(file_id); + file_schema.file_id_ = file_id; + + file_schema.file_type_ = resRow["file_type"]; + + file_schema.size_ = resRow["size"]; + + file_schema.date_ = resRow["date"]; + } + else { + return Status::NotFound("Table:" + file_schema.table_id_ + + " File:" + file_schema.file_id_ + " not found"); + } + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING TABLE FILE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING TABLE FILE", er.what()); + } return Status::OK(); } // PXU TODO: Support Swap Status MySQLMetaImpl::Archive() { -// auto &criterias = options_.archive_conf.GetCriterias(); -// if (criterias.size() == 0) { -// return Status::OK(); -// } -// -// for (auto kv : criterias) { -// auto &criteria = kv.first; -// auto &limit = kv.second; -// if (criteria == "days") { -// long usecs = limit * D_SEC * US_PS; -// long now = utils::GetMicroSecTimeStamp(); -// try { -// ConnectorPtr->update_all( -// set( -// c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE -// ), -// where( -// c(&TableFileSchema::created_on_) < (long) (now - usecs) and -// c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE -// )); -// } catch (std::exception &e) { -// HandleException(e); -// } -// } -// if (criteria == "disk") { -// uint64_t sum = 0; -// Size(sum); -// -// auto to_delete = (sum - limit * G); -// DiscardFiles(to_delete); -// } -// } + auto &criterias = options_.archive_conf.GetCriterias(); + if (criterias.empty()) { + return Status::OK(); + } + + for (auto& kv : criterias) { + auto &criteria = kv.first; + auto &limit = kv.second; + if (criteria == "days") { + size_t usecs = limit * D_SEC * US_PS; + long now = utils::GetMicroSecTimeStamp(); + try { + + Query archiveQuery = connectionPtr->query(); + archiveQuery << "UPDATE metaFile " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "WHERE created_on < " << std::to_string(now - usecs) << " AND " << + "file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; + if (!archiveQuery.exec()) { + return Status::DBTransactionError("QUERY ERROR DURING ARCHIVE", archiveQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN DURING ARCHIVE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN DURING ARCHIVE", er.what()); + } + } + if (criteria == "disk") { + uint64_t sum = 0; + Size(sum); + + auto to_delete = (sum - limit * G); + DiscardFiles(to_delete); + } + } return Status::OK(); } Status MySQLMetaImpl::Size(uint64_t &result) { -// result = 0; -// try { -// auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size_)), -// where( -// c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE -// )); -// -// for (auto &sub_query : selected) { -// if (!std::get<0>(sub_query)) { -// continue; -// } -// result += (uint64_t) (*std::get<0>(sub_query)); -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + result = 0; + try { + + Query getSizeQuery = connectionPtr->query(); + getSizeQuery << "SELECT SUM(size) AS sum " << + "FROM metaFile " << + "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";"; + StoreQueryResult res = getSizeQuery.store(); + + assert(res && res.num_rows() == 1); + result = res[0]["sum"]; + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING SIZE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING SIZE", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::DiscardFiles(long to_discard_size) { -// LOG(DEBUG) << "About to discard size=" << to_discard_size; -// if (to_discard_size <= 0) { -// return Status::OK(); -// } -// try { -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::size_), -// where(c(&TableFileSchema::file_type_) -// != (int) TableFileSchema::TO_DELETE), -// order_by(&TableFileSchema::id_), -// limit(10)); -// -// std::vector ids; -// TableFileSchema table_file; -// -// for (auto &file : selected) { -// if (to_discard_size <= 0) break; -// table_file.id_ = std::get<0>(file); -// table_file.size_ = std::get<1>(file); -// ids.push_back(table_file.id_); -// ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_ -// << " table_file.size=" << table_file.size_; -// to_discard_size -= table_file.size_; -// } -// -// if (ids.size() == 0) { -// return Status::OK(); -// } -// -// ConnectorPtr->update_all( -// set( -// c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE -// ), -// where( -// in(&TableFileSchema::id_, ids) -// )); -// -// } catch (std::exception &e) { -// HandleException(e); -// } - - return DiscardFiles(to_discard_size); + LOG(DEBUG) << "About to discard size=" << to_discard_size; + if (to_discard_size <= 0) { +// std::cout << "in" << std::endl; + return Status::OK(); + } + try { + + Query discardFilesQuery = connectionPtr->query(); + discardFilesQuery << "SELECT id, size " << + "FROM metaFile " << + "WHERE file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "ORDER BY id ASC " << + "LIMIT 10;"; +// std::cout << discardFilesQuery.str() << std::endl; + StoreQueryResult res = discardFilesQuery.store(); + + assert(res); + if (res.num_rows() == 0) { + return Status::OK(); + } + + TableFileSchema table_file; + std::stringstream idsToDiscardSS; + for (auto& resRow : res) { + if (to_discard_size <= 0) { + break; + } + table_file.id_ = resRow["id"]; + table_file.size_ = resRow["size"]; + idsToDiscardSS << "id = " << std::to_string(table_file.id_) << " OR "; + ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_ + << " table_file.size=" << table_file.size_; + to_discard_size -= table_file.size_; + } + + std::string idsToDiscardStr = idsToDiscardSS.str(); + idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4); //remove the last " OR " + + discardFilesQuery << "UPDATE metaFile " << + "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << + "WHERE " << idsToDiscardStr << ";"; + + if (discardFilesQuery.exec()) { + return DiscardFiles(to_discard_size); + } + else { + return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", discardFilesQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN DISCARDING FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN DISCARDING FILES", er.what()); + } } + //ZR: this function assumes all fields in file_schema have value Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { -// file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// ConnectorPtr->update(file_schema); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// } catch (std::exception &e) { -// ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; -// HandleException(e); -// } + file_schema.updated_time_ = utils::GetMicroSecTimeStamp(); + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query updateTableFileQuery = connectionPtr->query(); + + std::string id = std::to_string(file_schema.id_); + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + updateTableFileQuery << "UPDATE metaFile " << + "SET table_id = " << quote << table_id << ", " << + "engine_type = " << engine_type << ", " << + "file_id = " << quote << file_id << ", " << + "file_type = " << file_type << ", " << + "size = " << size << ", " << + "updated_time = " << updated_time << ", " << + "created_on = " << created_on << ", " << + "date = " << date << " " << + "WHERE id = " << id << ";"; + +// std::cout << updateTableFileQuery.str() << std::endl; + + if (!updateTableFileQuery.exec()) { + ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", updateTableFileQuery.error()); + } + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + } catch (const BadQuery& er) { + // Handle any query errors + ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; + return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILE", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) { -// try { -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// auto commited = ConnectorPtr->transaction([&]() mutable { -// for (auto &file : files) { -// file.updated_time_ = utils::GetMicroSecTimeStamp(); -// ConnectorPtr->update(file); -// } -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// return true; -// }); -// if (!commited) { -// return Status::DBTransactionError("Update files Error"); -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + try { + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query updateTableFilesQuery = connectionPtr->query(); + + for (auto& file_schema : files) { + + std::string id = std::to_string(file_schema.id_); + std::string table_id = file_schema.table_id_; + std::string engine_type = std::to_string(file_schema.engine_type_); + std::string file_id = file_schema.file_id_; + std::string file_type = std::to_string(file_schema.file_type_); + std::string size = std::to_string(file_schema.size_); + std::string updated_time = std::to_string(file_schema.updated_time_); + std::string created_on = std::to_string(file_schema.created_on_); + std::string date = std::to_string(file_schema.date_); + + updateTableFilesQuery << "UPDATE metaFile " << + "SET table_id = " << quote << table_id << ", " << + "engine_type = " << engine_type << ", " << + "file_id = " << quote << file_id << ", " << + "file_type = " << file_type << ", " << + "size = " << size << ", " << + "updated_time = " << updated_time << ", " << + "created_on = " << created_on << ", " << + "date = " << date << " " << + "WHERE id = " << id << ";"; + + } + + if (!updateTableFilesQuery.exec()) { + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILES", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { -// auto now = utils::GetMicroSecTimeStamp(); -// try { -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, -// &TableFileSchema::table_id_, -// &TableFileSchema::file_id_, -// &TableFileSchema::file_type_, -// &TableFileSchema::size_, -// &TableFileSchema::date_), -// where( -// c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE -// and -// c(&TableFileSchema::updated_time_) -// > now - seconds * US_PS)); -// -// TableFilesSchema updated; -// TableFileSchema table_file; -// -// for (auto &file : selected) { -// table_file.id_ = std::get<0>(file); -// table_file.table_id_ = std::get<1>(file); -// table_file.file_id_ = std::get<2>(file); -// table_file.file_type_ = std::get<3>(file); -// table_file.size_ = std::get<4>(file); -// table_file.date_ = std::get<5>(file); -// GetTableFilePath(table_file); -// if (table_file.file_type_ == TableFileSchema::TO_DELETE) { -// boost::filesystem::remove(table_file.location_); -// } -// ConnectorPtr->remove(table_file.id_); -// /* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */ -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + auto now = utils::GetMicroSecTimeStamp(); + try { + + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, file_type, size, date " << + "FROM metaFile " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " AND " << + "updated_time > " << std::to_string(now - seconds * US_PS) << ";"; + StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + + assert(res); + + TableFileSchema table_file; + std::vector idsToDelete; + + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_file.table_id_ = table_id; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + GetTableFilePath(table_file); + + if (table_file.file_type_ == TableFileSchema::TO_DELETE) { + boost::filesystem::remove(table_file.location_); + } + + idsToDelete.emplace_back(std::to_string(table_file.id_)); + } + + std::stringstream idsToDeleteSS; + for (auto& id : idsToDelete) { + idsToDeleteSS << "id = " << id << " OR "; + } + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpFilesWithTTLQuery << "DELETE FROM metaFile WHERE " << + idsToDeleteStr << ";"; + if (!cleanUpFilesWithTTLQuery.exec()) { + return Status::DBTransactionError("CleanUpFilesWithTTL Error", cleanUpFilesWithTTLQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::CleanUp() { -// try { + try { // auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, // &TableFileSchema::table_id_, // &TableFileSchema::file_id_, @@ -901,75 +1121,137 @@ namespace meta { // or // c(&TableFileSchema::file_type_) // == (int) TableFileSchema::NEW)); -// -// TableFilesSchema updated; -// TableFileSchema table_file; -// -// for (auto &file : selected) { -// table_file.id_ = std::get<0>(file); -// table_file.table_id_ = std::get<1>(file); -// table_file.file_id_ = std::get<2>(file); -// table_file.file_type_ = std::get<3>(file); -// table_file.size_ = std::get<4>(file); -// table_file.date_ = std::get<5>(file); -// GetTableFilePath(table_file); -// if (table_file.file_type_ == TableFileSchema::TO_DELETE) { -// boost::filesystem::remove(table_file.location_); -// } -// ConnectorPtr->remove(table_file.id_); -// /* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */ -// } -// } catch (std::exception &e) { -// HandleException(e); -// } + + Query cleanUpQuery = connectionPtr->query(); + cleanUpQuery << "SELECT id, table_id, file_id, file_type, size, date " << + "FROM metaFile " << + "WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " OR " << + "file_type = " << std::to_string(TableFileSchema::NEW) << ";"; + StoreQueryResult res = cleanUpQuery.store(); + + assert(res); + + TableFileSchema table_file; + std::vector idsToDelete; + + for (auto& resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id; + resRow["table_id"].to_string(table_id); + table_file.table_id_ = table_id; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + GetTableFilePath(table_file); + + if (table_file.file_type_ == TableFileSchema::TO_DELETE) { + boost::filesystem::remove(table_file.location_); + } + + idsToDelete.emplace_back(std::to_string(table_file.id_)); + } + + std::stringstream idsToDeleteSS; + for (auto& id : idsToDelete) { + idsToDeleteSS << "id = " << id << " OR "; + } + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); //remove the last " OR " + cleanUpQuery << "DELETE FROM metaFile WHERE " << + idsToDeleteStr << ";"; + if (!cleanUpQuery.exec()) { + return Status::DBTransactionError("Clean up Error", cleanUpQuery.error()); + } + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN CLEANING UP FILES", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN CLEANING UP FILES", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) { -// try { -// -// server::Metrics::GetInstance().MetaAccessTotalIncrement(); -// auto start_time = METRICS_NOW_TIME; -// auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_, -// &TableFileSchema::date_), -// where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or -// c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX -// or -// c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX) -// and -// c(&TableFileSchema::table_id_) == table_id)); -// auto end_time = METRICS_NOW_TIME; -// auto total_time = METRICS_MICROSECONDS(start_time, end_time); -// server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); -// TableSchema table_schema; -// table_schema.table_id_ = table_id; -// auto status = DescribeTable(table_schema); -// -// if (!status.ok()) { -// return status; -// } -// -// result = 0; -// for (auto &file : selected) { -// result += std::get<0>(file); -// } -// -// result /= table_schema.dimension_; -// result /= sizeof(float); -// -// } catch (std::exception &e) { -// HandleException(e); -// } + try { + + server::Metrics::GetInstance().MetaAccessTotalIncrement(); + auto start_time = METRICS_NOW_TIME; + + Query countQuery = connectionPtr->query(); + countQuery << "SELECT size " << + "FROM metaFile " << + "WHERE table_id = " << quote << table_id << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + StoreQueryResult res = countQuery.store(); + + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + + if (!status.ok()) { + return status; + } + + result = 0; + for (auto &resRow : res) { + size_t size = resRow["size"]; + result += size; + } + + assert(table_schema.dimension_ != 0); + result /= table_schema.dimension_; + result /= sizeof(float); + + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN RETRIEVING COUNT", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING COUNT", er.what()); + } return Status::OK(); } Status MySQLMetaImpl::DropAll() { -// if (boost::filesystem::is_directory(options_.path)) { -// boost::filesystem::remove_all(options_.path); -// } - return Status::OK(); + if (boost::filesystem::is_directory(options_.path)) { + boost::filesystem::remove_all(options_.path); + } + try { + Query dropTableQuery = connectionPtr->query(); + dropTableQuery << "DROP TABLE IF EXISTS meta, metaFile;"; + if (dropTableQuery.exec()) { + return Status::OK(); + } + else { + return Status::DBTransactionError("DROP TABLE ERROR", dropTableQuery.error()); + } + } catch (const BadQuery& er) { + // Handle any query errors + return Status::DBTransactionError("QUERY ERROR WHEN DROPPING TABLE", er.what()); + } catch (const Exception& er) { + // Catch-all for any other MySQL++ exceptions + return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING TABLE", er.what()); + } } MySQLMetaImpl::~MySQLMetaImpl() { diff --git a/cpp/src/db/MySQLMetaImpl.h b/cpp/src/db/MySQLMetaImpl.h index 3e8bcdee..e099fbb2 100644 --- a/cpp/src/db/MySQLMetaImpl.h +++ b/cpp/src/db/MySQLMetaImpl.h @@ -19,8 +19,6 @@ namespace meta { public: MySQLMetaImpl(const DBMetaOptions& options_); - Status Initialize(); - virtual Status CreateTable(TableSchema& table_schema) override; virtual Status DeleteTable(const std::string& table_id) override; virtual Status DescribeTable(TableSchema& group_info_) override; @@ -71,6 +69,7 @@ namespace meta { std::string GetTablePath(const std::string& table_id); std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date); void GetTableFilePath(TableFileSchema& group_file); + Status Initialize(); const DBMetaOptions options_; }; // DBMetaImpl diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index e636ff97..25a99968 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -6,10 +6,10 @@ link_directories( "${CMAKE_BINARY_DIR}/lib" #"${VECWISE_THIRD_PARTY_BUILD}/lib" - "${GTEST_PREFIX}/lib/" +# "${GTEST_PREFIX}/lib/" ) -message(STATUS "GTEST LIB: ${GTEST_PREFIX}/lib") +#message(STATUS "GTEST LIB: ${GTEST_PREFIX}/lib") set(unittest_srcs ${CMAKE_CURRENT_SOURCE_DIR}/vecwise_test.cpp) diff --git a/cpp/unittest/db/MySQLMetaImpl_test.cpp b/cpp/unittest/db/MySQLMetaImpl_test.cpp index 23243a2a..2889a07a 100644 --- a/cpp/unittest/db/MySQLMetaImpl_test.cpp +++ b/cpp/unittest/db/MySQLMetaImpl_test.cpp @@ -15,14 +15,431 @@ #include "db/Utils.h" #include "db/MetaConsts.h" +#include "mysql++/mysql++.h" + +#include + using namespace zilliz::milvus::engine; -TEST_F(MySQLTest, InitializeTest) { +//TEST_F(MySQLTest, InitializeTest) { +// DBMetaOptions options; +// //dialect+driver://username:password@host:port/database +// options.backend_uri = "mysql://root:1234@:/test"; +// meta::MySQLMetaImpl impl(options); +// auto status = impl.Initialize(); +// std::cout << status.ToString() << std::endl; +// ASSERT_TRUE(status.ok()); +//} + +TEST_F(MySQLTest, core) { DBMetaOptions options; //dialect+driver://username:password@host:port/database options.backend_uri = "mysql://root:1234@:/test"; + options.path = "/tmp/vecwise_test"; + meta::MySQLMetaImpl impl(options); +// auto status = impl.Initialize(); +// ASSERT_TRUE(status.ok()); + + meta::TableSchema schema1; + schema1.table_id_ = "test1"; + schema1.dimension_ = 123; + + auto status = impl.CreateTable(schema1); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + meta::TableSchema schema2; + schema2.table_id_ = "test2"; + schema2.dimension_ = 321; + status = impl.CreateTable(schema2); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + status = impl.CreateTable(schema2); +// std::cout << status.ToString() << std::endl; +// ASSERT_THROW(impl.CreateTable(schema), mysqlpp::BadQuery); + ASSERT_FALSE(status.ok()); + + status = impl.DeleteTable(schema2.table_id_); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + size_t id1 = schema1.id_; + long created_on1 = schema1.created_on_; + status = impl.DescribeTable(schema1); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(schema1.id_, id1); + ASSERT_EQ(schema1.table_id_, "test1"); + ASSERT_EQ(schema1.created_on_, created_on1); + ASSERT_EQ(schema1.files_cnt_, 0); + ASSERT_EQ(schema1.engine_type_, 1); + ASSERT_EQ(schema1.store_raw_data_, false); + + bool check; + status = impl.HasTable("test1", check); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(check, true); + + std::vector table_schema_array; + status = impl.AllTables(table_schema_array); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_schema_array.size(), 1); + meta::TableSchema resultSchema = table_schema_array[0]; + ASSERT_EQ(resultSchema.id_, id1); + ASSERT_EQ(resultSchema.table_id_, "test1"); + ASSERT_EQ(resultSchema.dimension_, 123); + ASSERT_EQ(resultSchema.files_cnt_, 0); + ASSERT_EQ(resultSchema.engine_type_, 1); + ASSERT_EQ(resultSchema.store_raw_data_, false); + + meta::TableFileSchema tableFileSchema; + tableFileSchema.table_id_ = "test1"; + + status = impl.CreateTableFile(tableFileSchema); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + tableFileSchema.file_type_ = meta::TableFileSchema::TO_INDEX; + status = impl.UpdateTableFile(tableFileSchema); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + meta::TableFilesSchema filesToIndex; + status = impl.FilesToIndex(filesToIndex); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToIndex.size(), 1); + meta::TableFileSchema fileToIndex = filesToIndex[0]; + ASSERT_EQ(fileToIndex.table_id_, "test1"); + ASSERT_EQ(fileToIndex.dimension_, 123); + +// meta::TableFilesSchema filesToIndex; +// status = impl.FilesToIndex(filesToIndex); +// ASSERT_TRUE(status.ok()); +// ASSERT_EQ(filesToIndex.size(), 0); + + meta::DatesT partition; + partition.push_back(tableFileSchema.date_); + meta::DatePartionedTableFilesSchema filesToSearch; + status = impl.FilesToSearch(tableFileSchema.table_id_, partition, filesToSearch); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToSearch.size(), 1); + ASSERT_EQ(filesToSearch[tableFileSchema.date_].size(), 1); + meta::TableFileSchema fileToSearch = filesToSearch[tableFileSchema.date_][0]; + ASSERT_EQ(fileToSearch.table_id_, "test1"); + ASSERT_EQ(fileToSearch.dimension_, 123); + + tableFileSchema.file_type_ = meta::TableFileSchema::RAW; + status = impl.UpdateTableFile(tableFileSchema); + ASSERT_TRUE(status.ok()); + + meta::DatePartionedTableFilesSchema filesToMerge; + status = impl.FilesToMerge(tableFileSchema.table_id_, filesToMerge); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + ASSERT_EQ(filesToMerge.size(), 1); + ASSERT_EQ(filesToMerge[tableFileSchema.date_].size(), 1); + meta::TableFileSchema fileToMerge = filesToMerge[tableFileSchema.date_][0]; + ASSERT_EQ(fileToMerge.table_id_, "test1"); + ASSERT_EQ(fileToMerge.dimension_, 123); + + meta::TableFileSchema resultTableFileSchema; + resultTableFileSchema.table_id_ = tableFileSchema.table_id_; + resultTableFileSchema.file_id_ = tableFileSchema.file_id_; + status = impl.GetTableFile(resultTableFileSchema); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(resultTableFileSchema.id_, tableFileSchema.id_); + ASSERT_EQ(resultTableFileSchema.table_id_, tableFileSchema.table_id_); + ASSERT_EQ(resultTableFileSchema.file_id_, tableFileSchema.file_id_); + ASSERT_EQ(resultTableFileSchema.file_type_, tableFileSchema.file_type_); + ASSERT_EQ(resultTableFileSchema.size_, tableFileSchema.size_); + ASSERT_EQ(resultTableFileSchema.date_, tableFileSchema.date_); + + tableFileSchema.size_ = 234; + status = impl.CreateTable(schema2); + ASSERT_TRUE(status.ok()); + meta::TableFileSchema tableFileSchema2; + tableFileSchema2.table_id_ = "test2"; + tableFileSchema2.size_ = 345; + status = impl.CreateTableFile(tableFileSchema2); + ASSERT_TRUE(status.ok()); + meta::TableFilesSchema filesToUpdate; + filesToUpdate.emplace_back(tableFileSchema); + filesToUpdate.emplace_back(tableFileSchema2); + status = impl.UpdateTableFile(tableFileSchema); + ASSERT_TRUE(status.ok()); + + uint64_t resultSize; + status = impl.Size(resultSize); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + ASSERT_EQ(resultSize, tableFileSchema.size_ + tableFileSchema2.size_); + + uint64_t countResult; + status = impl.Count(tableFileSchema.table_id_, countResult); + ASSERT_TRUE(status.ok()); + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); + +} + +TEST_F(MySQLTest, GROUP_TEST) { + + DBMetaOptions options; + options.backend_uri = "mysql://root:1234@:/test"; + options.path = "/tmp/vecwise_test"; + meta::MySQLMetaImpl impl(options); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + group.table_id_ = table_id; + auto status = impl.CreateTable(group); +// std::cout << status.ToString() << std::endl; + ASSERT_TRUE(status.ok()); + + auto gid = group.id_; + group.id_ = -1; + status = impl.DescribeTable(group); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(group.id_, gid); + ASSERT_EQ(group.table_id_, table_id); + + group.table_id_ = "not_found"; + status = impl.DescribeTable(group); + ASSERT_TRUE(!status.ok()); + + group.table_id_ = table_id; + status = impl.CreateTable(group); + ASSERT_TRUE(!status.ok()); + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(MySQLTest, table_file_TEST) { + + DBMetaOptions options; + options.backend_uri = "mysql://root:1234@:/test"; + options.path = "/tmp/vecwise_test"; + meta::MySQLMetaImpl impl(options); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + group.table_id_ = table_id; + auto status = impl.CreateTable(group); + + meta::TableFileSchema table_file; + table_file.table_id_ = group.table_id_; + status = impl.CreateTableFile(table_file); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW); + + auto file_id = table_file.file_id_; + + auto new_file_type = meta::TableFileSchema::INDEX; + table_file.file_type_ = new_file_type; + + status = impl.UpdateTableFile(table_file); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.file_type_, new_file_type); + + meta::DatesT dates; + dates.push_back(meta::Meta::GetDate()); + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_FALSE(status.ok()); + + dates.clear(); + for (auto i=2; i < 10; ++i) { + dates.push_back(meta::Meta::GetDateWithDelta(-1*i)); + } + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_TRUE(status.ok()); + + table_file.date_ = meta::Meta::GetDateWithDelta(-2); + status = impl.UpdateTableFile(table_file); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(table_file.date_, meta::Meta::GetDateWithDelta(-2)); + ASSERT_FALSE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE); + + dates.clear(); + dates.push_back(table_file.date_); + status = impl.DropPartitionsByDates(table_file.table_id_, dates); + ASSERT_TRUE(status.ok()); + status = impl.GetTableFile(table_file); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(table_file.file_type_ == meta::TableFileSchema::TO_DELETE); + + status = impl.DropAll(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(MySQLTest, ARCHIVE_TEST_DAYS) { + srand(time(0)); + DBMetaOptions options; + options.path = "/tmp/vecwise_test"; + int days_num = rand() % 100; + std::stringstream ss; + ss << "days:" << days_num; + options.archive_conf = ArchiveConf("delete", ss.str()); + options.backend_uri = "mysql://root:1234@:/test"; + + meta::MySQLMetaImpl impl(options); + + auto table_id = "meta_test_group"; + + meta::TableSchema group; + group.table_id_ = table_id; + auto status = impl.CreateTable(group); + + meta::TableFilesSchema files; + meta::TableFileSchema table_file; + table_file.table_id_ = group.table_id_; + + auto cnt = 100; + long ts = utils::GetMicroSecTimeStamp(); + std::vector days; + for (auto i=0; i impl_; +// std::shared_ptr impl_; }; -- GitLab