提交 bd08f75d 编写于 作者: J jinhai

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-430 Search no result if index created with FLAT

See merge request megasearch/milvus!448

Former-commit-id: d8fdbcf3a6ce938af6ab2f8ef4cad9dff8dfe3c2
...@@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-432 - Search vectors params nprobe need to check max number - MS-432 - Search vectors params nprobe need to check max number
- MS-431 - Search vectors params nprobe: 0/-1, expected result: raise exception - MS-431 - Search vectors params nprobe: 0/-1, expected result: raise exception
- MS-331 - Crate Table : when table exists, error code is META_FAILED(code=15) rather than ILLEGAL TABLE NAME(code=9)) - MS-331 - Crate Table : when table exists, error code is META_FAILED(code=15) rather than ILLEGAL TABLE NAME(code=9))
- MS-430 - Search no result if index created with FLAT
## Improvement ## Improvement
- MS-327 - Clean code for milvus - MS-327 - Clean code for milvus
......
...@@ -98,7 +98,8 @@ Status DBImpl::PreloadTable(const std::string &table_id) { ...@@ -98,7 +98,8 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
meta::DatePartionedTableFilesSchema files; meta::DatePartionedTableFilesSchema files;
meta::DatesT dates; meta::DatesT dates;
auto status = meta_ptr_->FilesToSearch(table_id, dates, files); std::vector<size_t> ids;
auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
...@@ -173,7 +174,8 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint6 ...@@ -173,7 +174,8 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint6
//get all table files from table //get all table files from table
meta::DatePartionedTableFilesSchema files; meta::DatePartionedTableFilesSchema files;
auto status = meta_ptr_->FilesToSearch(table_id, dates, files); std::vector<size_t> ids;
auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
if (!status.ok()) { return status; } if (!status.ok()) { return status; }
meta::TableFilesSchema file_id_array; meta::TableFilesSchema file_id_array;
...@@ -334,14 +336,8 @@ void DBImpl::StartMetricTask() { ...@@ -334,14 +336,8 @@ void DBImpl::StartMetricTask() {
ENGINE_LOG_TRACE << "Metric task finished"; ENGINE_LOG_TRACE << "Metric task finished";
} }
void DBImpl::StartCompactionTask() { Status DBImpl::MemSerialize() {
static uint64_t compact_clock_tick = 0; std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
compact_clock_tick++;
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
return;
}
//serialize memory data
std::set<std::string> temp_table_ids; std::set<std::string> temp_table_ids;
mem_mgr_->Serialize(temp_table_ids); mem_mgr_->Serialize(temp_table_ids);
for(auto& id : temp_table_ids) { for(auto& id : temp_table_ids) {
...@@ -352,6 +348,19 @@ void DBImpl::StartCompactionTask() { ...@@ -352,6 +348,19 @@ void DBImpl::StartCompactionTask() {
SERVER_LOG_DEBUG << "Insert cache serialized"; SERVER_LOG_DEBUG << "Insert cache serialized";
} }
return Status::OK();
}
void DBImpl::StartCompactionTask() {
static uint64_t compact_clock_tick = 0;
compact_clock_tick++;
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
return;
}
//serialize memory data
MemSerialize();
//compactiong has been finished? //compactiong has been finished?
if(!compact_thread_results_.empty()) { if(!compact_thread_results_.empty()) {
std::chrono::milliseconds span(10); std::chrono::milliseconds span(10);
...@@ -535,39 +544,49 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) ...@@ -535,39 +544,49 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
return status; return status;
} }
if(utils::IsSameIndex(old_index, index)) { //step 2: update index info
ENGINE_LOG_DEBUG << "Same index setting, no need to create index again"; if(!utils::IsSameIndex(old_index, index)) {
return Status::OK(); DropIndex(table_id);
}
//step 2: drop old index files status = meta_ptr_->UpdateTableIndexParam(table_id, index);
DropIndex(table_id); if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to update table index info";
//step 3: update index info return status;
status = meta_ptr_->UpdateTableIndexParam(table_id, index); }
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to update table index info";
return status;
} }
}
if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) { //step 3: wait and build index
ENGINE_LOG_DEBUG << "index type = IDMAP, no need to build index"; //for IDMAP type, only wait all NEW file converted to RAW file
return Status::OK(); //for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
} std::vector<int> file_types;
if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
file_types = {
(int) meta::TableFileSchema::NEW,
(int) meta::TableFileSchema::NEW_MERGE,
};
} else {
file_types = {
(int) meta::TableFileSchema::RAW,
(int) meta::TableFileSchema::NEW,
(int) meta::TableFileSchema::NEW_MERGE,
(int) meta::TableFileSchema::NEW_INDEX,
(int) meta::TableFileSchema::TO_INDEX,
};
} }
bool has = false; std::vector<std::string> file_ids;
auto status = meta_ptr_->HasNonIndexFiles(table_id, has); auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
int times = 1; int times = 1;
while (has) { while (!file_ids.empty()) {
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times; ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
status = meta_ptr_->UpdateTableFilesToIndex(table_id);
/* StartBuildIndexTask(true); */
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100))); std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
status = meta_ptr_->HasNonIndexFiles(table_id, has); status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
times++; times++;
} }
return Status::OK(); return Status::OK();
} }
...@@ -576,6 +595,7 @@ Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) { ...@@ -576,6 +595,7 @@ Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
} }
Status DBImpl::DropIndex(const std::string& table_id) { Status DBImpl::DropIndex(const std::string& table_id) {
ENGINE_LOG_DEBUG << "drop index for table: " << table_id;
return meta_ptr_->DropTableIndex(table_id); return meta_ptr_->DropTableIndex(table_id);
} }
......
...@@ -117,8 +117,9 @@ class DBImpl : public DB { ...@@ -117,8 +117,9 @@ class DBImpl : public DB {
void StartBuildIndexTask(bool force=false); void StartBuildIndexTask(bool force=false);
void BackgroundBuildIndex(); void BackgroundBuildIndex();
Status Status BuildIndex(const meta::TableFileSchema &);
BuildIndex(const meta::TableFileSchema &);
Status MemSerialize();
private: private:
const Options options_; const Options options_;
...@@ -129,6 +130,7 @@ class DBImpl : public DB { ...@@ -129,6 +130,7 @@ class DBImpl : public DB {
MetaPtr meta_ptr_; MetaPtr meta_ptr_;
MemManagerAbstractPtr mem_mgr_; MemManagerAbstractPtr mem_mgr_;
std::mutex mem_serialize_mutex_;
server::ThreadPool compact_thread_pool_; server::ThreadPool compact_thread_pool_;
std::list<std::future<void>> compact_thread_results_; std::list<std::future<void>> compact_thread_results_;
......
...@@ -70,10 +70,10 @@ class Meta { ...@@ -70,10 +70,10 @@ class Meta {
UpdateTableFiles(TableFilesSchema &files) = 0; UpdateTableFiles(TableFilesSchema &files) = 0;
virtual Status virtual Status
FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0; FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids,
virtual Status const DatesT &partition,
FilesToSearch(const std::string &table_id, const std::vector<size_t> &ids, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0; DatePartionedTableFilesSchema &files) = 0;
virtual Status virtual Status
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0; FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0;
...@@ -88,7 +88,9 @@ class Meta { ...@@ -88,7 +88,9 @@ class Meta {
FilesToIndex(TableFilesSchema &) = 0; FilesToIndex(TableFilesSchema &) = 0;
virtual Status virtual Status
HasNonIndexFiles(const std::string &table_id, bool &has) = 0; FilesByType(const std::string &table_id,
const std::vector<int> &file_types,
std::vector<std::string>& file_ids) = 0;
virtual Status virtual Status
DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0; DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0;
......
...@@ -326,10 +326,16 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) { ...@@ -326,10 +326,16 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
} }
} }
Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) { Status MySQLMetaImpl::FilesByType(const std::string &table_id,
has = false; const std::vector<int> &file_types,
std::vector<std::string> &file_ids) {
if(file_types.empty()) {
return Status::Error("file types array is empty");
}
try { try {
file_ids.clear();
StoreQueryResult res; StoreQueryResult res;
{ {
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab); ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
...@@ -338,34 +344,75 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) { ...@@ -338,34 +344,75 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
return Status::Error("Failed to connect to database server"); return Status::Error("Failed to connect to database server");
} }
std::string types;
for(auto type : file_types) {
if(!types.empty()) {
types += ",";
}
types += std::to_string(type);
}
Query hasNonIndexFilesQuery = connectionPtr->query(); Query hasNonIndexFilesQuery = connectionPtr->query();
//since table_id is a unique column we just need to check whether it exists or not //since table_id is a unique column we just need to check whether it exists or not
hasNonIndexFilesQuery << "SELECT EXISTS " << hasNonIndexFilesQuery << "SELECT file_id, file_type FROM TableFiles " <<
"(SELECT 1 FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " << "WHERE table_id = " << quote << table_id << " AND " <<
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " << "file_type in (" << types << ");";
"file_type = " << std::to_string(TableFileSchema::NEW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::NEW_MERGE) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::NEW_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ")) " <<
"AS " << quote << "check" << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasNonIndexFiles: " << hasNonIndexFilesQuery.str(); ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
res = hasNonIndexFilesQuery.store(); res = hasNonIndexFilesQuery.store();
} //Scoped Connection } //Scoped Connection
int check = res[0]["check"]; if (res.num_rows() > 0) {
has = (check == 1); int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
int to_index_count = 0, index_count = 0, backup_count = 0;
for (auto &resRow : res) {
std::string file_id;
resRow["file_id"].to_string(file_id);
file_ids.push_back(file_id);
int32_t file_type = resRow["file_type"];
switch (file_type) {
case (int) TableFileSchema::RAW:
raw_count++;
break;
case (int) TableFileSchema::NEW:
new_count++;
break;
case (int) TableFileSchema::NEW_MERGE:
new_merge_count++;
break;
case (int) TableFileSchema::NEW_INDEX:
new_index_count++;
break;
case (int) TableFileSchema::TO_INDEX:
to_index_count++;
break;
case (int) TableFileSchema::INDEX:
index_count++;
break;
case (int) TableFileSchema::BACKUP:
backup_count++;
break;
default:
break;
}
}
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
<< " new files:" << new_count << " new_merge files:" << new_merge_count
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
<< " index files:" << index_count << " backup files:" << backup_count;
}
} catch (const BadQuery &er) { } catch (const BadQuery &er) {
// Handle any query errors // Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF NON INDEX FILES EXISTS" << ": " << er.what(); ENGINE_LOG_ERROR << "QUERY ERROR WHEN GET FILE BY TYPE" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF NON INDEX FILES EXISTS", er.what()); return Status::DBTransactionError("QUERY ERROR WHEN GET FILE BY TYPE", er.what());
} catch (const Exception &er) { } catch (const Exception &er) {
// Catch-all for any other MySQL++ exceptions // Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CHECKING IF NON INDEX FILES EXISTS" << ": " << er.what(); ENGINE_LOG_ERROR << "GENERAL ERROR WHEN GET FILE BY TYPE" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN CHECKING IF NON INDEX FILES EXISTS", er.what()); return Status::DBTransactionError("GENERAL ERROR WHEN GET FILE BY TYPE", er.what());
} }
return Status::OK(); return Status::OK();
...@@ -987,121 +1034,6 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) { ...@@ -987,121 +1034,6 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
return Status::OK(); return Status::OK();
} }
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
files.clear();
try {
server::MetricCollector metric;
StoreQueryResult res;
{
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
if (connectionPtr == nullptr) {
return Status::Error("Failed to connect to database server");
}
if (partition.empty()) {
Query filesToSearchQuery = connectionPtr->query();
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
res = filesToSearchQuery.store();
} else {
Query filesToSearchQuery = connectionPtr->query();
std::stringstream partitionListSS;
for (auto &date : partition) {
partitionListSS << std::to_string(date) << ", ";
}
std::string partitionListStr = partitionListSS.str();
partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", "
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"date IN (" << partitionListStr << ") AND " <<
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
res = filesToSearchQuery.store();
}
} //Scoped Connection
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto &resRow : res) {
table_file.id_ = resRow["id"]; //implicit conversion
std::string table_id_str;
resRow["table_id"].to_string(table_id_str);
table_file.table_id_ = table_id_str;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.engine_type_ = resRow["engine_type"];
table_file.nlist_ = table_schema.nlist_;
table_file.metric_type_ = table_schema.metric_type_;
std::string file_id;
resRow["file_id"].to_string(file_id);
table_file.file_id_ = file_id;
table_file.file_type_ = resRow["file_type"];
table_file.file_size_ = resRow["file_size"];
table_file.row_count_ = resRow["row_count"];
table_file.date_ = resRow["date"];
table_file.dimension_ = table_schema.dimension_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
}
files[table_file.date_].push_back(table_file);
}
} catch (const BadQuery &er) {
// Handle any query errors
ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
} catch (const Exception &er) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
}
return Status::OK();
}
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids, const std::vector<size_t> &ids,
const DatesT &partition, const DatesT &partition,
......
...@@ -46,7 +46,9 @@ class MySQLMetaImpl : public Meta { ...@@ -46,7 +46,9 @@ class MySQLMetaImpl : public Meta {
const std::vector<size_t> &ids, const std::vector<size_t> &ids,
TableFilesSchema &table_files) override; TableFilesSchema &table_files) override;
Status HasNonIndexFiles(const std::string &table_id, bool &has) override; Status FilesByType(const std::string &table_id,
const std::vector<int> &file_types,
std::vector<std::string> &file_ids) override;
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override; Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
...@@ -62,10 +64,6 @@ class MySQLMetaImpl : public Meta { ...@@ -62,10 +64,6 @@ class MySQLMetaImpl : public Meta {
Status UpdateTableFiles(TableFilesSchema &files) override; Status UpdateTableFiles(TableFilesSchema &files) override;
Status FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) override;
Status FilesToSearch(const std::string &table_id, Status FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids, const std::vector<size_t> &ids,
const DatesT &partition, const DatesT &partition,
......
...@@ -279,28 +279,26 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) { ...@@ -279,28 +279,26 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
return Status::OK(); return Status::OK();
} }
Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) { Status SqliteMetaImpl::FilesByType(const std::string& table_id,
has = false; const std::vector<int>& file_types,
std::vector<std::string>& file_ids) {
if(file_types.empty()) {
return Status::Error("file types array is empty");
}
try { try {
std::vector<int> file_types = { file_ids.clear();
(int) TableFileSchema::RAW, auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_,
(int) TableFileSchema::NEW,
(int) TableFileSchema::NEW_MERGE,
(int) TableFileSchema::NEW_INDEX,
(int) TableFileSchema::TO_INDEX,
};
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::file_type_), &TableFileSchema::file_type_),
where(in(&TableFileSchema::file_type_, file_types) where(in(&TableFileSchema::file_type_, file_types)
and c(&TableFileSchema::table_id_) == table_id and c(&TableFileSchema::table_id_) == table_id
)); ));
if (selected.size() >= 1) { if (selected.size() >= 1) {
has = true; int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
int to_index_count = 0, index_count = 0, backup_count = 0;
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
std::vector<std::string> file_ids;
for (auto &file : selected) { for (auto &file : selected) {
file_ids.push_back(std::get<0>(file));
switch (std::get<1>(file)) { switch (std::get<1>(file)) {
case (int) TableFileSchema::RAW: case (int) TableFileSchema::RAW:
raw_count++; raw_count++;
...@@ -317,14 +315,21 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) ...@@ -317,14 +315,21 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
case (int) TableFileSchema::TO_INDEX: case (int) TableFileSchema::TO_INDEX:
to_index_count++; to_index_count++;
break; break;
case (int) TableFileSchema::INDEX:
index_count++;
break;
case (int) TableFileSchema::BACKUP:
backup_count++;
break;
default: default:
break; break;
} }
} }
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
<< " new files:" << new_count << " new_merge files:" << new_merge_count << " new files:" << new_count << " new_merge files:" << new_merge_count
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count; << " new_index files:" << new_index_count << " to_index files:" << to_index_count
<< " index files:" << index_count << " backup files:" << backup_count;
} }
} catch (std::exception &e) { } catch (std::exception &e) {
...@@ -633,111 +638,6 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) { ...@@ -633,111 +638,6 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
return Status::OK(); return Status::OK();
} }
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
files.clear();
try {
server::MetricCollector metric;
if (partition.empty()) {
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::file_size_,
&TableFileSchema::row_count_,
&TableFileSchema::date_,
&TableFileSchema::engine_type_),
where(c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::file_type_, file_type)));
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto &file : selected) {
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.file_type_ = std::get<3>(file);
table_file.file_size_ = std::get<4>(file);
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.engine_type_ = std::get<7>(file);
table_file.dimension_ = table_schema.dimension_;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.nlist_ = table_schema.nlist_;
table_file.metric_type_ = table_schema.metric_type_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
}
files[table_file.date_].push_back(table_file);
}
}
else {
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::file_size_,
&TableFileSchema::row_count_,
&TableFileSchema::date_,
&TableFileSchema::engine_type_),
where(c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::date_, partition) and
in(&TableFileSchema::file_type_, file_type)));
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto &file : selected) {
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.file_type_ = std::get<3>(file);
table_file.file_size_ = std::get<4>(file);
table_file.row_count_ = std::get<5>(file);
table_file.date_ = std::get<6>(file);
table_file.engine_type_ = std::get<7>(file);
table_file.dimension_ = table_schema.dimension_;
table_file.index_file_size_ = table_schema.index_file_size_;
table_file.nlist_ = table_schema.nlist_;
table_file.metric_type_ = table_schema.metric_type_;
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
}
files[table_file.date_].push_back(table_file);
}
}
} catch (std::exception &e) {
return HandleException("Encounter exception when iterate index files", e);
}
return Status::OK();
}
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids, const std::vector<size_t> &ids,
const DatesT &partition, const DatesT &partition,
......
...@@ -41,7 +41,9 @@ class SqliteMetaImpl : public Meta { ...@@ -41,7 +41,9 @@ class SqliteMetaImpl : public Meta {
const std::vector<size_t> &ids, const std::vector<size_t> &ids,
TableFilesSchema &table_files) override; TableFilesSchema &table_files) override;
Status HasNonIndexFiles(const std::string &table_id, bool &has) override; Status FilesByType(const std::string &table_id,
const std::vector<int> &file_types,
std::vector<std::string> &file_ids) override;
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override; Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
...@@ -57,10 +59,6 @@ class SqliteMetaImpl : public Meta { ...@@ -57,10 +59,6 @@ class SqliteMetaImpl : public Meta {
Status UpdateTableFiles(TableFilesSchema &files) override; Status UpdateTableFiles(TableFilesSchema &files) override;
Status FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) override;
Status FilesToSearch(const std::string &table_id, Status FilesToSearch(const std::string &table_id,
const std::vector<size_t> &ids, const std::vector<size_t> &ids,
const DatesT &partition, const DatesT &partition,
......
...@@ -201,6 +201,7 @@ DescribeTableTask::OnExecute() { ...@@ -201,6 +201,7 @@ DescribeTableTask::OnExecute() {
schema_->mutable_table_name()->set_table_name(table_info.table_id_); schema_->mutable_table_name()->set_table_name(table_info.table_id_);
schema_->set_dimension(table_info.dimension_); schema_->set_dimension(table_info.dimension_);
schema_->set_index_file_size(table_info.index_file_size_);
} catch (std::exception &ex) { } catch (std::exception &ex) {
return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
......
...@@ -307,8 +307,6 @@ TEST_F(DBTest, PRELOADTABLE_TEST) { ...@@ -307,8 +307,6 @@ TEST_F(DBTest, PRELOADTABLE_TEST) {
ASSERT_EQ(target_ids.size(), nb); ASSERT_EQ(target_ids.size(), nb);
} }
sleep(2);
engine::TableIndex index; engine::TableIndex index;
index.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; index.engine_type_ = (int)engine::EngineType::FAISS_IDMAP;
db_->CreateIndex(TABLE_NAME, index); // wait until build index finish db_->CreateIndex(TABLE_NAME, index); // wait until build index finish
......
...@@ -260,17 +260,17 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { ...@@ -260,17 +260,17 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
ASSERT_EQ(files.size(), to_index_files_cnt); ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatesT dates = {table_file.date_}; meta::DatesT dates = {table_file.date_};
status = impl_->FilesToSearch(table_id, dates, dated_files); std::vector<size_t> ids;
status = impl_->FilesToSearch(table_id, ids, dates, dated_files);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt); to_index_files_cnt+raw_files_cnt+index_files_cnt);
status = impl_->FilesToSearch(table_id, meta::DatesT(), dated_files); status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt); to_index_files_cnt+raw_files_cnt+index_files_cnt);
std::vector<size_t> ids;
status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files); status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), ASSERT_EQ(dated_files[table_file.date_].size(),
......
...@@ -197,9 +197,12 @@ TEST_F(DISABLED_MySQLTest, ARCHIVE_TEST_DAYS) { ...@@ -197,9 +197,12 @@ TEST_F(DISABLED_MySQLTest, ARCHIVE_TEST_DAYS) {
i++; i++;
} }
bool has; std::vector<int> file_types = {
status = impl.HasNonIndexFiles(table_id, has); (int) meta::TableFileSchema::NEW,
ASSERT_TRUE(status.ok()); };
std::vector<std::string> file_ids;
status = impl.FilesByType(table_id, file_types, file_ids);
ASSERT_FALSE(file_ids.empty());
status = impl.UpdateTableFilesToIndex(table_id); status = impl.UpdateTableFilesToIndex(table_id);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
...@@ -332,17 +335,17 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILES_TEST) { ...@@ -332,17 +335,17 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILES_TEST) {
ASSERT_EQ(files.size(), to_index_files_cnt); ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatesT dates = {table_file.date_}; meta::DatesT dates = {table_file.date_};
status = impl.FilesToSearch(table_id, dates, dated_files); std::vector<size_t> ids;
status = impl.FilesToSearch(table_id, ids, dates, dated_files);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt); to_index_files_cnt+raw_files_cnt+index_files_cnt);
status = impl.FilesToSearch(table_id, meta::DatesT(), dated_files); status = impl.FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt); to_index_files_cnt+raw_files_cnt+index_files_cnt);
std::vector<size_t> ids;
status = impl.FilesToSearch(table_id, ids, meta::DatesT(), dated_files); status = impl.FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), ASSERT_EQ(dated_files[table_file.date_].size(),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册