提交 4b10cb3e 编写于 作者: P peng.xu

Merge branch 'branch-0.5.0' into 'branch-0.5.0'

MS-587 Count get wrong result after adding vectors and index built immediately

See merge request megasearch/milvus!619

Former-commit-id: b086ba967fc165e4f6d7ee2c5e57d933ccb8a2ee
......@@ -8,6 +8,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-568 - Fix gpuresource free error
- MS-572 - Milvus crash when get SIGINT
- MS-577 - Unittest Query randomly hung
- MS-587 - Count get wrong result after adding vectors and index built immediately
## Improvement
- MS-552 - Add and change the easylogging library
......
......@@ -275,7 +275,11 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
}
}
//step 3: wait and build index
//step 3: let merge file thread finish
//to avoid duplicate data bug
WaitMergeFileFinish();
//step 4: wait and build index
//for IDMAP type, only wait all NEW file converted to RAW file
//for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
std::vector<int> file_types;
......@@ -470,12 +474,8 @@ void DBImpl::BackgroundTimerTask() {
server::SystemInfo::GetInstance().Init();
while (true) {
if (shutting_down_.load(std::memory_order_acquire)){
for(auto& iter : compact_thread_results_) {
iter.wait();
}
for(auto& iter : index_thread_results_) {
iter.wait();
}
WaitMergeFileFinish();
WaitBuildIndexFinish();
ENGINE_LOG_DEBUG << "DB background thread exit";
break;
......@@ -489,6 +489,20 @@ void DBImpl::BackgroundTimerTask() {
}
}
void DBImpl::WaitMergeFileFinish() {
std::lock_guard<std::mutex> lck(compact_result_mutex_);
for(auto& iter : compact_thread_results_) {
iter.wait();
}
}
void DBImpl::WaitBuildIndexFinish() {
std::lock_guard<std::mutex> lck(index_result_mutex_);
for(auto& iter : index_thread_results_) {
iter.wait();
}
}
void DBImpl::StartMetricTask() {
static uint64_t metric_clock_tick = 0;
metric_clock_tick++;
......@@ -545,18 +559,24 @@ void DBImpl::StartCompactionTask() {
MemSerialize();
//compactiong has been finished?
if(!compact_thread_results_.empty()) {
std::chrono::milliseconds span(10);
if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
compact_thread_results_.pop_back();
{
std::lock_guard<std::mutex> lck(compact_result_mutex_);
if (!compact_thread_results_.empty()) {
std::chrono::milliseconds span(10);
if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
compact_thread_results_.pop_back();
}
}
}
//add new compaction task
if(compact_thread_results_.empty()) {
compact_thread_results_.push_back(
{
std::lock_guard<std::mutex> lck(compact_result_mutex_);
if (compact_thread_results_.empty()) {
compact_thread_results_.push_back(
compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
compact_table_ids_.clear();
compact_table_ids_.clear();
}
}
}
......@@ -700,17 +720,23 @@ void DBImpl::StartBuildIndexTask(bool force) {
}
//build index has been finished?
if(!index_thread_results_.empty()) {
std::chrono::milliseconds span(10);
if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
index_thread_results_.pop_back();
{
std::lock_guard<std::mutex> lck(index_result_mutex_);
if (!index_thread_results_.empty()) {
std::chrono::milliseconds span(10);
if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
index_thread_results_.pop_back();
}
}
}
//add new build index task
if(index_thread_results_.empty()) {
index_thread_results_.push_back(
{
std::lock_guard<std::mutex> lck(index_result_mutex_);
if (index_thread_results_.empty()) {
index_thread_results_.push_back(
index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
}
}
}
......
......@@ -111,6 +111,8 @@ class DBImpl : public DB {
QueryResults &results);
void BackgroundTimerTask();
void WaitMergeFileFinish();
void WaitBuildIndexFinish();
void StartMetricTask();
......@@ -140,10 +142,12 @@ class DBImpl : public DB {
std::mutex mem_serialize_mutex_;
ThreadPool compact_thread_pool_;
std::mutex compact_result_mutex_;
std::list<std::future<void>> compact_thread_results_;
std::set<std::string> compact_table_ids_;
ThreadPool index_thread_pool_;
std::mutex index_result_mutex_;
std::list<std::future<void>> index_thread_results_;
std::mutex build_index_mutex_;
......
......@@ -35,7 +35,7 @@ EngineFactory::Build(uint16_t dimension,
return nullptr;
}
ENGINE_LOG_DEBUG << "EngineFactory EngineTypee: " << (int)index_type;
ENGINE_LOG_DEBUG << "EngineFactory index type: " << (int)index_type;
ExecutionEnginePtr execution_engine_ptr =
std::make_shared<ExecutionEngineImpl>(dimension, location, index_type, metric_type, nlist);
......
......@@ -373,6 +373,9 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
return HandleException("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", dropPartitionsByDatesQuery.error());
}
} //Scoped Connection
ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
}
......@@ -443,6 +446,7 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
}
} //Scoped Connection
ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
return utils::CreateTablePath(options_, table_schema.table_id_);
} catch (std::exception &e) {
......@@ -589,6 +593,8 @@ Status MySQLMetaImpl::UpdateTableIndex(const std::string &table_id, const TableI
} //Scoped Connection
ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
}
......@@ -621,6 +627,8 @@ Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag)
} //Scoped Connection
ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
}
......@@ -725,6 +733,8 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
} //Scoped Connection
ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
}
......@@ -762,6 +772,8 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
DeleteTableFiles(table_id);
}
ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
}
......@@ -795,6 +807,9 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", deleteTableFilesQuery.error());
}
} //Scoped Connection
ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
}
......@@ -1001,6 +1016,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
}
} // Scoped Connection
ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
return utils::CreateTableFilePath(options_, file_schema);
} catch (std::exception &e) {
......@@ -1082,6 +1098,9 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.push_back(table_file);
}
if(res.size() > 0) {
ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-index files";
}
return ret;
} catch (std::exception &e) {
......@@ -1195,6 +1214,9 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files[table_file.date_].push_back(table_file);
}
if(res.size() > 0) {
ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-search files";
}
return ret;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
......@@ -1285,6 +1307,9 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
files[table_file.date_].push_back(table_file);
}
if(res.size() > 0) {
ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-merge files";
}
return ret;
} catch (std::exception &e) {
......@@ -1369,6 +1394,7 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
table_files.emplace_back(file_schema);
}
ENGINE_LOG_DEBUG << "Get table files by id";
return ret;
} catch (std::exception &e) {
......@@ -1386,7 +1412,7 @@ Status MySQLMetaImpl::Archive() {
for (auto &kv : criterias) {
auto &criteria = kv.first;
auto &limit = kv.second;
if (criteria == "days") {
if (criteria == engine::ARCHIVE_CONF_DAYS) {
size_t usecs = limit * D_SEC * US_PS;
long now = utils::GetMicroSecTimeStamp();
......@@ -1410,16 +1436,20 @@ Status MySQLMetaImpl::Archive() {
return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
}
ENGINE_LOG_DEBUG << "Archive old files";
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
}
}
if (criteria == "disk") {
if (criteria == engine::ARCHIVE_CONF_DISK) {
uint64_t sum = 0;
Size(sum);
auto to_delete = (sum - limit * G);
DiscardFiles(to_delete);
ENGINE_LOG_DEBUG << "Archive files to free disk";
}
}
......@@ -1596,6 +1626,8 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
}
} //Scoped Connection
ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
}
......@@ -1625,6 +1657,8 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX", updateTableFilesToIndexQuery.error());
}
ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
}
......@@ -1705,6 +1739,8 @@ Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
}
} //Scoped Connection
ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
}
......@@ -1782,6 +1818,11 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", cleanUpFilesWithTTLQuery.error());
}
}
if(res.size() > 0) {
ENGINE_LOG_DEBUG << "Clean " << res.size() << " files deleted in " << seconds << " seconds";
}
} //Scoped Connection
} catch (std::exception &e) {
......@@ -1832,6 +1873,10 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", cleanUpFilesWithTTLQuery.error());
}
}
if(res.size() > 0) {
ENGINE_LOG_DEBUG << "Remove " << res.size() << " tables from meta";
}
} //Scoped Connection
} catch (std::exception &e) {
......@@ -1864,6 +1909,10 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
utils::DeleteTablePath(options_, table_id);
}
}
if(table_ids.size() > 0) {
ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
}
}
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
......@@ -1904,6 +1953,9 @@ Status MySQLMetaImpl::CleanUp() {
}
}
if(res.size() > 0) {
ENGINE_LOG_DEBUG << "Clean " << res.size() << " files";
}
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
}
......
......@@ -177,6 +177,9 @@ Status SqliteMetaImpl::DropPartitionsByDates(const std::string &table_id,
c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::date_, dates)
));
ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;
} catch (std::exception &e) {
return HandleException("Encounter exception when drop partition", e.what());
}
......@@ -217,6 +220,8 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
return HandleException("Encounter exception when create table", e.what());
}
ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
return utils::CreateTablePath(options_, table_schema.table_id_);
} catch (std::exception &e) {
......@@ -241,6 +246,8 @@ Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
c(&TableSchema::state_) != (int) TableSchema::TO_DELETE
));
ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("Encounter exception when delete table", e.what());
}
......@@ -266,6 +273,8 @@ Status SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("Encounter exception when delete table files", e.what());
}
......@@ -414,6 +423,8 @@ Status SqliteMetaImpl::UpdateTableIndex(const std::string &table_id, const Table
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP
));
ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
} catch (std::exception &e) {
std::string msg = "Encounter exception when update table index: table_id = " + table_id;
return HandleException(msg, e.what());
......@@ -434,6 +445,7 @@ Status SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag
where(
c(&TableSchema::table_id_) == table_id
));
ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
} catch (std::exception &e) {
std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
......@@ -508,6 +520,8 @@ Status SqliteMetaImpl::DropTableIndex(const std::string &table_id) {
c(&TableSchema::table_id_) == table_id
));
ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("Encounter exception when delete table index files", e.what());
}
......@@ -603,6 +617,7 @@ Status SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id;
ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
return utils::CreateTableFilePath(options_, file_schema);
} catch (std::exception& e) {
......@@ -666,6 +681,9 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.push_back(table_file);
}
if(selected.size() > 0) {
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
}
return ret;
} catch (std::exception &e) {
......@@ -704,31 +722,31 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
auto status = DescribeTable(table_schema);
if (!status.ok()) { return status; }
decltype(ConnectorPtr->select(select_columns)) result;
decltype(ConnectorPtr->select(select_columns)) selected;
if (partition.empty() && ids.empty()) {
auto filter = where(match_tableid and match_type);
result = ConnectorPtr->select(select_columns, filter);
selected = ConnectorPtr->select(select_columns, filter);
}
else if (partition.empty() && !ids.empty()) {
auto match_fileid = in(&TableFileSchema::id_, ids);
auto filter = where(match_tableid and match_fileid and match_type);
result = ConnectorPtr->select(select_columns, filter);
selected = ConnectorPtr->select(select_columns, filter);
}
else if (!partition.empty() && ids.empty()) {
auto match_date = in(&TableFileSchema::date_, partition);
auto filter = where(match_tableid and match_date and match_type);
result = ConnectorPtr->select(select_columns, filter);
selected = ConnectorPtr->select(select_columns, filter);
}
else if (!partition.empty() && !ids.empty()) {
auto match_fileid = in(&TableFileSchema::id_, ids);
auto match_date = in(&TableFileSchema::date_, partition);
auto filter = where(match_tableid and match_fileid and match_date and match_type);
result = ConnectorPtr->select(select_columns, filter);
selected = ConnectorPtr->select(select_columns, filter);
}
Status ret;
TableFileSchema table_file;
for (auto &file : result) {
for (auto &file : selected) {
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
......@@ -757,6 +775,9 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
}
if(selected.size() > 0) {
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-search files";
}
return ret;
} catch (std::exception &e) {
......@@ -824,6 +845,9 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
files[table_file.date_].push_back(table_file);
}
if(selected.size() > 0) {
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-merge files";
}
return result;
} catch (std::exception &e) {
......@@ -878,6 +902,7 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
table_files.emplace_back(file_schema);
}
ENGINE_LOG_DEBUG << "Get table files by id";
return result;
} catch (std::exception &e) {
return HandleException("Encounter exception when lookup table files", e.what());
......@@ -912,6 +937,8 @@ Status SqliteMetaImpl::Archive() {
} catch (std::exception &e) {
return HandleException("Encounter exception when update table files", e.what());
}
ENGINE_LOG_DEBUG << "Archive old files";
}
if (criteria == engine::ARCHIVE_CONF_DISK) {
uint64_t sum = 0;
......@@ -919,6 +946,8 @@ Status SqliteMetaImpl::Archive() {
int64_t to_delete = (int64_t)sum - limit * G;
DiscardFiles(to_delete);
ENGINE_LOG_DEBUG << "Archive files to free disk";
}
}
......@@ -1026,6 +1055,8 @@ Status SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
ConnectorPtr->update(file_schema);
ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
} catch (std::exception &e) {
std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
+ " file_id = " + file_schema.file_id_;
......@@ -1049,6 +1080,9 @@ Status SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
c(&TableFileSchema::table_id_) == table_id and
c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
));
ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
} catch (std::exception &e) {
return HandleException("Encounter exception when update table files to to_index", e.what());
}
......@@ -1094,6 +1128,7 @@ Status SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
return HandleException("UpdateTableFiles error: sqlite transaction failed");
}
ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
} catch (std::exception &e) {
return HandleException("Encounter exception when update table files", e.what());
}
......@@ -1143,6 +1178,10 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
}
if(files.size() > 0) {
ENGINE_LOG_DEBUG << "Clean " << files.size() << " files deleted in " << seconds << " seconds";
}
} catch (std::exception &e) {
return HandleException("Encounter exception when clean table files", e.what());
}
......@@ -1171,6 +1210,10 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
}
if(tables.size() > 0) {
ENGINE_LOG_DEBUG << "Remove " << tables.size() << " tables from meta";
}
} catch (std::exception &e) {
return HandleException("Encounter exception when clean table files", e.what());
}
......@@ -1188,6 +1231,10 @@ Status SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
}
}
if(table_ids.size() > 0) {
ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
}
} catch (std::exception &e) {
return HandleException("Encounter exception when delete table folder", e.what());
}
......@@ -1221,6 +1268,10 @@ Status SqliteMetaImpl::CleanUp() {
return HandleException("CleanUp error: sqlite transaction failed");
}
if(files.size() > 0) {
ENGINE_LOG_DEBUG << "Clean " << files.size() << " files";
}
} catch (std::exception &e) {
return HandleException("Encounter exception when clean table file", e.what());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册