Commit df1ab644 authored by groot

MS-654 Describe index timeout when building index


Former-commit-id: 88c605554c0a55c0fdfb2bec4ecbb77f00d49cef
Parent d7994da7
@@ -21,6 +21,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-644 - Search crashed with index-type: flat
- MS-624 - Search vectors failed if time ranges long enough
- MS-652 - IVFSQH quantization double free
- MS-654 - Describe index timeout when building index
## Improvement
- MS-552 - Add and change the easylogging library
......
@@ -39,27 +39,6 @@ mysql_exc "GRANT ALL PRIVILEGES ON ${MYSQL_DB_NAME}.* TO '${MYSQL_USER_NAME}'@'%
mysql_exc "FLUSH PRIVILEGES;"
mysql_exc "USE ${MYSQL_DB_NAME};"
MYSQL_USER_NAME=root
MYSQL_PASSWORD=Fantast1c
MYSQL_HOST='192.168.1.194'
MYSQL_PORT='3306'
MYSQL_DB_NAME=milvus_`date +%s%N`
function mysql_exc()
{
cmd=$1
mysql -h${MYSQL_HOST} -u${MYSQL_USER_NAME} -p${MYSQL_PASSWORD} -e "${cmd}"
if [ $? -ne 0 ]; then
echo "mysql $cmd run failed"
fi
}
mysql_exc "CREATE DATABASE IF NOT EXISTS ${MYSQL_DB_NAME};"
mysql_exc "GRANT ALL PRIVILEGES ON ${MYSQL_DB_NAME}.* TO '${MYSQL_USER_NAME}'@'%';"
mysql_exc "FLUSH PRIVILEGES;"
mysql_exc "USE ${MYSQL_DB_NAME};"
# get baseline
${LCOV_CMD} -c -i -d ${DIR_GCNO} -o "${FILE_INFO_BASE}"
if [ $? -ne 0 ]; then
......
@@ -251,11 +251,6 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect
Status status;
milvus::server::CollectInsertMetrics metrics(n, status);
status = mem_mgr_->InsertVectors(table_id, n, vectors, vector_ids);
// std::chrono::microseconds time_span =
// std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
// ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
return status;
}
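The timing comments deleted above were dead code: `CollectInsertMetrics` already measures the call as a scope guard, reporting when `InsertVectors` returns on any path. As a rough illustration, here is a minimal self-contained sketch of that RAII collector pattern (hypothetical names, not the actual Milvus implementation):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

// Hypothetical sketch of a scope-based metrics collector: the constructor
// records the start time, and the destructor reports elapsed time and final
// status when the enclosing scope (e.g. InsertVectors) returns.
class ScopedInsertMetrics {
 public:
    ScopedInsertMetrics(uint64_t n, const bool& ok)
        : n_(n), ok_(ok), start_(std::chrono::steady_clock::now()) {}

    ~ScopedInsertMetrics() {
        auto us = std::chrono::duration_cast<std::chrono::microseconds>(
                      std::chrono::steady_clock::now() - start_)
                      .count();
        std::cout << "insert " << n_ << " vectors cost " << us
                  << " us, ok=" << ok_ << std::endl;
    }

 private:
    uint64_t n_;
    const bool& ok_;  // observed at destruction, after the insert completed
    std::chrono::steady_clock::time_point start_;
};
```

Because the destructor runs on every exit path, no explicit duration bookkeeping is needed in the function body, which is why the commented-out timing lines could simply be removed.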
@@ -359,7 +354,7 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
return Status(DB_ERROR, "Milvus server is shutdown!");
}
ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;
ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id << " date range count: " << dates.size();
// get all table files from table
meta::DatePartionedTableFilesSchema files;
@@ -377,7 +372,7 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
}
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
return status;
}
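The signature change above (dropping `const meta::DatesT& dates`) works because date filtering already happens upstream, when the candidate table files are collected; `QueryAsync` only ever received the pre-filtered list, and as the hunks below show, its `dates` argument was only read for logging. A rough self-contained sketch of that split, using stand-in types rather than the Milvus meta API:

```cpp
#include <map>
#include <string>
#include <vector>

// Self-contained sketch (stand-in types, not the Milvus meta API): the date
// filter is applied once while collecting candidate files, so the search
// call that follows receives an already-filtered list and no longer needs a
// dates parameter of its own.
using DateT = int;
using FileList = std::vector<std::string>;

FileList
CollectFilesForDates(const std::map<DateT, FileList>& files_by_date,
                     const std::vector<DateT>& dates) {
    FileList selected;
    for (DateT date : dates) {
        auto it = files_by_date.find(date);
        if (it == files_by_date.end()) {
            continue;  // no files written on this date
        }
        selected.insert(selected.end(), it->second.begin(), it->second.end());
    }
    return selected;  // a QueryAsync-style callee takes this list, not the dates
}
```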
@@ -389,7 +384,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
return Status(DB_ERROR, "Milvus server is shutdown!");
}
ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;
ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id << " date range count: " << dates.size();
// get specified files
std::vector<size_t> ids;
@@ -418,7 +413,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
}
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
return status;
}
@@ -437,14 +432,13 @@ DBImpl::Size(uint64_t& result) {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
uint64_t nprobe, const float* vectors, QueryResults& results) {
server::CollectQueryMetrics metrics(nq);
TimeRecorder rc("");
// step 1: get files to search
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size()
<< " date range count: " << dates.size();
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
for (auto& file : files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
@@ -458,32 +452,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
return job->GetStatus();
}
// step 3: print time cost information
// double load_cost = context->LoadCost();
// double search_cost = context->SearchCost();
// double reduce_cost = context->ReduceCost();
// std::string load_info = TimeRecorder::GetTimeSpanStr(load_cost);
// std::string search_info = TimeRecorder::GetTimeSpanStr(search_cost);
// std::string reduce_info = TimeRecorder::GetTimeSpanStr(reduce_cost);
// if(search_cost > 0.0 || reduce_cost > 0.0) {
// double total_cost = load_cost + search_cost + reduce_cost;
// double load_percent = load_cost/total_cost;
// double search_percent = search_cost/total_cost;
// double reduce_percent = reduce_cost/total_cost;
//
// ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info
// << " percent: " << load_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info
// << " percent: " << search_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info
// << " percent: " << reduce_percent*100 << "%";
// } else {
// ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
// << " search cost: " << search_info
// << " reduce cost: " << reduce_info;
// }
// step 4: construct results
// step 3: construct results
results = job->GetResult();
rc.ElapseFromBegin("Engine query totally cost");
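For orientation, `QueryAsync` follows a submit-and-wait shape: build a `SearchJob` carrying the query parameters, attach each candidate index file, hand the job to the scheduler (elided in this hunk), and block until every file has been searched before checking `GetStatus()` and reading `GetResult()`. A minimal self-contained sketch of the waiting half, assuming workers signal once per finished file (an assumption; the real scheduler is not shown here):

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Sketch of a submit-and-wait job: the query thread blocks in WaitResult()
// until scheduler workers have called Done() once for every attached file.
class WaitableJob {
 public:
    explicit WaitableJob(std::size_t file_count) : remaining_(file_count) {}

    void Done() {  // called by a worker after searching one file
        std::lock_guard<std::mutex> lock(mutex_);
        if (--remaining_ == 0) {
            cv_.notify_all();
        }
    }

    void WaitResult() {  // called by the query thread
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return remaining_ == 0; });
    }

 private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::size_t remaining_;
};
```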
@@ -695,14 +664,13 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
return status;
}
bool has_merge = false;
for (auto& kv : raw_files) {
auto files = kv.second;
if (files.size() < options_.merge_trigger_number_) {
ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
continue;
}
has_merge = true;
MergeFiles(table_id, kv.first, kv.second);
if (shutting_down_.load(std::memory_order_acquire)) {
@@ -770,127 +738,6 @@ DBImpl::StartBuildIndexTask(bool force) {
}
}
Status
DBImpl::BuildIndex(const meta::TableFileSchema& file) {
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
(MetricType)file.metric_type_, file.nlist_);
if (to_index == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status(DB_ERROR, "Invalid engine type");
}
try {
// step 1: load index
Status status = to_index->Load(options_.insert_cache_immediately_);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
return status;
}
// step 2: create table file
meta::TableFileSchema table_file;
table_file.table_id_ = file.table_id_;
table_file.date_ = file.date_;
table_file.file_type_ =
meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index files evenly across the paths
status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
return status;
}
// step 3: build index
std::shared_ptr<ExecutionEngine> index;
try {
server::CollectBuildIndexMetrics metrics;
index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
if (index == nullptr) {
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
<< " to to_delete";
return status;
}
} catch (std::exception& ex) {
// typical error: out of gpu memory
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
<< std::endl;
return Status(DB_ERROR, msg);
}
// step 4: if table has been deleted, don't save index file
bool has_table = false;
meta_ptr_->HasTable(file.table_id_, has_table);
if (!has_table) {
meta_ptr_->DeleteTableFiles(file.table_id_);
return Status::OK();
}
// step 5: save index file
try {
index->Serialize();
} catch (std::exception& ex) {
// typical error: out of disk space or permission denied
std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to persist index file: " << table_file.location_
<< ", possible out of disk space" << std::endl;
return Status(DB_ERROR, msg);
}
// step 6: update meta
table_file.file_type_ = meta::TableFileSchema::INDEX;
table_file.file_size_ = index->PhysicalSize();
table_file.row_count_ = index->Count();
auto origin_file = file;
origin_file.file_type_ = meta::TableFileSchema::BACKUP;
meta::TableFilesSchema update_files = {table_file, origin_file};
status = meta_ptr_->UpdateTableFiles(update_files);
if (status.ok()) {
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
<< " bytes"
<< " from file " << origin_file.file_id_;
if (options_.insert_cache_immediately_) {
index->Cache();
}
} else {
// failed to update meta, mark the new file as to_delete, don't delete old file
origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
status = meta_ptr_->UpdateTableFile(origin_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
}
} catch (std::exception& ex) {
std::string msg = "Build index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg);
}
return Status::OK();
}
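A pattern worth noting in the removed `BuildIndex`: every failure path compensates by re-flagging files in metadata (the new file to `TO_DELETE`, or the source file back to `TO_INDEX` when the final meta update fails) instead of deleting anything inline. A minimal self-contained sketch of that idiom, with stand-in types rather than the Milvus schema:

```cpp
#include <stdexcept>
#include <string>

// Self-contained sketch of mark-for-delete compensation: failures never
// erase the new file inline; they flip its metadata flag to TO_DELETE and
// leave physical cleanup to a background sweeper, while the original
// TO_INDEX file stays untouched.
enum class FileType { NEW_INDEX, INDEX, TO_DELETE };

struct FileMeta {
    std::string file_id;
    FileType type = FileType::NEW_INDEX;
};

bool
UpdateMeta(FileMeta&) {  // stand-in for meta_ptr_->UpdateTableFile
    return true;
}

bool
BuildWithCompensation(FileMeta& new_file) {
    try {
        // ... build and serialize the index into the new file's location ...
        new_file.type = FileType::INDEX;  // success: promote the file
        return UpdateMeta(new_file);
    } catch (const std::exception&) {
        new_file.type = FileType::TO_DELETE;  // failure: compensate, don't delete
        UpdateMeta(new_file);
        return false;
    }
}
```

Deferring deletion to a background pass means a crash between steps cannot drop the original data.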
void
DBImpl::BackgroundBuildIndex() {
ENGINE_LOG_TRACE << "Background build index thread start";
@@ -915,17 +762,6 @@ DBImpl::BackgroundBuildIndex() {
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
}
}
// for (auto &file : to_index_files) {
// status = BuildIndex(file);
// if (!status.ok()) {
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
// }
//
// if (shutting_down_.load(std::memory_order_acquire)) {
// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
// break;
// }
// }
ENGINE_LOG_TRACE << "Background build index thread exit";
}
......
@@ -107,7 +107,7 @@ class DBImpl : public DB {
private:
Status
QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results);
uint64_t nprobe, const float* vectors, QueryResults& results);
void
BackgroundTimerTask();
@@ -133,9 +133,6 @@ class DBImpl : public DB {
void
BackgroundBuildIndex();
Status
BuildIndex(const meta::TableFileSchema&);
Status
MemSerialize();
......
@@ -91,7 +91,8 @@ IVFSQHybrid::CopyCpuToGpu(const int64_t& device_id, const Config& config) {
auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, &index_composition, &option);
std::shared_ptr<faiss::Index> device_index = std::shared_ptr<faiss::Index>(gpu_index);;
std::shared_ptr<faiss::Index> device_index = std::shared_ptr<faiss::Index>(gpu_index);
;
auto new_idx = std::make_shared<IVFSQHybrid>(device_index, device_id, res);
return new_idx;
} else {
......
@@ -40,7 +40,7 @@ namespace grpc {
static const char* DQL_TASK_GROUP = "dql";
static const char* DDL_DML_TASK_GROUP = "ddl_dml";
static const char* PING_TASK_GROUP = "ping";
static const char* INFO_TASK_GROUP = "info";
constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
@@ -182,7 +182,7 @@ CreateTableTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeTableTask::DescribeTableTask(const std::string& table_name, ::milvus::grpc::TableSchema* schema)
: GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), schema_(schema) {
: GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), schema_(schema) {
}
BaseTaskPtr
@@ -288,7 +288,7 @@ CreateIndexTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
HasTableTask::HasTableTask(const std::string& table_name, bool& has_table)
: GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), has_table_(has_table) {
: GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), has_table_(has_table) {
}
BaseTaskPtr
@@ -373,7 +373,7 @@ DropTableTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList* table_name_list)
: GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_list_(table_name_list) {
: GrpcBaseTask(INFO_TASK_GROUP), table_name_list_(table_name_list) {
}
BaseTaskPtr
@@ -683,7 +683,7 @@ SearchTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CountTableTask::CountTableTask(const std::string& table_name, int64_t& row_count)
: GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), row_count_(row_count) {
: GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), row_count_(row_count) {
}
BaseTaskPtr
@@ -725,7 +725,7 @@ CountTableTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CmdTask::CmdTask(const std::string& cmd, std::string& result)
: GrpcBaseTask(PING_TASK_GROUP), cmd_(cmd), result_(result) {
: GrpcBaseTask(INFO_TASK_GROUP), cmd_(cmd), result_(result) {
}
BaseTaskPtr
@@ -816,7 +816,7 @@ DeleteByRangeTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PreloadTableTask::PreloadTableTask(const std::string& table_name)
: GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) {
: GrpcBaseTask(DQL_TASK_GROUP), table_name_(table_name) {
}
BaseTaskPtr
@@ -851,7 +851,7 @@ PreloadTableTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeIndexTask::DescribeIndexTask(const std::string& table_name, ::milvus::grpc::IndexParam* index_param)
: GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), index_param_(index_param) {
: GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), index_param_(index_param) {
}
BaseTaskPtr
......
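These task-group changes are the actual fix for the described timeout: read-only requests (DescribeTable, HasTable, ShowTables, CountTable, DescribeIndex) move out of the serialized `ddl_dml` group into the new `info` group, CmdTask moves there from `ping`, and PreloadTable moves to the `dql` group, so a metadata query no longer waits behind a long-running CreateIndex. To illustrate why the group name matters, here is a minimal self-contained sketch (not the actual Milvus scheduler) in which tasks within one group run serially on a dedicated worker while different groups proceed independently:

```cpp
#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Each group owns one worker thread and a FIFO queue: tasks in the same
// group never overlap, while tasks in different groups run concurrently.
class GroupQueue {
 public:
    GroupQueue() : worker_([this] { Run(); }) {}

    ~GroupQueue() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_one();
        worker_.join();
    }

    void Submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

 private:
    void Run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) {
                    return;
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // a slow ddl_dml task only blocks its own group
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool stop_ = false;
    std::thread worker_;  // declared last so other members exist before Run() starts
};

class GroupScheduler {
 public:
    void Submit(const std::string& group, std::function<void()> task) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto& queue = queues_[group];
        if (!queue) {
            queue = std::make_unique<GroupQueue>();  // lazily create one queue per group
        }
        queue->Submit(std::move(task));
    }

 private:
    std::mutex mutex_;
    std::map<std::string, std::unique_ptr<GroupQueue>> queues_;
};
```

Under such a scheme, a DescribeIndex task submitted to `"info"` executes on its own worker even while an index build occupies the `"ddl_dml"` worker, which is exactly the timeout this commit addresses.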