diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 4645ffcbe60828f8ff0695ee0eb57d98e0168021..807c16f5b301dc7a86824dd5d7d474bf5973e58d 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -10,10 +10,18 @@ aux_source_directory(config config_files) aux_source_directory(server server_files) aux_source_directory(utils utils_files) aux_source_directory(db db_files) -aux_source_directory(db/scheduler db_scheduler_files) aux_source_directory(wrapper wrapper_files) aux_source_directory(metrics metrics_files) +aux_source_directory(db/scheduler scheduler_files) +aux_source_directory(db/scheduler/context scheduler_context_files) +aux_source_directory(db/scheduler/task scheduler_task_files) +set(db_scheduler_files + ${scheduler_files} + ${scheduler_context_files} + ${scheduler_task_files} + ) + set(license_check_files license/LicenseLibrary.cpp license/LicenseCheck.cpp diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 9b541b87262bd77023a4a841533cc8533f34384f..8202c8a7db510274b2d8ab9236a4f35606e3ba9e 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -8,7 +8,9 @@ #include "Log.h" #include "EngineFactory.h" #include "metrics/Metrics.h" -#include "scheduler/SearchScheduler.h" +#include "scheduler/TaskScheduler.h" +#include "scheduler/context/SearchContext.h" +#include "scheduler/context/DeleteContext.h" #include "utils/TimeRecorder.h" #include @@ -130,48 +132,52 @@ void CalcScore(uint64_t vector_count, DBImpl::DBImpl(const Options& options) : options_(options), shutting_down_(false), - pMeta_(new meta::DBMetaImpl(options_.meta)), - pMemMgr_(new MemManager(pMeta_, options_)), + meta_ptr_(new meta::DBMetaImpl(options_.meta)), + mem_mgr_(new MemManager(meta_ptr_, options_)), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { StartTimerTasks(); } Status DBImpl::CreateTable(meta::TableSchema& table_schema) { - return pMeta_->CreateTable(table_schema); + return meta_ptr_->CreateTable(table_schema); } Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) { - //dates empty means delete all files of the table - if(dates.empty()) { - pMemMgr_->EraseMemVector(table_id); //not allow insert - pMeta_->DeleteTable(table_id); //soft delete - } + //dates partly delete files of the table but currently we don't support + + mem_mgr_->EraseMemVector(table_id); //not allow insert + meta_ptr_->DeleteTable(table_id); //soft delete table + + //scheduler will determine when to delete table files + TaskScheduler& scheduler = TaskScheduler::GetInstance(); + DeleteContextPtr context = std::make_shared(table_id, meta_ptr_); + scheduler.Schedule(context); return Status::OK(); } Status DBImpl::DescribeTable(meta::TableSchema& table_schema) { - return pMeta_->DescribeTable(table_schema); + return meta_ptr_->DescribeTable(table_schema); } Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) { - return pMeta_->HasTable(table_id, has_or_not); + return meta_ptr_->HasTable(table_id, has_or_not); } Status DBImpl::AllTables(std::vector& table_schema_array) { - return pMeta_->AllTables(table_schema_array); + return meta_ptr_->AllTables(table_schema_array); } Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) { - return pMeta_->Count(table_id, row_count); + return meta_ptr_->Count(table_id, row_count); } Status DBImpl::InsertVectors(const std::string& table_id_, uint64_t n, const float* vectors, IDNumbers& vector_ids_) { auto start_time = METRICS_NOW_TIME; - Status status = 
pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_); + Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_); auto end_time = METRICS_NOW_TIME; double total_time = METRICS_MICROSECONDS(start_time,end_time); // std::chrono::microseconds time_span = std::chrono::duration_cast(end_time - start_time); @@ -203,7 +209,7 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, //get all table files from table meta::DatePartionedTableFilesSchema files; - auto status = pMeta_->FilesToSearch(table_id, dates, files); + auto status = meta_ptr_->FilesToSearch(table_id, dates, files); if (!status.ok()) { return status; } meta::TableFilesSchema file_id_array; @@ -225,7 +231,7 @@ Status DBImpl::Query(const std::string& table_id, const std::vector for (auto &id : file_ids) { meta::TableFileSchema table_file; table_file.table_id_ = id; - auto status = pMeta_->GetTableFile(table_file); + auto status = meta_ptr_->GetTableFile(table_file); if (!status.ok()) { return status; } @@ -238,7 +244,7 @@ Status DBImpl::Query(const std::string& table_id, const std::vector Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) { meta::DatePartionedTableFilesSchema files; - auto status = pMeta_->FilesToSearch(table_id, dates, files); + auto status = meta_ptr_->FilesToSearch(table_id, dates, files); if (!status.ok()) { return status; } ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size(); @@ -387,8 +393,8 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch } //step 2: put search task to scheduler - SearchScheduler& scheduler = SearchScheduler::GetInstance(); - scheduler.ScheduleSearchTask(context); + TaskScheduler& scheduler = TaskScheduler::GetInstance(); + scheduler.Schedule(context); context->WaitResult(); @@ -396,7 +402,7 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch auto& context_result = context->GetResult(); meta::TableSchema table_schema; table_schema.table_id_ = table_id; - pMeta_->DescribeTable(table_schema); + meta_ptr_->DescribeTable(table_schema); CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results); @@ -460,7 +466,7 @@ void DBImpl::StartCompactionTask() { //serialize memory data std::vector temp_table_ids; - pMemMgr_->Serialize(temp_table_ids); + mem_mgr_->Serialize(temp_table_ids); for(auto& id : temp_table_ids) { compact_table_ids_.insert(id); } @@ -486,10 +492,10 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, meta::TableFileSchema table_file; table_file.table_id_ = table_id; table_file.date_ = date; - Status status = pMeta_->CreateTableFile(table_file); + Status status = meta_ptr_->CreateTableFile(table_file); if (!status.ok()) { - LOG(INFO) << status.ToString() << std::endl; + ENGINE_LOG_INFO << status.ToString() << std::endl; return status; } @@ -510,7 +516,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, file_schema.file_type_ = meta::TableFileSchema::TO_DELETE; updated.push_back(file_schema); - LOG(DEBUG) << "Merging file " << file_schema.file_id_; + ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_; index_size = index->Size(); if (index_size >= options_.index_trigger_size) break; @@ -526,8 +532,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, } table_file.size_ = index_size; updated.push_back(table_file); - status = 
pMeta_->UpdateTableFiles(updated); - LOG(DEBUG) << "New merged file " << table_file.file_id_ << + status = meta_ptr_->UpdateTableFiles(updated); + ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size=" << index->PhysicalSize()/(1024*1024) << " M"; index->Cache(); @@ -537,7 +543,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, Status DBImpl::BackgroundMergeFiles(const std::string& table_id) { meta::DatePartionedTableFilesSchema raw_files; - auto status = pMeta_->FilesToMerge(table_id, raw_files); + auto status = meta_ptr_->FilesToMerge(table_id, raw_files); if (!status.ok()) { return status; } @@ -569,8 +575,8 @@ void DBImpl::BackgroundCompaction(std::set table_ids) { } } - pMeta_->Archive(); - pMeta_->CleanUpFilesWithTTL(1); + meta_ptr_->Archive(); + meta_ptr_->CleanUpFilesWithTTL(1); } void DBImpl::StartBuildIndexTask() { @@ -596,27 +602,43 @@ void DBImpl::StartBuildIndexTask() { } Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { - meta::TableFileSchema table_file; - table_file.table_id_ = file.table_id_; - table_file.date_ = file.date_; - Status status = pMeta_->CreateTableFile(table_file); - if (!status.ok()) { - return status; - } - ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_); if(to_index == nullptr) { return Status::Error("Invalid engine type"); } try { + //step 1: load index to_index->Load(); + + //step 2: create table file + meta::TableFileSchema table_file; + table_file.table_id_ = file.table_id_; + table_file.date_ = file.date_; + Status status = meta_ptr_->CreateTableFile(table_file); + if (!status.ok()) { + return status; + } + + //step 3: build index auto start_time = METRICS_NOW_TIME; auto index = to_index->BuildIndex(table_file.location_); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time); + //step 4: if table has been deleted, dont save index file + bool has_table = false; + meta_ptr_->HasTable(file.table_id_, has_table); + if(!has_table) { + meta_ptr_->DeleteTableFiles(file.table_id_); + return Status::OK(); + } + + //step 5: save index file + index->Serialize(); + + //step 6: update meta table_file.file_type_ = meta::TableFileSchema::INDEX; table_file.size_ = index->Size(); @@ -624,13 +646,13 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { to_remove.file_type_ = meta::TableFileSchema::TO_DELETE; meta::TableFilesSchema update_files = {to_remove, table_file}; - pMeta_->UpdateTableFiles(update_files); + meta_ptr_->UpdateTableFiles(update_files); - LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size " + ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()/(1024*1024) << " M" << " from file " << to_remove.file_id_; - index->Cache(); + //index->Cache(); } catch (std::exception& ex) { return Status::Error("Build index encounter exception", ex.what()); @@ -641,10 +663,10 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { void DBImpl::BackgroundBuildIndex() { meta::TableFilesSchema to_index_files; - pMeta_->FilesToIndex(to_index_files); + meta_ptr_->FilesToIndex(to_index_files); Status status; for (auto& file : to_index_files) { - /* LOG(DEBUG) << "Buiding index for " << file.location; */ + /* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */ status = BuildIndex(file); if (!status.ok()) { bg_error_ = 
status; @@ -655,22 +677,22 @@ void DBImpl::BackgroundBuildIndex() { break; } } - /* LOG(DEBUG) << "All Buiding index Done"; */ + /* ENGINE_LOG_DEBUG << "All Buiding index Done"; */ } Status DBImpl::DropAll() { - return pMeta_->DropAll(); + return meta_ptr_->DropAll(); } Status DBImpl::Size(uint64_t& result) { - return pMeta_->Size(result); + return meta_ptr_->Size(result); } DBImpl::~DBImpl() { shutting_down_.store(true, std::memory_order_release); bg_timer_thread_.join(); std::vector ids; - pMemMgr_->Serialize(ids); + mem_mgr_->Serialize(ids); } } // namespace engine diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 177faeb9c111e826c993c3490d3192ce70c82df0..b4d60a27a9d518cbbb7f1c3a71ae6df5c07dd1fa 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -93,8 +93,8 @@ private: std::thread bg_timer_thread_; - MetaPtr pMeta_; - MemManagerPtr pMemMgr_; + MetaPtr meta_ptr_; + MemManagerPtr mem_mgr_; server::ThreadPool compact_thread_pool_; std::list> compact_thread_results_; diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 3ae34c1c2e505a752b5349ae1c25ec38eed9c42c..dbbc985067f60b7929cee418c9b5d716c31e689e 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -258,6 +258,28 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) { return Status::OK(); } +Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) { + try { + MetricCollector metric; + + //soft delete table files + ConnectorPtr->update_all( + set( + c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE, + c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp() + ), + where( + c(&TableFileSchema::table_id_) == table_id and + c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE + )); + + } catch (std::exception &e) { + return HandleException("Encounter exception when delete table files", e); + } + + return Status::OK(); +} + Status DBMetaImpl::DescribeTable(TableSchema &table_schema) { try { MetricCollector metric; @@ -582,74 +604,6 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id, return Status::OK(); } -Status DBMetaImpl::FilesToDelete(const std::string& table_id, - const DatesT& partition, - DatePartionedTableFilesSchema& files) { - auto now = utils::GetMicroSecTimeStamp(); - try { - if(partition.empty()) { - //step 1: get table files by dates - auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, - &TableFileSchema::table_id_, - &TableFileSchema::file_id_, - &TableFileSchema::size_, - &TableFileSchema::date_), - where(c(&TableFileSchema::table_id_) == table_id)); - - //step 2: erase table files from meta - for (auto &file : selected) { - TableFileSchema table_file; - table_file.id_ = std::get<0>(file); - table_file.table_id_ = std::get<1>(file); - table_file.file_id_ = std::get<2>(file); - table_file.size_ = std::get<3>(file); - table_file.date_ = std::get<4>(file); - GetTableFilePath(table_file); - auto dateItr = files.find(table_file.date_); - if (dateItr == files.end()) { - files[table_file.date_] = TableFilesSchema(); - } - files[table_file.date_].push_back(table_file); - - ConnectorPtr->remove(std::get<0>(file)); - } - - } else { - //step 1: get all table files - auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, - &TableFileSchema::table_id_, - &TableFileSchema::file_id_, - &TableFileSchema::size_, - &TableFileSchema::date_), - where(in(&TableFileSchema::date_, partition) - and c(&TableFileSchema::table_id_) == table_id)); - - //step 2: erase table files from meta - 
for (auto &file : selected) { - TableFileSchema table_file; - table_file.id_ = std::get<0>(file); - table_file.table_id_ = std::get<1>(file); - table_file.file_id_ = std::get<2>(file); - table_file.size_ = std::get<3>(file); - table_file.date_ = std::get<4>(file); - GetTableFilePath(table_file); - auto dateItr = files.find(table_file.date_); - if (dateItr == files.end()) { - files[table_file.date_] = TableFilesSchema(); - } - files[table_file.date_].push_back(table_file); - - ConnectorPtr->remove(std::get<0>(file)); - } - } - - } catch (std::exception &e) { - return HandleException("Encounter exception when iterate delete files", e); - } - - return Status::OK(); -} - Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) { try { @@ -745,40 +699,51 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) { return Status::OK(); } - LOG(DEBUG) << "About to discard size=" << to_discard_size; + ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size; try { - auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, - &TableFileSchema::size_), - where(c(&TableFileSchema::file_type_) + MetricCollector metric; + + auto commited = ConnectorPtr->transaction([&]() mutable { + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, + &TableFileSchema::size_), + where(c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE), - order_by(&TableFileSchema::id_), - limit(10)); + order_by(&TableFileSchema::id_), + limit(10)); - std::vector ids; - TableFileSchema table_file; + std::vector ids; + TableFileSchema table_file; - for (auto &file : selected) { - if (to_discard_size <= 0) break; - table_file.id_ = std::get<0>(file); - table_file.size_ = std::get<1>(file); - ids.push_back(table_file.id_); - ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_ - << " table_file.size=" << table_file.size_; - to_discard_size -= table_file.size_; - } + for (auto &file : selected) { + if (to_discard_size <= 0) break; + table_file.id_ = std::get<0>(file); + table_file.size_ = std::get<1>(file); + ids.push_back(table_file.id_); + ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_ + << " table_file.size=" << table_file.size_; + to_discard_size -= table_file.size_; + } - if (ids.size() == 0) { - return Status::OK(); - } + if (ids.size() == 0) { + return true; + } - ConnectorPtr->update_all( - set( - c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE - ), - where( - in(&TableFileSchema::id_, ids) - )); + ConnectorPtr->update_all( + set( + c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE, + c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp() + ), + where( + in(&TableFileSchema::id_, ids) + )); + + return true; + }); + + if (!commited) { + return Status::DBTransactionError("Update table file error"); + } } catch (std::exception &e) { return HandleException("Encounter exception when discard table file", e); @@ -792,11 +757,21 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) { try { MetricCollector metric; + auto tables = ConnectorPtr->select(columns(&TableSchema::state_), + where(c(&TableSchema::table_id_) == file_schema.table_id_)); + + //if the table has been deleted, just mark the table file as TO_DELETE + //clean thread will delete the file later + if(tables.size() < 1 || std::get<0>(tables[0]) == (int)TableSchema::TO_DELETE) { + file_schema.file_type_ = TableFileSchema::TO_DELETE; + } + ConnectorPtr->update(file_schema); } catch (std::exception &e) { - ENGINE_LOG_DEBUG << 
"table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_; - return HandleException("Encounter exception when update table file", e); + std::string msg = "Exception update table file: table_id = " + file_schema.table_id_ + + " file_id = " + file_schema.file_id_; + return HandleException(msg, e); } return Status::OK(); } @@ -805,16 +780,37 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) { try { MetricCollector metric; + std::map has_tables; + for (auto &file : files) { + if(has_tables.find(file.table_id_) != has_tables.end()) { + continue; + } + auto tables = ConnectorPtr->select(columns(&TableSchema::id_), + where(c(&TableSchema::table_id_) == file.table_id_ + and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE)); + if(tables.size() >= 1) { + has_tables[file.table_id_] = true; + } else { + has_tables[file.table_id_] = false; + } + } + auto commited = ConnectorPtr->transaction([&]() mutable { for (auto &file : files) { + if(!has_tables[file.table_id_]) { + file.file_type_ = TableFileSchema::TO_DELETE; + } + file.updated_time_ = utils::GetMicroSecTimeStamp(); ConnectorPtr->update(file); } return true; }); + if (!commited) { - return Status::DBTransactionError("Update files Error"); + return Status::DBTransactionError("Update table files error"); } + } catch (std::exception &e) { return HandleException("Encounter exception when update table files", e); } @@ -824,35 +820,67 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) { Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { auto now = utils::GetMicroSecTimeStamp(); try { - auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, - &TableFileSchema::table_id_, - &TableFileSchema::file_id_, - &TableFileSchema::file_type_, - &TableFileSchema::size_, - &TableFileSchema::date_), - where( - c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE - and - c(&TableFileSchema::updated_time_) - > now - seconds * US_PS)); + MetricCollector metric; - TableFilesSchema updated; - TableFileSchema table_file; + auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, + &TableFileSchema::table_id_, + &TableFileSchema::file_id_, + &TableFileSchema::date_), + where( + c(&TableFileSchema::file_type_) == + (int) TableFileSchema::TO_DELETE + and + c(&TableFileSchema::updated_time_) + < now - seconds * US_PS)); - for (auto &file : selected) { - table_file.id_ = std::get<0>(file); - table_file.table_id_ = std::get<1>(file); - table_file.file_id_ = std::get<2>(file); - table_file.file_type_ = std::get<3>(file); - table_file.size_ = std::get<4>(file); - table_file.date_ = std::get<5>(file); - GetTableFilePath(table_file); - if (table_file.file_type_ == TableFileSchema::TO_DELETE) { + auto commited = ConnectorPtr->transaction([&]() mutable { + TableFileSchema table_file; + for (auto &file : files) { + table_file.id_ = std::get<0>(file); + table_file.table_id_ = std::get<1>(file); + table_file.file_id_ = std::get<2>(file); + table_file.date_ = std::get<3>(file); + GetTableFilePath(table_file); + + ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " << table_file.location_ << std::endl; boost::filesystem::remove(table_file.location_); + ConnectorPtr->remove(table_file.id_); + + } + return true; + }); + + if (!commited) { + return Status::DBTransactionError("Clean files error"); + } + + } catch (std::exception &e) { + return HandleException("Encounter exception when clean table files", e); + } + + try { + MetricCollector metric; + + auto tables = 
ConnectorPtr->select(columns(&TableSchema::id_, + &TableSchema::table_id_), + where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE)); + + auto commited = ConnectorPtr->transaction([&]() mutable { + for (auto &table : tables) { + auto table_path = GetTablePath(std::get<1>(table)); + + ENGINE_LOG_DEBUG << "Remove table folder: " << table_path; + boost::filesystem::remove_all(table_path); + ConnectorPtr->remove(std::get<0>(table)); } - ConnectorPtr->remove(table_file.id_); - /* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */ + + return true; + }); + + if (!commited) { + return Status::DBTransactionError("Clean files error"); } + } catch (std::exception &e) { return HandleException("Encounter exception when clean table files", e); } @@ -862,35 +890,21 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { Status DBMetaImpl::CleanUp() { try { - auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_, - &TableFileSchema::table_id_, - &TableFileSchema::file_id_, - &TableFileSchema::file_type_, - &TableFileSchema::size_, - &TableFileSchema::date_), - where( - c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE - or - c(&TableFileSchema::file_type_) - == (int) TableFileSchema::NEW)); + auto files = ConnectorPtr->select(columns(&TableFileSchema::id_), + where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW)); - TableFilesSchema updated; - TableFileSchema table_file; - - for (auto &file : selected) { - table_file.id_ = std::get<0>(file); - table_file.table_id_ = std::get<1>(file); - table_file.file_id_ = std::get<2>(file); - table_file.file_type_ = std::get<3>(file); - table_file.size_ = std::get<4>(file); - table_file.date_ = std::get<5>(file); - GetTableFilePath(table_file); - if (table_file.file_type_ == TableFileSchema::TO_DELETE) { - boost::filesystem::remove(table_file.location_); + auto commited = ConnectorPtr->transaction([&]() mutable { + for (auto &file : files) { + ENGINE_LOG_DEBUG << "Remove table file type as NEW"; + ConnectorPtr->remove(std::get<0>(file)); } - ConnectorPtr->remove(table_file.id_); - /* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */ + return true; + }); + + if (!commited) { + return Status::DBTransactionError("Clean files error"); } + } catch (std::exception &e) { return HandleException("Encounter exception when clean table file", e); } @@ -903,14 +917,12 @@ Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) { try { MetricCollector metric; - auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_, - &TableFileSchema::date_), - where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or - c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX - or - c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX) - and - c(&TableFileSchema::table_id_) == table_id)); + auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_), + where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW + or + c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX + or c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX) + and c(&TableFileSchema::table_id_) == table_id)); TableSchema table_schema; table_schema.table_id_ = table_id; diff --git a/cpp/src/db/DBMetaImpl.h b/cpp/src/db/DBMetaImpl.h index dcac77ded837157ac8e19cd11da645108c5928c3..21ce8d1df6a96306155843cf84b73745c7e9b1a3 100644 --- a/cpp/src/db/DBMetaImpl.h 
+++ b/cpp/src/db/DBMetaImpl.h @@ -20,11 +20,13 @@ public: DBMetaImpl(const DBMetaOptions& options_); virtual Status CreateTable(TableSchema& table_schema) override; - virtual Status DeleteTable(const std::string& table_id) override; virtual Status DescribeTable(TableSchema& group_info_) override; virtual Status HasTable(const std::string& table_id, bool& has_or_not) override; virtual Status AllTables(std::vector& table_schema_array) override; + virtual Status DeleteTable(const std::string& table_id) override; + virtual Status DeleteTableFiles(const std::string& table_id) override; + virtual Status CreateTableFile(TableFileSchema& file_schema) override; virtual Status DropPartitionsByDates(const std::string& table_id, const DatesT& dates) override; @@ -42,10 +44,6 @@ public: virtual Status FilesToMerge(const std::string& table_id, DatePartionedTableFilesSchema& files) override; - virtual Status FilesToDelete(const std::string& table_id, - const DatesT& partition, - DatePartionedTableFilesSchema& files) override; - virtual Status FilesToIndex(TableFilesSchema&) override; virtual Status Archive() override; diff --git a/cpp/src/db/FaissExecutionEngine.cpp b/cpp/src/db/FaissExecutionEngine.cpp index 467ebe31e0881a19977425a4288a3291d8b09d8f..9dfdd978c35d9d2919acb17e07f0e772be2e095e 100644 --- a/cpp/src/db/FaissExecutionEngine.cpp +++ b/cpp/src/db/FaissExecutionEngine.cpp @@ -4,8 +4,8 @@ * Proprietary and confidential. ******************************************************************************/ #include "FaissExecutionEngine.h" +#include "Log.h" -#include #include #include #include @@ -74,7 +74,7 @@ Status FaissExecutionEngine::Load() { if (!index) { index = read_index(location_); to_cache = true; - LOG(DEBUG) << "Disk io from: " << location_; + ENGINE_LOG_DEBUG << "Disk io from: " << location_; } pIndex_ = index->data(); @@ -98,6 +98,8 @@ Status FaissExecutionEngine::Merge(const std::string& location) { if (location == location_) { return Status::Error("Cannot Merge Self"); } + ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_; + auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location); if (!to_merge) { to_merge = read_index(location); @@ -110,6 +112,8 @@ Status FaissExecutionEngine::Merge(const std::string& location) { ExecutionEnginePtr FaissExecutionEngine::BuildIndex(const std::string& location) { + ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_; + auto opd = std::make_shared(); opd->d = pIndex_->d; opd->index_type = build_index_type_; @@ -122,7 +126,6 @@ FaissExecutionEngine::BuildIndex(const std::string& location) { from_index->id_map.data()); ExecutionEnginePtr new_ee(new FaissExecutionEngine(index->data(), location, build_index_type_, raw_index_type_)); - new_ee->Serialize(); return new_ee; } diff --git a/cpp/src/db/Meta.h b/cpp/src/db/Meta.h index d8242db884537faf29b5cb05c03d61408c635378..3bd60cc7feaaeba5817629df48cb02ec753d1222 100644 --- a/cpp/src/db/Meta.h +++ b/cpp/src/db/Meta.h @@ -24,11 +24,13 @@ public: using Ptr = std::shared_ptr; virtual Status CreateTable(TableSchema& table_schema) = 0; - virtual Status DeleteTable(const std::string& table_id) = 0; virtual Status DescribeTable(TableSchema& table_schema) = 0; virtual Status HasTable(const std::string& table_id, bool& has_or_not) = 0; virtual Status AllTables(std::vector& table_schema_array) = 0; + virtual Status DeleteTable(const std::string& table_id) = 0; + virtual Status DeleteTableFiles(const std::string& table_id) = 0; + virtual 
Status CreateTableFile(TableFileSchema& file_schema) = 0; virtual Status DropPartitionsByDates(const std::string& table_id, const DatesT& dates) = 0; @@ -45,10 +47,6 @@ public: virtual Status FilesToMerge(const std::string& table_id, DatePartionedTableFilesSchema& files) = 0; - virtual Status FilesToDelete(const std::string& table_id, - const DatesT& partition, - DatePartionedTableFilesSchema& files) = 0; - virtual Status Size(uint64_t& result) = 0; virtual Status Archive() = 0; diff --git a/cpp/src/db/scheduler/ScheduleStrategy.cpp b/cpp/src/db/scheduler/ScheduleStrategy.cpp deleted file mode 100644 index f80e01a1f4cefa6868625ca54678bc64e145c1bb..0000000000000000000000000000000000000000 --- a/cpp/src/db/scheduler/ScheduleStrategy.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ - - -#include "ScheduleStrategy.h" -#include "cache/CpuCacheMgr.h" -#include "utils/Error.h" -#include "utils/Log.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -class MemScheduleStrategy : public IScheduleStrategy { -public: - bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override { - if(search_context == nullptr) { - return false; - } - - SearchContext::Id2IndexMap index_files = search_context->GetIndexMap(); - //some index loader alread exists - for(auto& loader : loader_list) { - if(index_files.find(loader->file_->id_) != index_files.end()){ - SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext"; - index_files.erase(loader->file_->id_); - loader->search_contexts_.push_back(search_context); - } - } - - //index_files still contains some index files, create new loader - for(auto& pair : index_files) { - SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_; - IndexLoaderContextPtr new_loader = std::make_shared(); - new_loader->search_contexts_.push_back(search_context); - new_loader->file_ = pair.second; - - auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_); - if(index != nullptr) { - //if the index file has been in memory, increase its priority - loader_list.push_front(new_loader); - } else { - //index file not in memory, put it to tail - loader_list.push_back(new_loader); - } - } - - return true; - } -}; - - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -ScheduleStrategyPtr StrategyFactory::CreateMemStrategy() { - ScheduleStrategyPtr strategy(new MemScheduleStrategy()); - return strategy; -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp deleted file mode 100644 index fa0dee88fb45ac7110f6ed0563f7215195027904..0000000000000000000000000000000000000000 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ /dev/null @@ -1,180 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. 
- ******************************************************************************/ - -#include "SearchScheduler.h" -#include "IndexLoaderQueue.h" -#include "SearchTask.h" -#include "utils/Log.h" -#include "utils/TimeRecorder.h" -#include "metrics/Metrics.h" -#include "db/EngineFactory.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -namespace { -void CollectFileMetrics(int file_type, size_t file_size) { - switch(file_type) { - case meta::TableFileSchema::RAW: - case meta::TableFileSchema::TO_INDEX: { - server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); - server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); - server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); - break; - } - default: { - server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size); - server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size); - server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size); - break; - } - } -} - -void CollectDurationMetrics(int index_type, double total_time) { - switch(index_type) { - case meta::TableFileSchema::RAW: { - server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); - break; - } - case meta::TableFileSchema::TO_INDEX: { - server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); - break; - } - default: { - server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); - break; - } - } -} - -} - -SearchScheduler::SearchScheduler() - : stopped_(true) { - Start(); -} - -SearchScheduler::~SearchScheduler() { - Stop(); -} - -SearchScheduler& SearchScheduler::GetInstance() { - static SearchScheduler s_instance; - return s_instance; -} - -bool -SearchScheduler::Start() { - if(!stopped_) { - return true; - } - - stopped_ = false; - - search_queue_.SetCapacity(2); - - index_load_thread_ = std::make_shared(&SearchScheduler::IndexLoadWorker, this); - search_thread_ = std::make_shared(&SearchScheduler::SearchWorker, this); - - return true; -} - -bool -SearchScheduler::Stop() { - if(stopped_) { - return true; - } - - if(index_load_thread_) { - index_load_queue_.Put(nullptr); - index_load_thread_->join(); - index_load_thread_ = nullptr; - } - - if(search_thread_) { - search_queue_.Put(nullptr); - search_thread_->join(); - search_thread_ = nullptr; - } - - stopped_ = true; - - return true; -} - -bool -SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) { - index_load_queue_.Put(search_context); - - return true; -} - -bool -SearchScheduler::IndexLoadWorker() { - while(true) { - IndexLoaderContextPtr context = index_load_queue_.Take(); - if(context == nullptr) { - SERVER_LOG_INFO << "Stop thread for index loading"; - break;//exit - } - - SERVER_LOG_INFO << "Loading index(" << context->file_->id_ << ") from location: " << context->file_->location_; - - server::TimeRecorder rc("Load index"); - //step 1: load index - ExecutionEnginePtr index_ptr = EngineFactory::Build(context->file_->dimension_, - context->file_->location_, - (EngineType)context->file_->engine_type_); - index_ptr->Load(); - - rc.Record("load index file to memory"); - - size_t file_size = index_ptr->PhysicalSize(); - LOG(DEBUG) << "Index file type " << context->file_->file_type_ << " Of Size: " - << file_size/(1024*1024) << " M"; - - CollectFileMetrics(context->file_->file_type_, file_size); - - //step 2: put search task into another queue - SearchTaskPtr task_ptr = std::make_shared(); - task_ptr->index_id_ = context->file_->id_; - 
task_ptr->index_type_ = context->file_->file_type_; - task_ptr->index_engine_ = index_ptr; - task_ptr->search_contexts_.swap(context->search_contexts_); - search_queue_.Put(task_ptr); - } - - return true; -} - -bool -SearchScheduler::SearchWorker() { - while(true) { - SearchTaskPtr task_ptr = search_queue_.Take(); - if(task_ptr == nullptr) { - SERVER_LOG_INFO << "Stop thread for searching"; - break;//exit - } - - SERVER_LOG_INFO << "Searching in index(" << task_ptr->index_id_<< ") with " - << task_ptr->search_contexts_.size() << " tasks"; - - //do search - auto start_time = METRICS_NOW_TIME; - task_ptr->DoSearch(); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - CollectDurationMetrics(task_ptr->index_type_, total_time); - } - - return true; -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/db/scheduler/SearchScheduler.h b/cpp/src/db/scheduler/SearchScheduler.h deleted file mode 100644 index 673d759056d64e867b175b5fd544939b8a74f170..0000000000000000000000000000000000000000 --- a/cpp/src/db/scheduler/SearchScheduler.h +++ /dev/null @@ -1,48 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ -#pragma once - -#include "SearchContext.h" -#include "IndexLoaderQueue.h" -#include "SearchTask.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -class SearchScheduler { -private: - SearchScheduler(); - virtual ~SearchScheduler(); - -public: - static SearchScheduler& GetInstance(); - - bool ScheduleSearchTask(SearchContextPtr& search_context); - -private: - bool Start(); - bool Stop(); - - bool IndexLoadWorker(); - bool SearchWorker(); - -private: - std::shared_ptr index_load_thread_; - std::shared_ptr search_thread_; - - IndexLoaderQueue index_load_queue_; - - using SearchTaskQueue = server::BlockingQueue; - SearchTaskQueue search_queue_; - - bool stopped_ = true; -}; - - -} -} -} diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/TaskDispatchQueue.cpp similarity index 76% rename from cpp/src/db/scheduler/IndexLoaderQueue.cpp rename to cpp/src/db/scheduler/TaskDispatchQueue.cpp index 2522815520ccc512f74d86118eccc54f7fe0da95..2ce0e933b46ea3e106f1d319e7254e7d905edc74 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.cpp +++ b/cpp/src/db/scheduler/TaskDispatchQueue.cpp @@ -4,8 +4,8 @@ * Proprietary and confidential. 
******************************************************************************/ -#include "IndexLoaderQueue.h" -#include "ScheduleStrategy.h" +#include "TaskDispatchQueue.h" +#include "TaskDispatchStrategy.h" #include "utils/Error.h" #include "utils/Log.h" @@ -14,12 +14,12 @@ namespace milvus { namespace engine { void -IndexLoaderQueue::Put(const SearchContextPtr &search_context) { +TaskDispatchQueue::Put(const ScheduleContextPtr &context) { std::unique_lock lock(mtx); full_.wait(lock, [this] { return (queue_.size() < capacity_); }); - if(search_context == nullptr) { - queue_.push_back(nullptr); + if(context == nullptr) { + queue_.push_front(nullptr); empty_.notify_all(); return; } @@ -32,14 +32,13 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) { throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } - ScheduleStrategyPtr strategy = StrategyFactory::CreateMemStrategy(); - strategy->Schedule(search_context, queue_); + TaskDispatchStrategy::Schedule(context, queue_); empty_.notify_all(); } -IndexLoaderContextPtr -IndexLoaderQueue::Take() { +ScheduleTaskPtr +TaskDispatchQueue::Take() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); @@ -49,20 +48,20 @@ IndexLoaderQueue::Take() { throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } - IndexLoaderContextPtr front(queue_.front()); + ScheduleTaskPtr front(queue_.front()); queue_.pop_front(); full_.notify_all(); return front; } size_t -IndexLoaderQueue::Size() { +TaskDispatchQueue::Size() { std::lock_guard lock(mtx); return queue_.size(); } -IndexLoaderContextPtr -IndexLoaderQueue::Front() { +ScheduleTaskPtr +TaskDispatchQueue::Front() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); if (queue_.empty()) { @@ -70,12 +69,12 @@ IndexLoaderQueue::Front() { SERVER_LOG_ERROR << error_msg; throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } - IndexLoaderContextPtr front(queue_.front()); + ScheduleTaskPtr front(queue_.front()); return front; } -IndexLoaderContextPtr -IndexLoaderQueue::Back() { +ScheduleTaskPtr +TaskDispatchQueue::Back() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); @@ -85,18 +84,18 @@ IndexLoaderQueue::Back() { throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } - IndexLoaderContextPtr back(queue_.back()); + ScheduleTaskPtr back(queue_.back()); return back; } bool -IndexLoaderQueue::Empty() { +TaskDispatchQueue::Empty() { std::unique_lock lock(mtx); return queue_.empty(); } void -IndexLoaderQueue::SetCapacity(const size_t capacity) { +TaskDispatchQueue::SetCapacity(const size_t capacity) { capacity_ = (capacity > 0 ? 
capacity : capacity_); } diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.h b/cpp/src/db/scheduler/TaskDispatchQueue.h similarity index 53% rename from cpp/src/db/scheduler/IndexLoaderQueue.h rename to cpp/src/db/scheduler/TaskDispatchQueue.h index 4f6dcfcd67c50bf4c2bdabc5de14925e5d4eb65e..435be4f5972e88fa0c312530cf6b663134768d51 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.h +++ b/cpp/src/db/scheduler/TaskDispatchQueue.h @@ -5,7 +5,8 @@ ******************************************************************************/ #pragma once -#include "SearchContext.h" +#include "context/IScheduleContext.h" +#include "task/IScheduleTask.h" #include #include @@ -17,31 +18,23 @@ namespace zilliz { namespace milvus { namespace engine { - -class IndexLoaderContext { -public: - TableFileSchemaPtr file_; - std::vector search_contexts_; -}; -using IndexLoaderContextPtr = std::shared_ptr; - -class IndexLoaderQueue { +class TaskDispatchQueue { public: - IndexLoaderQueue() : mtx(), full_(), empty_() {} + TaskDispatchQueue() : mtx(), full_(), empty_() {} - IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete; + TaskDispatchQueue(const TaskDispatchQueue &rhs) = delete; - IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete; + TaskDispatchQueue &operator=(const TaskDispatchQueue &rhs) = delete; - using LoaderQueue = std::list; + using TaskList = std::list; - void Put(const SearchContextPtr &search_context); + void Put(const ScheduleContextPtr &context); - IndexLoaderContextPtr Take(); + ScheduleTaskPtr Take(); - IndexLoaderContextPtr Front(); + ScheduleTaskPtr Front(); - IndexLoaderContextPtr Back(); + ScheduleTaskPtr Back(); size_t Size(); @@ -54,7 +47,7 @@ private: std::condition_variable full_; std::condition_variable empty_; - LoaderQueue queue_; + TaskList queue_; size_t capacity_ = 1000000; }; diff --git a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7200f2584ff68177940240c8decbad307599d1c2 --- /dev/null +++ b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp @@ -0,0 +1,122 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. 
+ ******************************************************************************/
+#include "TaskDispatchStrategy.h"
+#include "context/SearchContext.h"
+#include "context/DeleteContext.h"
+#include "task/IndexLoadTask.h"
+#include "task/DeleteTask.h"
+#include "cache/CpuCacheMgr.h"
+#include "utils/Error.h"
+#include "db/Log.h"
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+class ReuseCacheIndexStrategy {
+public:
+    bool Schedule(const SearchContextPtr &context, std::list<ScheduleTaskPtr>& task_list) {
+        if(context == nullptr) {
+            return false;
+        }
+
+        SearchContext::Id2IndexMap index_files = context->GetIndexMap();
+        //some index loader already exists
+        for(auto& task : task_list) {
+            if(task->type() != ScheduleTaskType::kIndexLoad) {
+                continue;
+            }
+
+            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
+            if(index_files.find(loader->file_->id_) != index_files.end()){
+                ENGINE_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
+                index_files.erase(loader->file_->id_);
+                loader->search_contexts_.push_back(context);
+            }
+        }
+
+        //index_files still contains some index files, create new loader
+        for(auto& pair : index_files) {
+            ENGINE_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_;
+            IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
+            new_loader->search_contexts_.push_back(context);
+            new_loader->file_ = pair.second;
+
+            auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_);
+            if(index != nullptr) {
+                //if the index file has been in memory, increase its priority
+                task_list.push_front(new_loader);
+            } else {
+                //index file not in memory, put it to tail
+                task_list.push_back(new_loader);
+            }
+        }
+
+        return true;
+    }
+};
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+class DeleteTableStrategy {
+public:
+    bool Schedule(const DeleteContextPtr &context, std::list<ScheduleTaskPtr> &task_list) {
+        if (context == nullptr) {
+            return false;
+        }
+
+        DeleteTaskPtr delete_task = std::make_shared<DeleteTask>(context);
+        if(task_list.empty()) {
+            task_list.push_back(delete_task);
+            return true;
+        }
+
+        std::string table_id = context->table_id();
+        for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) {
+            if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
+                continue;
+            }
+
+            //put delete task to proper position
+            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
+            if(loader->file_->table_id_ == table_id) {
+
+                task_list.insert(++iter, delete_task);
+                break;
+            }
+        }
+
+        return true;
+    }
+};
+
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr,
+                                    std::list<ScheduleTaskPtr> &task_list) {
+    if(context_ptr == nullptr) {
+        return false;
+    }
+
+    switch(context_ptr->type()) {
+        case ScheduleContextType::kSearch: {
+            SearchContextPtr search_context = std::static_pointer_cast<SearchContext>(context_ptr);
+            ReuseCacheIndexStrategy strategy;
+            return strategy.Schedule(search_context, task_list);
+        }
+        case ScheduleContextType::kDelete: {
+            DeleteContextPtr delete_context = std::static_pointer_cast<DeleteContext>(context_ptr);
+            DeleteTableStrategy strategy;
+            return strategy.Schedule(delete_context, task_list);
+        }
+        default:
+            ENGINE_LOG_ERROR << "Invalid schedule task type";
+            return false;
+    }
+}
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/ScheduleStrategy.h b/cpp/src/db/scheduler/TaskDispatchStrategy.h
similarity index
66% rename from cpp/src/db/scheduler/ScheduleStrategy.h rename to cpp/src/db/scheduler/TaskDispatchStrategy.h index 1fc0493e415e20f7ed70a3eb5afd87a2cee55f5e..d835e3ce86277acfdca63fa6375612d28ef0ddfe 100644 --- a/cpp/src/db/scheduler/ScheduleStrategy.h +++ b/cpp/src/db/scheduler/TaskDispatchStrategy.h @@ -5,18 +5,18 @@ ******************************************************************************/ #pragma once -#include "IScheduleStrategy.h" +#include "context/IScheduleContext.h" +#include "task/IScheduleTask.h" + +#include namespace zilliz { namespace milvus { namespace engine { -class StrategyFactory { -private: - StrategyFactory() {} - +class TaskDispatchStrategy { public: - static ScheduleStrategyPtr CreateMemStrategy(); + static bool Schedule(const ScheduleContextPtr &context_ptr, std::list& task_list); }; } diff --git a/cpp/src/db/scheduler/TaskScheduler.cpp b/cpp/src/db/scheduler/TaskScheduler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..04d6762aaeddc6045186b431f2781310aa3b8227 --- /dev/null +++ b/cpp/src/db/scheduler/TaskScheduler.cpp @@ -0,0 +1,117 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "TaskScheduler.h" +#include "TaskDispatchQueue.h" +#include "utils/Log.h" +#include "utils/TimeRecorder.h" +#include "db/EngineFactory.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +TaskScheduler::TaskScheduler() + : stopped_(true) { + Start(); +} + +TaskScheduler::~TaskScheduler() { + Stop(); +} + +TaskScheduler& TaskScheduler::GetInstance() { + static TaskScheduler s_instance; + return s_instance; +} + +bool +TaskScheduler::Start() { + if(!stopped_) { + return true; + } + + stopped_ = false; + + task_queue_.SetCapacity(2); + + task_dispatch_thread_ = std::make_shared(&TaskScheduler::TaskDispatchWorker, this); + task_thread_ = std::make_shared(&TaskScheduler::TaskWorker, this); + + return true; +} + +bool +TaskScheduler::Stop() { + if(stopped_) { + return true; + } + + if(task_dispatch_thread_) { + task_dispatch_queue_.Put(nullptr); + task_dispatch_thread_->join(); + task_dispatch_thread_ = nullptr; + } + + if(task_thread_) { + task_queue_.Put(nullptr); + task_thread_->join(); + task_thread_ = nullptr; + } + + stopped_ = true; + + return true; +} + +bool +TaskScheduler::Schedule(ScheduleContextPtr context) { + task_dispatch_queue_.Put(context); + + return true; +} + +bool +TaskScheduler::TaskDispatchWorker() { + while(true) { + ScheduleTaskPtr task_ptr = task_dispatch_queue_.Take(); + if(task_ptr == nullptr) { + SERVER_LOG_INFO << "Stop db task dispatch thread"; + break;//exit + } + + //execute task + ScheduleTaskPtr next_task = task_ptr->Execute(); + if(next_task != nullptr) { + task_queue_.Put(next_task); + } + } + + return true; +} + +bool +TaskScheduler::TaskWorker() { + while(true) { + ScheduleTaskPtr task_ptr = task_queue_.Take(); + if(task_ptr == nullptr) { + SERVER_LOG_INFO << "Stop db task thread"; + break;//exit + } + + //execute task + ScheduleTaskPtr next_task = task_ptr->Execute(); + if(next_task != nullptr) { + task_queue_.Put(next_task); + } + } + + return true; +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/db/scheduler/TaskScheduler.h b/cpp/src/db/scheduler/TaskScheduler.h new file mode 100644 index 
0000000000000000000000000000000000000000..d03fb858ac5dea26db41b03c5666945aa7f0c761 --- /dev/null +++ b/cpp/src/db/scheduler/TaskScheduler.h @@ -0,0 +1,49 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "context/IScheduleContext.h" +#include "task/IScheduleTask.h" +#include "TaskDispatchQueue.h" +#include "utils/BlockingQueue.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +class TaskScheduler { +private: + TaskScheduler(); + virtual ~TaskScheduler(); + +public: + static TaskScheduler& GetInstance(); + + bool Schedule(ScheduleContextPtr context); + +private: + bool Start(); + bool Stop(); + + bool TaskDispatchWorker(); + bool TaskWorker(); + +private: + std::shared_ptr task_dispatch_thread_; + std::shared_ptr task_thread_; + + TaskDispatchQueue task_dispatch_queue_; + + using TaskQueue = server::BlockingQueue; + TaskQueue task_queue_; + + bool stopped_ = true; +}; + + +} +} +} diff --git a/cpp/src/db/scheduler/context/DeleteContext.cpp b/cpp/src/db/scheduler/context/DeleteContext.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bffeb9a134051886f291ce45a97dc231740b1120 --- /dev/null +++ b/cpp/src/db/scheduler/context/DeleteContext.cpp @@ -0,0 +1,22 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "DeleteContext.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +DeleteContext::DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr) + : IScheduleContext(ScheduleContextType::kDelete), + table_id_(table_id), + meta_ptr_(meta_ptr) { + +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/db/scheduler/context/DeleteContext.h b/cpp/src/db/scheduler/context/DeleteContext.h new file mode 100644 index 0000000000000000000000000000000000000000..b909ecb69ea36ee98f0214ad822937235dee466e --- /dev/null +++ b/cpp/src/db/scheduler/context/DeleteContext.h @@ -0,0 +1,31 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. 
+ ******************************************************************************/ +#pragma once + +#include "IScheduleContext.h" +#include "db/Meta.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +class DeleteContext : public IScheduleContext { +public: + DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr); + + std::string table_id() const { return table_id_; } + meta::Meta::Ptr meta() const { return meta_ptr_; } + +private: + std::string table_id_; + meta::Meta::Ptr meta_ptr_; +}; + +using DeleteContextPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/db/scheduler/context/IScheduleContext.h b/cpp/src/db/scheduler/context/IScheduleContext.h new file mode 100644 index 0000000000000000000000000000000000000000..6ae56e364e4b046faa05d52a99d28cb0e85baf2b --- /dev/null +++ b/cpp/src/db/scheduler/context/IScheduleContext.h @@ -0,0 +1,38 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class ScheduleContextType { + kUnknown = 0, + kSearch, + kDelete, +}; + +class IScheduleContext { +public: + IScheduleContext(ScheduleContextType type) + : type_(type) { + } + + virtual ~IScheduleContext() = default; + + ScheduleContextType type() const { return type_; } + +protected: + ScheduleContextType type_; +}; + +using ScheduleContextPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/db/scheduler/SearchContext.cpp b/cpp/src/db/scheduler/context/SearchContext.cpp similarity index 95% rename from cpp/src/db/scheduler/SearchContext.cpp rename to cpp/src/db/scheduler/context/SearchContext.cpp index 5bc27f06df3c7dac2879870794594aa496d03f65..493005f05a0351279540b8ba0e11342b8545ab7f 100644 --- a/cpp/src/db/scheduler/SearchContext.cpp +++ b/cpp/src/db/scheduler/context/SearchContext.cpp @@ -14,7 +14,8 @@ namespace milvus { namespace engine { SearchContext::SearchContext(uint64_t topk, uint64_t nq, const float* vectors) - : topk_(topk), + : IScheduleContext(ScheduleContextType::kSearch), + topk_(topk), nq_(nq), vectors_(vectors) { //use current time to identify this context diff --git a/cpp/src/db/scheduler/SearchContext.h b/cpp/src/db/scheduler/context/SearchContext.h similarity index 95% rename from cpp/src/db/scheduler/SearchContext.h rename to cpp/src/db/scheduler/context/SearchContext.h index 72cbfa4f99d5c71f03a36eb3d17d0d3d19fbca92..1997b807640fa454d2589df8a6cce2eae63509f0 100644 --- a/cpp/src/db/scheduler/SearchContext.h +++ b/cpp/src/db/scheduler/context/SearchContext.h @@ -5,6 +5,7 @@ ******************************************************************************/ #pragma once +#include "IScheduleContext.h" #include "db/MetaTypes.h" #include @@ -18,7 +19,7 @@ namespace engine { using TableFileSchemaPtr = std::shared_ptr; -class SearchContext { +class SearchContext : public IScheduleContext { public: SearchContext(uint64_t topk, uint64_t nq, const float* vectors); diff --git a/cpp/src/db/scheduler/task/DeleteTask.cpp b/cpp/src/db/scheduler/task/DeleteTask.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a4660fc15756f38ef9f1ac3a158793dac4854d5f --- /dev/null +++ b/cpp/src/db/scheduler/task/DeleteTask.cpp @@ -0,0 +1,30 @@ 
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "DeleteTask.h"
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+DeleteTask::DeleteTask(const DeleteContextPtr& context)
+    : IScheduleTask(ScheduleTaskType::kDelete),
+      context_(context) {
+
+}
+
+std::shared_ptr<IScheduleTask> DeleteTask::Execute() {
+
+    if(context_ != nullptr && context_->meta() != nullptr) {
+        context_->meta()->DeleteTableFiles(context_->table_id());
+    }
+
+    return nullptr;
+}
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/IScheduleStrategy.h b/cpp/src/db/scheduler/task/DeleteTask.h
similarity index 57%
rename from cpp/src/db/scheduler/IScheduleStrategy.h
rename to cpp/src/db/scheduler/task/DeleteTask.h
index a619c2a45ea48c8af411af7c8887dd3c4719f129..4617a943bb4545a31a7daadfc46cd20ad89f2519 100644
--- a/cpp/src/db/scheduler/IScheduleStrategy.h
+++ b/cpp/src/db/scheduler/task/DeleteTask.h
@@ -5,22 +5,25 @@
  ******************************************************************************/
 #pragma once
 
-#include "IndexLoaderQueue.h"
-#include "SearchContext.h"
+#include "IScheduleTask.h"
+#include "db/scheduler/context/DeleteContext.h"
 
 namespace zilliz {
 namespace milvus {
 namespace engine {
 
-class IScheduleStrategy {
+class DeleteTask : public IScheduleTask {
 public:
-    virtual ~IScheduleStrategy() {}
+    DeleteTask(const DeleteContextPtr& context);
 
-    virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0;
+    virtual std::shared_ptr<IScheduleTask> Execute() override;
+
+private:
+    DeleteContextPtr context_;
 };
 
-using ScheduleStrategyPtr = std::shared_ptr<IScheduleStrategy>;
+using DeleteTaskPtr = std::shared_ptr<DeleteTask>;
 
 }
 }
-}
\ No newline at end of file
+}
diff --git a/cpp/src/db/scheduler/task/IScheduleTask.h b/cpp/src/db/scheduler/task/IScheduleTask.h
new file mode 100644
index 0000000000000000000000000000000000000000..652a1739fe1294bb8369d898e1ad367cf3385dd3
--- /dev/null
+++ b/cpp/src/db/scheduler/task/IScheduleTask.h
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include <memory>
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+enum class ScheduleTaskType {
+    kUnknown = 0,
+    kIndexLoad,
+    kSearch,
+    kDelete,
+};
+
+class IScheduleTask {
+public:
+    IScheduleTask(ScheduleTaskType type)
+        : type_(type) {
+    }
+
+    virtual ~IScheduleTask() = default;
+
+    ScheduleTaskType type() const { return type_; }
+
+    virtual std::shared_ptr<IScheduleTask> Execute() = 0;
+
+protected:
+    ScheduleTaskType type_;
+};
+
+using ScheduleTaskPtr = std::shared_ptr<IScheduleTask>;
+
+}
+}
+}
diff --git a/cpp/src/db/scheduler/task/IndexLoadTask.cpp b/cpp/src/db/scheduler/task/IndexLoadTask.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..91f0e26577b4f468345c63aca06fc770e8bd4731
--- /dev/null
+++ b/cpp/src/db/scheduler/task/IndexLoadTask.cpp
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "IndexLoadTask.h"
+#include "SearchTask.h"
+#include "db/Log.h"
+#include "db/EngineFactory.h"
+#include "utils/TimeRecorder.h"
+#include "metrics/Metrics.h"
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+namespace {
+void CollectFileMetrics(int file_type, size_t file_size) {
+    switch(file_type) {
+        case meta::TableFileSchema::RAW:
+        case meta::TableFileSchema::TO_INDEX: {
+            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
+            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
+            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
+            break;
+        }
+        default: {
+            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
+            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
+            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
+            break;
+        }
+    }
+}
+}
+
+IndexLoadTask::IndexLoadTask()
+    : IScheduleTask(ScheduleTaskType::kIndexLoad) {
+
+}
+
+std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
+    ENGINE_LOG_INFO << "Loading index(" << file_->id_ << ") from location: " << file_->location_;
+
+    server::TimeRecorder rc("Load index");
+    //step 1: load index
+    ExecutionEnginePtr index_ptr = EngineFactory::Build(file_->dimension_,
+                                                        file_->location_,
+                                                        (EngineType)file_->engine_type_);
+    index_ptr->Load();
+
+    rc.Record("load index file to memory");
+
+    size_t file_size = index_ptr->PhysicalSize();
+    LOG(DEBUG) << "Index file type " << file_->file_type_ << " Of Size: "
+               << file_size/(1024*1024) << " M";
+
+    CollectFileMetrics(file_->file_type_, file_size);
+
+    //step 2: return search task for later execution
+    SearchTaskPtr task_ptr = std::make_shared<SearchTask>();
+    task_ptr->index_id_ = file_->id_;
+    task_ptr->index_type_ = file_->file_type_;
+    task_ptr->index_engine_ = index_ptr;
+    task_ptr->search_contexts_.swap(search_contexts_);
+    return std::static_pointer_cast<IScheduleTask>(task_ptr);
+}
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/task/IndexLoadTask.h b/cpp/src/db/scheduler/task/IndexLoadTask.h
new file mode 100644
index 0000000000000000000000000000000000000000..c3548d7311bc8b0ce7b36717c8951c841a1dfa05
--- /dev/null
+++ b/cpp/src/db/scheduler/task/IndexLoadTask.h
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Copyright
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "IScheduleTask.h"
+#include "db/scheduler/context/SearchContext.h"
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+class IndexLoadTask : public IScheduleTask {
+public:
+    IndexLoadTask();
+
+    virtual std::shared_ptr<IScheduleTask> Execute() override;
+
+public:
+    TableFileSchemaPtr file_;
+    std::vector<SearchContextPtr> search_contexts_;
+};
+
+using IndexLoadTaskPtr = std::shared_ptr<IndexLoadTask>;
+
+}
+}
+}
diff --git a/cpp/src/db/scheduler/SearchTask.cpp b/cpp/src/db/scheduler/task/SearchTask.cpp
similarity index 82%
rename from cpp/src/db/scheduler/SearchTask.cpp
rename to cpp/src/db/scheduler/task/SearchTask.cpp
index 45503490d137ff5367987066c9ad81eb25fbc37b..d8c372699bfe70474005a4c58047d7496ea34490 100644
--- a/cpp/src/db/scheduler/SearchTask.cpp
+++ b/cpp/src/db/scheduler/task/SearchTask.cpp
@@ -4,6 +4,7 @@
  * Proprietary and confidential.
  ******************************************************************************/
 #include "SearchTask.h"
+#include "metrics/Metrics.h"
 #include "utils/Log.h"
 #include "utils/TimeRecorder.h"
 
@@ -110,15 +111,42 @@ void TopkResult(SearchContext::ResultSet &result_src,
     }
 }
 
+void CollectDurationMetrics(int index_type, double total_time) {
+    switch(index_type) {
+        case meta::TableFileSchema::RAW: {
+            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
+            break;
+        }
+        case meta::TableFileSchema::TO_INDEX: {
+            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
+            break;
+        }
+        default: {
+            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
+            break;
+        }
+    }
 }
 
-bool SearchTask::DoSearch() {
+}
+
+SearchTask::SearchTask()
+: IScheduleTask(ScheduleTaskType::kSearch) {
+
+}
+
+std::shared_ptr<IScheduleTask> SearchTask::Execute() {
     if(index_engine_ == nullptr) {
-        return false;
+        return nullptr;
     }
 
+    SERVER_LOG_INFO << "Searching in index(" << index_id_<< ") with "
+        << search_contexts_.size() << " tasks";
+
+    server::TimeRecorder rc("DoSearch index(" + std::to_string(index_id_) + ")");
+    auto start_time = METRICS_NOW_TIME;
+
+    std::vector<long> output_ids;
+    std::vector<float> output_distence;
+    for(auto& context : search_contexts_) {
@@ -153,9 +181,13 @@ bool SearchTask::DoSearch() {
         context->IndexSearchDone(index_id_);
     }
 
+    auto end_time = METRICS_NOW_TIME;
+    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
+    CollectDurationMetrics(index_type_, total_time);
+
     rc.Elapse("totally cost");
 
-    return true;
+    return nullptr;
 }
 
 }
diff --git a/cpp/src/db/scheduler/SearchTask.h b/cpp/src/db/scheduler/task/SearchTask.h
similarity index 77%
rename from cpp/src/db/scheduler/SearchTask.h
rename to cpp/src/db/scheduler/task/SearchTask.h
index 3513c15a3e9a8eebfcc8979302d35ff0fad623e5..0b3a236ce47c1497ab3b10b626987184bfb90ca5 100644
--- a/cpp/src/db/scheduler/SearchTask.h
+++ b/cpp/src/db/scheduler/task/SearchTask.h
@@ -5,19 +5,19 @@
  ******************************************************************************/
 #pragma once
 
-#include "SearchContext.h"
-#include "utils/BlockingQueue.h"
+#include "IScheduleTask.h"
+#include "db/scheduler/context/SearchContext.h"
 #include "db/ExecutionEngine.h"
 
-#include
-
 namespace zilliz {
 namespace milvus {
 namespace engine {
 
-class SearchTask {
+class SearchTask : public IScheduleTask {
 public:
-    bool DoSearch();
+    SearchTask();
+
+    virtual std::shared_ptr<IScheduleTask> Execute() override;
 
 public:
     size_t index_id_ = 0;
diff --git a/cpp/src/sdk/examples/simple/src/ClientTest.cpp b/cpp/src/sdk/examples/simple/src/ClientTest.cpp
index b2ed9f3b25ff05a7fac33c5227ea044c9fa1ede5..3aad4e0789105eaba3c2e40b55c110008ddb06f6 100644
--- a/cpp/src/sdk/examples/simple/src/ClientTest.cpp
+++ b/cpp/src/sdk/examples/simple/src/ClientTest.cpp
@@ -20,7 +20,7 @@ namespace {
     static constexpr int64_t TOTAL_ROW_COUNT = 100000;
     static constexpr int64_t TOP_K = 10;
    static constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
-    static constexpr int64_t ADD_VECTOR_LOOP = 2;
+    static constexpr int64_t ADD_VECTOR_LOOP = 10;
 
 #define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
 
@@ -195,10 +195,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
         PrintSearchResult(topk_query_result_array);
     }
 
-//    {//delete table
-//        Status stat = conn->DeleteTable(TABLE_NAME);
-//        std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
-//    }
+    {//delete table
+        Status stat = conn->DeleteTable(TABLE_NAME);
+        std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
+    }
 
     {//server status
         std::string status = conn->ServerStatus();
diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..4cae31ea6b51b26066498b149b65a81ff18f3505
--- /dev/null
+++ b/cpp/src/server/DBWrapper.cpp
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "DBWrapper.h"
+#include "ServerConfig.h"
+#include "utils/CommonUtil.h"
+#include "utils/Log.h"
+
+namespace zilliz {
+namespace milvus {
+namespace server {
+
+DBWrapper::DBWrapper() {
+    zilliz::milvus::engine::Options opt;
+    ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
+    opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
+    std::string db_path = config.GetValue(CONFIG_DB_PATH);
+    opt.meta.path = db_path + "/db";
+    int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
+    if(index_size > 0) {//ensure larger than zero, unit is MB
+        opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
+    }
+
+    CommonUtil::CreateDirectory(opt.meta.path);
+
+    zilliz::milvus::engine::DB::Open(opt, &db_);
+    if(db_ == nullptr) {
+        SERVER_LOG_ERROR << "Failed to open db";
+        throw ServerException(SERVER_NULL_POINTER, "Failed to open db");
+    }
+}
+
+DBWrapper::~DBWrapper() {
+    delete db_;
+}
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/server/DBWrapper.h b/cpp/src/server/DBWrapper.h
new file mode 100644
index 0000000000000000000000000000000000000000..5bd09bd0f4e32b5dcdb4798fc63c46948db7e11a
--- /dev/null
+++ b/cpp/src/server/DBWrapper.h
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "db/DB.h"
+#include "db/Meta.h"
+
+namespace zilliz {
+namespace milvus {
+namespace server {
+
+class DBWrapper {
+private:
+    DBWrapper();
+    ~DBWrapper();
+
+public:
+    static zilliz::milvus::engine::DB* DB() {
+        static DBWrapper db_wrapper;
+        return db_wrapper.db();
+    }
+
+    zilliz::milvus::engine::DB* db() { return db_; }
+
+private:
+    zilliz::milvus::engine::DB* db_ = nullptr;
+};
+
+}
+}
+}
diff --git a/cpp/src/server/MilvusServer.cpp b/cpp/src/server/MilvusServer.cpp
index 2b46627ad0092e5770a289dd8e770a9a961e71a6..f738c46711e010e9b2c389354747b97c29ec00b8 100644
--- a/cpp/src/server/MilvusServer.cpp
+++ b/cpp/src/server/MilvusServer.cpp
@@ -7,6 +7,7 @@
 #include "RequestHandler.h"
 #include "ServerConfig.h"
 #include "ThreadPoolServer.h"
+#include "DBWrapper.h"
 #include "milvus_types.h"
 #include "milvus_constants.h"
 
@@ -51,6 +52,8 @@ MilvusServer::StartService() {
     std::string mode = server_config.GetValue(CONFIG_SERVER_MODE, "thread_pool");
 
     try {
+        DBWrapper::DB();//initialize db
+
         stdcxx::shared_ptr<RequestHandler> handler(new RequestHandler());
         stdcxx::shared_ptr processor(new MilvusServiceProcessor(handler));
         stdcxx::shared_ptr server_transport(new TServerSocket(address, port));
diff --git a/cpp/src/server/RequestTask.cpp b/cpp/src/server/RequestTask.cpp
index 3133e1cd80c4157a32a822a2ff1219243dfeee67..dbe4e6a7406ebd0b512608fe3b2b6123344fd442 100644
--- a/cpp/src/server/RequestTask.cpp
+++ b/cpp/src/server/RequestTask.cpp
@@ -8,8 +8,7 @@
 #include "utils/CommonUtil.h"
 #include "utils/Log.h"
 #include "utils/TimeRecorder.h"
-#include "db/DB.h"
-#include "db/Meta.h"
+#include "DBWrapper.h"
 #include "version.h"
 
 namespace zilliz {
@@ -26,43 +25,6 @@ using DB_META = zilliz::milvus::engine::meta::Meta;
 using DB_DATE = zilliz::milvus::engine::meta::DateT;
 
 namespace {
-    class DBWrapper {
-    public:
-        DBWrapper() {
-            zilliz::milvus::engine::Options opt;
-            ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
-            opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
-            std::string db_path = config.GetValue(CONFIG_DB_PATH);
-            opt.meta.path = db_path + "/db";
-            int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
-            if(index_size > 0) {//ensure larger than zero, unit is MB
-                opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
-            }
-
-            CommonUtil::CreateDirectory(opt.meta.path);
-
-            zilliz::milvus::engine::DB::Open(opt, &db_);
-            if(db_ == nullptr) {
-                SERVER_LOG_ERROR << "Failed to open db";
-                throw ServerException(SERVER_NULL_POINTER, "Failed to open db");
-            }
-        }
-
-        ~DBWrapper() {
-            delete db_;
-        }
-
-        zilliz::milvus::engine::DB* DB() { return db_; }
-
-    private:
-        zilliz::milvus::engine::DB* db_ = nullptr;
-    };
-
-    zilliz::milvus::engine::DB* DB() {
-        static DBWrapper db_wrapper;
-        return db_wrapper.DB();
-    }
-
     engine::EngineType EngineType(int type) {
         static std::map<int, engine::EngineType> map_type = {
                 {0, engine::EngineType::INVALID},
@@ -199,7 +161,7 @@ ServerError CreateTableTask::OnExecute() {
         table_info.store_raw_data_ = schema_.store_raw_vector;
 
         //step 3: create table
-        engine::Status stat = DB()->CreateTable(table_info);
+        engine::Status stat = DBWrapper::DB()->CreateTable(table_info);
         if(!stat.ok()) {//table could exist
             error_code_ = SERVER_UNEXPECTED_ERROR;
             error_msg_ = "Engine failed: " + stat.ToString();
@@ -246,7 +208,7 @@ ServerError DescribeTableTask::OnExecute() {
         //step 2: get table info
         engine::meta::TableSchema table_info;
         table_info.table_id_ = table_name_;
-        engine::Status stat = DB()->DescribeTable(table_info);
+        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
         if(!stat.ok()) {
             error_code_ = SERVER_TABLE_NOT_EXIST;
             error_msg_ = "Engine failed: " + stat.ToString();
@@ -297,7 +259,7 @@ ServerError DeleteTableTask::OnExecute() {
         //step 2: check table existence
         engine::meta::TableSchema table_info;
         table_info.table_id_ = table_name_;
-        engine::Status stat = DB()->DescribeTable(table_info);
+        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
         if(!stat.ok()) {
             error_code_ = SERVER_TABLE_NOT_EXIST;
             error_msg_ = "Engine failed: " + stat.ToString();
@@ -309,7 +271,7 @@ ServerError DeleteTableTask::OnExecute() {
 
         //step 3: delete table
         std::vector<DB_DATE> dates;
-        stat = DB()->DeleteTable(table_name_, dates);
+        stat = DBWrapper::DB()->DeleteTable(table_name_, dates);
         if(!stat.ok()) {
             SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
             return SERVER_UNEXPECTED_ERROR;
@@ -340,7 +302,7 @@ BaseTaskPtr ShowTablesTask::Create(std::vector& tables) {
 ServerError ShowTablesTask::OnExecute() {
     std::vector<engine::meta::TableSchema> schema_array;
-    engine::Status stat = DB()->AllTables(schema_array);
+    engine::Status stat = DBWrapper::DB()->AllTables(schema_array);
     if(!stat.ok()) {
         error_code_ = SERVER_UNEXPECTED_ERROR;
         error_msg_ = "Engine failed: " + stat.ToString();
@@ -395,7 +357,7 @@ ServerError AddVectorTask::OnExecute() {
         //step 2: check table existence
         engine::meta::TableSchema table_info;
         table_info.table_id_ = table_name_;
-        engine::Status stat = DB()->DescribeTable(table_info);
+        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
         if(!stat.ok()) {
             error_code_ = SERVER_TABLE_NOT_EXIST;
             error_msg_ = "Engine failed: " + stat.ToString();
@@ -417,7 +379,7 @@ ServerError AddVectorTask::OnExecute() {
 
         //step 4: insert vectors
         uint64_t vec_count = (uint64_t)record_array_.size();
-        stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
+        stat = DBWrapper::DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
         rc.Record("add vectors to engine");
         if(!stat.ok()) {
             error_code_ = SERVER_UNEXPECTED_ERROR;
@@ -493,7 +455,7 @@ ServerError SearchVectorTask::OnExecute() {
         //step 2: check table existence
         engine::meta::TableSchema table_info;
         table_info.table_id_ = table_name_;
-        engine::Status stat = DB()->DescribeTable(table_info);
+        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
         if(!stat.ok()) {
             error_code_ = SERVER_TABLE_NOT_EXIST;
             error_msg_ = "Engine failed: " + stat.ToString();
@@ -526,9 +488,9 @@ ServerError SearchVectorTask::OnExecute() {
         uint64_t record_count = (uint64_t)record_array_.size();
         if(file_id_array_.empty()) {
-            stat = DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
+            stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
         } else {
-            stat = DB()->Query(table_name_, file_id_array_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
+            stat = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
         }
 
         rc.Record("search vectors from engine");
@@ -599,7 +561,7 @@ ServerError GetTableRowCountTask::OnExecute() {
 
         //step 2: get row count
         uint64_t row_count = 0;
-        engine::Status stat = DB()->GetTableRowCount(table_name_, row_count);
+        engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
         if (!stat.ok()) {
             error_code_ = SERVER_UNEXPECTED_ERROR;
             error_msg_ = "Engine failed: " + stat.ToString();
diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt
index 67ef5fccec0bc1cf6739d98a23db22e286e21c36..df47f14c659426d003c13588a3549acd67757eb9 100644
--- a/cpp/unittest/db/CMakeLists.txt
+++ b/cpp/unittest/db/CMakeLists.txt
@@ -4,11 +4,19 @@
 # Proprietary and confidential.
 #-------------------------------------------------------------------------------
 aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs)
-aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler db_scheduler_srcs)
 aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
 aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
 aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
 
+aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
+aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
+aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
+set(db_scheduler_srcs
+    ${scheduler_files}
+    ${scheduler_context_files}
+    ${scheduler_task_files}
+    )
+
 include_directories(/usr/local/cuda/include)
 link_directories("/usr/local/cuda/lib64")
diff --git a/cpp/unittest/metrics/CMakeLists.txt b/cpp/unittest/metrics/CMakeLists.txt
index 2f07cbed242a90e26c5f8e114c31d82f36de9cfb..9ae711ce252f424d12b47c66123f7071b58ea38a 100644
--- a/cpp/unittest/metrics/CMakeLists.txt
+++ b/cpp/unittest/metrics/CMakeLists.txt
@@ -13,12 +13,19 @@ include_directories(../../src)
 
 aux_source_directory(../../src/db db_srcs)
-aux_source_directory(../../src/db/scheduler db_scheduler_srcs)
 aux_source_directory(../../src/config config_files)
 aux_source_directory(../../src/cache cache_srcs)
 aux_source_directory(../../src/wrapper wrapper_src)
 aux_source_directory(../../src/metrics metrics_src)
 
+aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
+aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
+aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
+set(db_scheduler_srcs
+    ${scheduler_files}
+    ${scheduler_context_files}
+    ${scheduler_task_files}
+    )
 
 include_directories(/usr/include)
 include_directories(../../third_party/build/include)
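
Note on the task chaining introduced above (illustrative sketch, not part of the patch): every IScheduleTask::Execute() may hand back a follow-up task — IndexLoadTask::Execute() builds and returns a SearchTask over the loaded index, while SearchTask::Execute() and DeleteTask::Execute() return nullptr to end the chain. The scheduler that owns the task queue is not included in this excerpt, so the helper below is only a minimal sketch of how a worker could drain one such chain; the name RunTaskChain is hypothetical and assumes nothing beyond the IScheduleTask interface added in cpp/src/db/scheduler/task/IScheduleTask.h.

// Minimal sketch under the assumptions stated above; RunTaskChain is not an API in this patch.
#include "db/scheduler/task/IScheduleTask.h"

namespace zilliz {
namespace milvus {
namespace engine {

// Run a task and then every follow-up task it returns, until a task
// yields nullptr (as SearchTask::Execute() and DeleteTask::Execute() do).
inline void RunTaskChain(ScheduleTaskPtr task) {
    while (task != nullptr) {
        task = task->Execute();
    }
}

} // namespace engine
} // namespace milvus
} // namespace zilliz

For example, an IndexLoadTask whose Execute() returns a SearchTask would be loaded and then searched by a single call to RunTaskChain.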