未验证 提交 49ec7e1a 编写于 作者: J Jin Hai 提交者: GitHub

Merge pull request #334 from yhmo/0.6.0

#316 Some files not merged after vectors added
......@@ -7,6 +7,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#228 - memory usage increased slowly during searching vectors
- \#246 - Exclude src/external folder from code coverage for jenkin ci
- \#248 - Reside src/external in thirdparty
- \#316 - Some files not merged after vectors added
## Feature
- \#12 - Pure CPU version for Milvus
......
......@@ -619,6 +619,18 @@ DBImpl::StartCompactionTask() {
{
std::lock_guard<std::mutex> lck(compact_result_mutex_);
if (compact_thread_results_.empty()) {
// collect merge files for all tables(if compact_table_ids_ is empty) for two reasons:
// 1. other tables may still has un-merged files
// 2. server may be closed unexpected, these un-merge files need to be merged when server restart
if (compact_table_ids_.empty()) {
std::vector<meta::TableSchema> table_schema_array;
meta_ptr_->AllTables(table_schema_array);
for (auto& schema : table_schema_array) {
compact_table_ids_.insert(schema.table_id_);
}
}
// start merge file thread
compact_thread_results_.push_back(
compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
compact_table_ids_.clear();
......@@ -717,7 +729,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
for (auto& kv : raw_files) {
auto files = kv.second;
if (files.size() < options_.merge_trigger_number_) {
ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
continue;
}
......@@ -734,7 +746,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_TRACE << " Background compaction thread start";
ENGINE_LOG_TRACE << "Background compaction thread start";
Status status;
for (auto& table_id : table_ids) {
......@@ -757,7 +769,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
ENGINE_LOG_TRACE << " Background compaction thread exit";
ENGINE_LOG_TRACE << "Background compaction thread exit";
}
void
......
......@@ -1392,6 +1392,7 @@ MySQLMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFiles
} // Scoped Connection
Status ret;
int64_t to_merge_files = 0;
for (auto& resRow : res) {
TableFileSchema table_file;
table_file.file_size_ = resRow["file_size"];
......@@ -1420,13 +1421,14 @@ MySQLMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFiles
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
to_merge_files++;
}
files[table_file.date_].push_back(table_file);
}
if (res.size() > 0) {
ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-merge files";
if (to_merge_files > 0) {
ENGINE_LOG_TRACE << "Collect " << to_merge_files << " to-merge files";
}
return ret;
} catch (std::exception& e) {
......@@ -1809,6 +1811,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
int64_t remove_tables = 0;
if (!res.empty()) {
std::stringstream idsToDeleteSS;
for (auto& resRow : res) {
......@@ -1817,7 +1820,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
resRow["table_id"].to_string(table_id);
utils::DeleteTablePath(options_, table_id, false); // only delete empty folder
remove_tables++;
idsToDeleteSS << "id = " << std::to_string(id) << " OR ";
}
std::string idsToDeleteStr = idsToDeleteSS.str();
......@@ -1832,8 +1835,8 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
}
}
if (res.size() > 0) {
ENGINE_LOG_DEBUG << "Remove " << res.size() << " tables from meta";
if (remove_tables > 0) {
ENGINE_LOG_DEBUG << "Remove " << remove_tables << " tables from meta";
}
} // Scoped Connection
} catch (std::exception& e) {
......
......@@ -971,6 +971,7 @@ SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFile
order_by(&TableFileSchema::file_size_).desc());
Status result;
int64_t to_merge_files = 0;
for (auto& file : selected) {
TableFileSchema table_file;
table_file.file_size_ = std::get<4>(file);
......@@ -999,11 +1000,13 @@ SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFile
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
}
files[table_file.date_].push_back(table_file);
to_merge_files++;
}
if (selected.size() > 0) {
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-merge files";
if (to_merge_files > 0) {
ENGINE_LOG_TRACE << "Collect " << to_merge_files << " to-merge files";
}
return result;
} catch (std::exception& e) {
......@@ -1313,16 +1316,18 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
try {
server::MetricCollector metric;
int64_t remove_tables = 0;
for (auto& table_id : table_ids) {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_),
where(c(&TableFileSchema::table_id_) == table_id));
if (selected.size() == 0) {
utils::DeleteTablePath(options_, table_id);
remove_tables++;
}
}
if (table_ids.size() > 0) {
ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
if (remove_tables) {
ENGINE_LOG_DEBUG << "Remove " << remove_tables << " tables folder";
}
} catch (std::exception& e) {
return HandleException("Encounter exception when delete table folder", e.what());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册