Commit 33d6250e authored by: J jinhai

Merge branch 'branch-0.3.1' into 'branch-0.3.1'

MS-246 refine log

See merge request megasearch/milvus!267

Former-commit-id: 085c102f99cdbaecbb10101402964285e8e038ec
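
The change brackets the DB background tasks (metric, compaction, index build) with start/exit debug logs and logs error statuses before they are returned. A minimal sketch of the pattern, using stand-in macros in place of the engine's ENGINE_LOG_DEBUG / ENGINE_LOG_ERROR stream macros and a hypothetical MergeFilesFor in place of BackgroundMergeFiles:

// Sketch only: ENGINE_LOG_* here are simple stand-ins for the engine's
// stream macros; MergeFilesFor stands in for DBImpl::BackgroundMergeFiles.
#include <iostream>
#include <set>
#include <string>

#define ENGINE_LOG_DEBUG std::cout << "[DEBUG] "
#define ENGINE_LOG_ERROR std::cerr << "[ERROR] "

struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
    std::string ToString() const { return ok() ? "OK" : msg; }
};

Status MergeFilesFor(const std::string& table_id) {
    return Status{};  // pretend the merge succeeded
}

// Background task bracketed by start/exit logs; a per-table failure is logged
// and skipped with continue so the other tables still get a chance to merge.
void BackgroundCompaction(const std::set<std::string>& table_ids) {
    ENGINE_LOG_DEBUG << "Background compaction thread start\n";
    for (const auto& table_id : table_ids) {
        Status status = MergeFilesFor(table_id);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Merge files for table " << table_id
                             << " failed: " << status.ToString() << '\n';
            continue;  // let other tables get a chance to merge
        }
    }
    ENGINE_LOG_DEBUG << "Background compaction thread exit\n";
}

int main() {
    BackgroundCompaction({"table_1", "table_2"});
    return 0;
}
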
......@@ -102,6 +102,7 @@ Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
//dates partially delete files of the table, but this is not currently supported
ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
......@@ -132,6 +133,7 @@ Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count
Status DBImpl::InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
auto start_time = METRICS_NOW_TIME;
Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
......@@ -140,6 +142,8 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
CollectInsertMetrics(total_time, n, status.ok());
return status;
......@@ -160,6 +164,8 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
ENGINE_LOG_DEBUG << "Query by vectors";
//get all table files from table
meta::DatePartionedTableFilesSchema files;
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
......@@ -181,6 +187,8 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
uint64_t k, uint64_t nq, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
ENGINE_LOG_DEBUG << "Query by file ids";
//get specified files
std::vector<size_t> ids;
for (auto &id : file_ids) {
......@@ -269,6 +277,8 @@ void DBImpl::BackgroundTimerTask() {
for(auto& iter : index_thread_results_) {
iter.wait();
}
ENGINE_LOG_DEBUG << "DB background thread exit";
break;
}
......@@ -287,6 +297,8 @@ void DBImpl::StartMetricTask() {
return;
}
ENGINE_LOG_DEBUG << "Start metric task";
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
......@@ -299,20 +311,19 @@ void DBImpl::StartMetricTask() {
server::Metrics::GetInstance().GPUPercentGaugeSet();
server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
server::Metrics::GetInstance().OctetsSet();
ENGINE_LOG_DEBUG << "Metric task finished";
}
void DBImpl::StartCompactionTask() {
// static int count = 0;
// count++;
// std::cout << "StartCompactionTask: " << count << std::endl;
// std::cout << "c: " << count++ << std::endl;
static uint64_t compact_clock_tick = 0;
compact_clock_tick++;
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
// std::cout << "c r: " << count++ << std::endl;
return;
}
ENGINE_LOG_DEBUG << "Serialize insert cache";
//serialize memory data
std::set<std::string> temp_table_ids;
mem_mgr_->Serialize(temp_table_ids);
......@@ -320,6 +331,8 @@ void DBImpl::StartCompactionTask() {
compact_table_ids_.insert(id);
}
ENGINE_LOG_DEBUG << "Insert cache serialized";
//has compaction been finished?
if(!compact_thread_results_.empty()) {
std::chrono::milliseconds span(10);
......@@ -338,13 +351,15 @@ void DBImpl::StartCompactionTask() {
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
const meta::TableFilesSchema& files) {
ENGINE_LOG_DEBUG << "Merge files for table" << table_id;
meta::TableFileSchema table_file;
table_file.table_id_ = table_id;
table_file.date_ = date;
Status status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_INFO << status.ToString() << std::endl;
ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
return status;
}
......@@ -396,6 +411,7 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
meta::DatePartionedTableFilesSchema raw_files;
auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
return status;
}
......@@ -417,12 +433,14 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_DEBUG << " Background compaction thread start";
Status status;
for (auto& table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
return;
continue;//let other tables get a chance to merge
}
}
......@@ -433,6 +451,8 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ttl = meta::D_SEC;
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
ENGINE_LOG_DEBUG << " Background compaction thread exit";
}
void DBImpl::StartBuildIndexTask(bool force) {
......@@ -477,6 +497,7 @@ Status DBImpl::BuildIndex(const std::string& table_id) {
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
if(to_index == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status::Error("Invalid engine type");
}
......@@ -491,6 +512,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
table_file.file_type_ = meta::TableFileSchema::INDEX; //for multi-db-path, distribute index files evenly across the paths
Status status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
return status;
}
......@@ -559,6 +581,8 @@ Status DBImpl::BuildIndexByTable(const std::string& table_id) {
}
void DBImpl::BackgroundBuildIndex() {
ENGINE_LOG_DEBUG << " Background build index thread start";
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
......@@ -574,6 +598,8 @@ void DBImpl::BackgroundBuildIndex() {
break;
}
}
ENGINE_LOG_DEBUG << " Background build index thread exit";
}
Status DBImpl::DropAll() {
......
......@@ -233,9 +233,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
PrintTableSchema(tb_schema);
}
//add vectors
std::vector<std::pair<int64_t, RowRecord>> search_record_array;
{//add vectors
for (int i = 0; i < ADD_VECTOR_LOOP; i++) {//add vectors
for (int i = 0; i < ADD_VECTOR_LOOP; i++) {
TimeRecorder recorder("Add vector No." + std::to_string(i));
std::vector<RowRecord> record_array;
int64_t begin_index = i * BATCH_ROW_COUNT;
......@@ -245,9 +245,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "AddVector function call status: " << stat.ToString() << std::endl;
std::cout << "Returned id array count: " << record_ids.size() << std::endl;
if(search_record_array.size() < NQ) {
if(i == 0) {
for(int64_t k = SEARCH_TARGET; k < SEARCH_TARGET + NQ; k++) {
search_record_array.push_back(
std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET]));
std::make_pair(record_ids[k], record_array[k]));
}
}
}
......
......@@ -191,6 +191,7 @@ ServerError CreateTableTask::OnExecute() {
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "CreateTableTask encounter exception: " << ex.what();
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......@@ -236,6 +237,7 @@ ServerError DescribeTableTask::OnExecute() {
schema_.store_raw_vector = table_info.store_raw_data_;
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "DescribeTableTask encounter exception: " << ex.what();
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......@@ -279,6 +281,7 @@ ServerError BuildIndexTask::OnExecute() {
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "BuildIndexTask encounter exception: " << ex.what();
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......@@ -316,6 +319,7 @@ ServerError HasTableTask::OnExecute() {
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "HasTableTask encounter exception: " << ex.what();
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......@@ -365,6 +369,7 @@ ServerError DeleteTableTask::OnExecute() {
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "DeleteTableTask encounter exception: " << ex.what();
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......@@ -481,6 +486,7 @@ ServerError AddVectorTask::OnExecute() {
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "AddVectorTask encounter exception: " << ex.what();
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......@@ -604,6 +610,7 @@ ServerError SearchVectorTaskBase::OnExecute() {
<< " construct result(" << (span_result/total_cost)*100.0 << "%)";
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "SearchVectorTask encounter exception: " << ex.what();
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......@@ -739,6 +746,7 @@ ServerError GetTableRowCountTask::OnExecute() {
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "GetTableRowCountTask encounter exception: " << ex.what();
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......
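
Each of the *Task::OnExecute handlers above gets the same treatment: the exception is logged through SERVER_LOG_ERROR before being converted into an error status. A self-contained sketch of that catch block, with a stand-in macro, stand-in error codes, and a stand-in SetError so it compiles on its own:

// Sketch only: SERVER_LOG_ERROR, the error codes, and SetError are stand-ins
// for the corresponding server-side definitions.
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>

#define SERVER_LOG_ERROR std::cerr << "[ERROR] "

using ServerError = int;
constexpr ServerError SERVER_SUCCESS = 0;
constexpr ServerError SERVER_UNEXPECTED_ERROR = 1;

ServerError SetError(ServerError code, const std::string& msg) {
    return code;  // a real task would also record msg on the task object
}

ServerError OnExecute() {
    try {
        throw std::runtime_error("simulated failure");
    } catch (std::exception& ex) {
        // Log first, then surface the failure to the caller as a status code.
        SERVER_LOG_ERROR << "Task encountered exception: " << ex.what() << '\n';
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }
    return SERVER_SUCCESS;
}

int main() {
    return OnExecute() == SERVER_SUCCESS ? 0 : 1;
}
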