diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 9a27f09b3dde86c91dc8f6ba39a1a0edbf53a892..4720e08aeb2d85cf409c27152a988c3d2b18c63d 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -198,18 +198,25 @@ Status DBImpl::Query(const std::string& table_id, const std::vector ids.push_back(std::stoul(id, &sz)); } - meta::TableFilesSchema files_array; - auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array); + meta::DatePartionedTableFilesSchema files_array; + auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array); if (!status.ok()) { return status; } - if(files_array.empty()) { + meta::TableFilesSchema file_id_array; + for (auto &day_files : files_array) { + for (auto &file : day_files.second) { + file_id_array.push_back(file); + } + } + + if(file_id_array.empty()) { return Status::Error("Invalid file id"); } cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query - status = QueryAsync(table_id, files_array, k, nq, vectors, dates, results); + status = QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results); cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query return status; } diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 56f741c4dc62447dd5e5565d0f4e864c25b0fb74..2cea16e6bb37e9e3a4204c490d142a904bd76c3d 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -544,6 +544,79 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id, return Status::OK(); } +Status DBMetaImpl::FilesToSearch(const std::string &table_id, + const std::vector &ids, + const DatesT &partition, + DatePartionedTableFilesSchema &files) { + files.clear(); + MetricCollector metric; + + try { + auto select_columns = columns(&TableFileSchema::id_, + &TableFileSchema::table_id_, + &TableFileSchema::file_id_, + &TableFileSchema::file_type_, + &TableFileSchema::size_, + &TableFileSchema::date_, + &TableFileSchema::engine_type_); + + auto match_tableid = c(&TableFileSchema::table_id_) == table_id; + auto is_raw = c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW; + auto is_toindex = c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX; + auto is_index = c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX; + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { return status; } + + decltype(ConnectorPtr->select(select_columns)) result; + if (partition.empty() && ids.empty()) { + auto filter = where(match_tableid and (is_raw or is_toindex or is_index)); + result = ConnectorPtr->select(select_columns, filter); + } + else if (partition.empty() && !ids.empty()) { + auto match_fileid = in(&TableFileSchema::id_, ids); + auto filter = where(match_tableid and match_fileid and (is_raw or is_toindex or is_index)); + result = ConnectorPtr->select(select_columns, filter); + } + else if (!partition.empty() && ids.empty()) { + auto match_date = in(&TableFileSchema::date_, partition); + auto filter = where(match_tableid and match_date and (is_raw or is_toindex or is_index)); + result = ConnectorPtr->select(select_columns, filter); + } + else if (!partition.empty() && !ids.empty()) { + auto match_fileid = in(&TableFileSchema::id_, ids); + auto match_date = in(&TableFileSchema::date_, partition); + auto filter = where(match_tableid and match_fileid and match_date and (is_raw or is_toindex or is_index)); + result = ConnectorPtr->select(select_columns, filter); + } + + TableFileSchema table_file; + for (auto &file : result) { + table_file.id_ = std::get<0>(file); + table_file.table_id_ = std::get<1>(file); + table_file.file_id_ = std::get<2>(file); + table_file.file_type_ = std::get<3>(file); + table_file.size_ = std::get<4>(file); + table_file.date_ = std::get<5>(file); + table_file.engine_type_ = std::get<6>(file); + table_file.dimension_ = table_schema.dimension_; + utils::GetTableFilePath(options_, table_file); + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + files[table_file.date_].push_back(table_file); + } + + } catch (std::exception &e) { + return HandleException("Encounter exception when iterate index files", e); + } + + return Status::OK(); +} + Status DBMetaImpl::FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) { files.clear(); diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index 6187ad7eae3dc30a48473cd5a518b1ca102f6c2d..a163d450a14a7ac19914415bb0eeae75e6ab6b88 100644 --- a/cpp/src/db/DBMetaImpl.h +++ b/cpp/src/db/DBMetaImpl.h @@ -62,6 +62,11 @@ class DBMetaImpl : public Meta { Status FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) override; + Status FilesToSearch(const std::string &table_id, + const std::vector &ids, + const DatesT &partition, + DatePartionedTableFilesSchema &files) override; + Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override; diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index 5275605611271e2b453757cea6a0905b3ecdffb0..7e826f63354599a161444ee644a5fb66e6e9861f 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -65,6 +65,9 @@ class Meta { virtual Status FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0; + virtual Status + FilesToSearch(const std::string &table_id, const std::vector &ids, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0; + virtual Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0; diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp index 14879d81fece0ae579b7316641353d25e5f4237a..12bfc55c1b89239bc5619b083ccc9cce92e645a1 100644 --- a/cpp/src/db/MySQLMetaImpl.cpp +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -965,6 +965,117 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, return Status::OK(); } +Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, + const std::vector &ids, + const DatesT &partition, + DatePartionedTableFilesSchema &files) { + + + files.clear(); + + try { + + MetricCollector metric; + + StoreQueryResult res; + + { + ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); + + if (connectionPtr == nullptr) { + return Status::Error("Failed to connect to database server"); + } + + Query filesToSearchQuery = connectionPtr->query(); + filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << + "FROM TableFiles " << + "WHERE table_id = " << quote << table_id; + + if (!partition.empty()) { + std::stringstream partitionListSS; + for (auto &date : partition) { + partitionListSS << std::to_string(date) << ", "; + } + std::string partitionListStr = partitionListSS.str(); + + partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", " + filesToSearchQuery << " AND " << "date IN (" << partitionListStr << ")"; + } + + if (!ids.empty()) { + std::stringstream idSS; + for (auto &id : ids) { + idSS << "id = " << std::to_string(id) << " OR "; + } + std::string idStr = idSS.str(); + idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR " + + filesToSearchQuery << " AND " << "(" << idStr << ")"; + + } + // End + filesToSearchQuery << " AND " << + "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << + "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " << + "file_type = " << std::to_string(TableFileSchema::INDEX) << ");"; + + ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str(); + + res = filesToSearchQuery.store(); + } //Scoped Connection + + TableSchema table_schema; + table_schema.table_id_ = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + for (auto &resRow : res) { + + table_file.id_ = resRow["id"]; //implicit conversion + + std::string table_id_str; + resRow["table_id"].to_string(table_id_str); + table_file.table_id_ = table_id_str; + + table_file.engine_type_ = resRow["engine_type"]; + + std::string file_id; + resRow["file_id"].to_string(file_id); + table_file.file_id_ = file_id; + + table_file.file_type_ = resRow["file_type"]; + + table_file.size_ = resRow["size"]; + + table_file.date_ = resRow["date"]; + + table_file.dimension_ = table_schema.dimension_; + + utils::GetTableFilePath(options_, table_file); + + auto dateItr = files.find(table_file.date_); + if (dateItr == files.end()) { + files[table_file.date_] = TableFilesSchema(); + } + + files[table_file.date_].push_back(table_file); + } + } catch (const BadQuery &er) { + // Handle any query errors + ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what(); + return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } catch (const Exception &er) { + // Catch-all for any other MySQL++ exceptions + ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what(); + return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what()); + } + + return Status::OK(); +} + Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) { diff --git a/cpp/src/db/MySQLMetaImpl.h b/cpp/src/db/MySQLMetaImpl.h index 87bc1783c764ae64f80552c0c5b7621701807c04..7822b99f6488e7af70bc2ab58cab401150f2cfd7 100644 --- a/cpp/src/db/MySQLMetaImpl.h +++ b/cpp/src/db/MySQLMetaImpl.h @@ -53,6 +53,11 @@ class MySQLMetaImpl : public Meta { const DatesT &partition, DatePartionedTableFilesSchema &files) override; + Status FilesToSearch(const std::string &table_id, + const std::vector &ids, + const DatesT &partition, + DatePartionedTableFilesSchema &files) override; + Status FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override; diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index 70f7da43c474b10f29912561399b21a39fc23f68..81f2279d1cf5f4230f9737cfdd7bf942d2b80e80 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -215,14 +215,13 @@ TEST_F(DBTest, SEARCH_TEST) { ASSERT_STATS(stat); } - // TODO: FIX HERE - //{//search by specify index file - // engine::meta::DatesT dates; - // std::vector file_ids = {"1", "2", "3", "4"}; - // engine::QueryResults results; - // stat = db_->Query(TABLE_NAME, file_ids, k, nq, xq.data(), dates, results); - // ASSERT_STATS(stat); - //} + {//search by specify index file + engine::meta::DatesT dates; + std::vector file_ids = {"4", "5", "6"}; + engine::QueryResults results; + stat = db_->Query(TABLE_NAME, file_ids, k, nq, xq.data(), dates, results); + ASSERT_STATS(stat); + } // TODO(linxj): add groundTruth assert }; diff --git a/cpp/unittest/db/meta_tests.cpp b/cpp/unittest/db/meta_tests.cpp index 5bce4058b133604665d600356131fcb98132e7a9..41b145ee854fcf5a0d2a09439d741bb27429d999 100644 --- a/cpp/unittest/db/meta_tests.cpp +++ b/cpp/unittest/db/meta_tests.cpp @@ -269,4 +269,15 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { ASSERT_TRUE(status.ok()); ASSERT_EQ(dated_files[table_file.date_].size(), to_index_files_cnt+raw_files_cnt+index_files_cnt); + + std::vector ids; + status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(), + to_index_files_cnt+raw_files_cnt+index_files_cnt); + + ids.push_back(size_t(9999999999)); + status = impl_->FilesToSearch(table_id, ids, dates, dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(),0); } diff --git a/cpp/unittest/db/mysql_meta_test.cpp b/cpp/unittest/db/mysql_meta_test.cpp index 76d7846362f8bf447fd09ccdce0b8ffbb89cc5ed..aead509a2c710371f70d02496daf5d40a8fbd9e5 100644 --- a/cpp/unittest/db/mysql_meta_test.cpp +++ b/cpp/unittest/db/mysql_meta_test.cpp @@ -328,6 +328,17 @@ TEST_F(MySQLTest, TABLE_FILES_TEST) { ASSERT_EQ(dated_files[table_file.date_].size(), to_index_files_cnt+raw_files_cnt+index_files_cnt); + std::vector ids; + status = impl.FilesToSearch(table_id, ids, meta::DatesT(), dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(), + to_index_files_cnt+raw_files_cnt+index_files_cnt); + + ids.push_back(size_t(9999999999)); + status = impl.FilesToSearch(table_id, ids, dates, dated_files); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(dated_files[table_file.date_].size(),0); + status = impl.DropAll(); ASSERT_TRUE(status.ok()); }