diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index ba78a8126e9fa050cab494ecf0360c31495d3e8d..ca09ffd73f0f25720a2105c64a9662fe8e0c01dc 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -103,7 +103,7 @@ if (BUILD_UNIT_TEST STREQUAL "ON") if (BUILD_COVERAGE STREQUAL "ON") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage") endif () - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DELPP_DISABLE_LOGS") +# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DELPP_DISABLE_LOGS") add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/unittest) endif () diff --git a/core/src/codecs/DeletedDocsFormat.h b/core/src/codecs/DeletedDocsFormat.h index 14fd16cd0d7340d4fdab51b8124c9596eb866308..bf6bad4b473ae4b3038ad37ded1f65482524be4a 100644 --- a/core/src/codecs/DeletedDocsFormat.h +++ b/core/src/codecs/DeletedDocsFormat.h @@ -30,7 +30,7 @@ class DeletedDocsFormat { public: DeletedDocsFormat() = default; - std::string + static std::string FilePostfix(); void diff --git a/core/src/codecs/IdBloomFilterFormat.h b/core/src/codecs/IdBloomFilterFormat.h index b807215b022b00e4ec7523865f634ab1bea09509..f424ee2187861b5e0d8ad844819e3bca1498fe40 100644 --- a/core/src/codecs/IdBloomFilterFormat.h +++ b/core/src/codecs/IdBloomFilterFormat.h @@ -30,7 +30,7 @@ class IdBloomFilterFormat { public: IdBloomFilterFormat() = default; - std::string + static std::string FilePostfix(); void diff --git a/core/src/codecs/StructuredIndexFormat.h b/core/src/codecs/StructuredIndexFormat.h index 53409f94d5da53f91c31eb635ab900ad482c2e87..328204b2aac91b599c374d1220c1d602703efc17 100644 --- a/core/src/codecs/StructuredIndexFormat.h +++ b/core/src/codecs/StructuredIndexFormat.h @@ -32,7 +32,7 @@ class StructuredIndexFormat { public: StructuredIndexFormat() = default; - std::string + static std::string FilePostfix(); void diff --git a/core/src/codecs/VectorCompressFormat.h b/core/src/codecs/VectorCompressFormat.h index 2e7e0ddcb83173c9cd5bbdb55048bb6342ae2ce5..8f4a6c9de926c437960daedbf6b5ffa30a253815 100644 --- a/core/src/codecs/VectorCompressFormat.h +++ b/core/src/codecs/VectorCompressFormat.h @@ -30,7 +30,7 @@ class VectorCompressFormat { public: VectorCompressFormat() = default; - std::string + static std::string FilePostfix(); void diff --git a/core/src/codecs/VectorIndexFormat.h b/core/src/codecs/VectorIndexFormat.h index 5eccc78aefb5fb2e51fa1b973ecedb8b3bbee21c..6c8810e38852ff1d8dea4da2e64da3c318244c7a 100644 --- a/core/src/codecs/VectorIndexFormat.h +++ b/core/src/codecs/VectorIndexFormat.h @@ -31,7 +31,7 @@ class VectorIndexFormat { public: VectorIndexFormat() = default; - std::string + static std::string FilePostfix(); void diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 1192d76193596247944c7875bd542973172a46b1..39fc12a622ee27fe85508d56bde23b25a9db3c16 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -947,13 +947,20 @@ DBImpl::BackgroundBuildIndexTask(std::vector collection_names) { std::unique_lock lock(build_index_mutex_); for (auto collection_name : collection_names) { - SnapshotVisitor ss_visitor(collection_name); + snapshot::ScopedSnapshotT latest_ss; + auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name); + if (!status.ok()) { + return; + } + SnapshotVisitor ss_visitor(latest_ss); snapshot::IDS_TYPE segment_ids; ss_visitor.SegmentsToIndex("", segment_ids); + if (segment_ids.empty()) { + continue; + } - scheduler::BuildIndexJobPtr job = - std::make_shared(options_, collection_name, segment_ids); + scheduler::BuildIndexJobPtr job = std::make_shared(latest_ss, options_, segment_ids); scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitFinish(); diff --git a/core/src/db/engine/EngineFactory.cpp b/core/src/db/engine/EngineFactory.cpp index 14995a4582a118378ca042dba6fb0dd2310cd049..2aff7d150f2c156b9ea7b38f3f8b8b7b03d1c990 100644 --- a/core/src/db/engine/EngineFactory.cpp +++ b/core/src/db/engine/EngineFactory.cpp @@ -11,7 +11,6 @@ #include "db/engine/EngineFactory.h" #include "db/engine/ExecutionEngineImpl.h" -#include "db/snapshot/Snapshots.h" #include "utils/Log.h" #include @@ -22,42 +21,13 @@ namespace milvus { namespace engine { ExecutionEnginePtr -EngineFactory::Build(const std::string& dir_root, const std::string& collection_name, int64_t segment_id) { - snapshot::ScopedSnapshotT ss; - snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - auto seg_visitor = engine::SegmentVisitor::Build(ss, segment_id); - +EngineFactory::Build(const engine::snapshot::ScopedSnapshotT& snapshot, const std::string& dir_root, + int64_t segment_id) { + auto seg_visitor = engine::SegmentVisitor::Build(snapshot, segment_id); ExecutionEnginePtr execution_engine_ptr = std::make_shared(dir_root, seg_visitor); return execution_engine_ptr; } -void -EngineFactory::GroupFieldsForIndex(const std::string& collection_name, TargetFieldGroups& field_groups) { - snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - LOG_ENGINE_ERROR_ << collection_name << " doesn't exist: " << status.message(); - return; - } - - std::set structured_fields; - std::vector field_names = ss->GetFieldNames(); - for (auto& field_name : field_names) { - auto field = ss->GetField(field_name); - auto ftype = field->GetFtype(); - if (ftype == meta::DataType::VECTOR_FLOAT || ftype == meta::DataType::VECTOR_BINARY) { - std::set index_field = {field_name}; - field_groups.emplace_back(index_field); - } else { - structured_fields.insert(field_name); - } - } - - if (!structured_fields.empty()) { - field_groups.emplace_back(structured_fields); - } -} - } // namespace engine } // namespace milvus diff --git a/core/src/db/engine/EngineFactory.h b/core/src/db/engine/EngineFactory.h index ddc225b282e6771b6c2b7f036887a73709b13c70..2052ab192659df87e7d0fdff46efbcbc5c0b7e1a 100644 --- a/core/src/db/engine/EngineFactory.h +++ b/core/src/db/engine/EngineFactory.h @@ -12,6 +12,7 @@ #pragma once #include "ExecutionEngine.h" +#include "db/snapshot/Snapshots.h" #include "utils/Json.h" #include "utils/Status.h" @@ -23,13 +24,7 @@ namespace engine { class EngineFactory { public: static ExecutionEnginePtr - Build(const std::string& dir_root, const std::string& collection_name, int64_t segment_id); - - // this method distribute fields to multiple groups: - // put structured fields into one group - // each vector field as a group - static void - GroupFieldsForIndex(const std::string& collection_name, TargetFieldGroups& field_groups); + Build(const engine::snapshot::ScopedSnapshotT& snapshot, const std::string& dir_root, int64_t segment_id); }; } // namespace engine diff --git a/core/src/db/engine/ExecutionEngine.h b/core/src/db/engine/ExecutionEngine.h index f55bce6d5e1777db1103269ebf79767e8024a06c..cbe42108c95ccaf600d568d4f8b6158ece6e7499 100644 --- a/core/src/db/engine/ExecutionEngine.h +++ b/core/src/db/engine/ExecutionEngine.h @@ -27,7 +27,6 @@ namespace milvus { namespace engine { using TargetFields = std::set; -using TargetFieldGroups = std::vector; struct ExecutionEngineContext { query::QueryPtr query_ptr_; diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index f0e0a894b18110b788ed2270b0723ba8bd8a5018..451702a79abdb16747624388bb0d28d1d45b477e 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -101,20 +101,24 @@ ExecutionEngineImpl::CreateStructuredIndex(const milvus::engine::meta::DataType index_ptr = std::static_pointer_cast(int32_index_ptr); break; } + case engine::meta::DataType::UID: case engine::meta::DataType::INT64: { auto int64_index_ptr = std::make_shared>( raw_data.size(), reinterpret_cast(raw_data.data())); index_ptr = std::static_pointer_cast(int64_index_ptr); + break; } case engine::meta::DataType::FLOAT: { auto float_index_ptr = std::make_shared>( raw_data.size(), reinterpret_cast(raw_data.data())); index_ptr = std::static_pointer_cast(float_index_ptr); + break; } case engine::meta::DataType::DOUBLE: { auto double_index_ptr = std::make_shared>( raw_data.size(), reinterpret_cast(raw_data.data())); index_ptr = std::static_pointer_cast(double_index_ptr); + break; } default: { return Status(DB_ERROR, "Field is not structured type"); } } @@ -487,11 +491,11 @@ ExecutionEngineImpl::BuildIndex() { auto collection = snapshot->GetCollection(); auto& segment = segment_visitor->GetSegment(); - for (auto& field_name : target_fields_) { - snapshot::OperationContext context; - context.prev_partition = snapshot->GetResource(segment->GetPartitionId()); - auto build_op = std::make_shared(context, snapshot); + snapshot::OperationContext context; + context.prev_partition = snapshot->GetResource(segment->GetPartitionId()); + auto build_op = std::make_shared(context, snapshot); + for (auto& field_name : target_fields_) { // create snapshot segment files CollectionIndex index_info; auto status = CreateSnapshotIndexFile(build_op, field_name, index_info); @@ -504,7 +508,9 @@ ExecutionEngineImpl::BuildIndex() { auto& field = field_visitor->GetField(); auto root_path = segment_reader_->GetRootPath(); - auto segment_writer_ptr = std::make_shared(root_path, segment_visitor); + auto op_ctx = build_op->GetContext(); + auto new_visitor = SegmentVisitor::Build(snapshot, segment, op_ctx.new_segment_files); + auto segment_writer_ptr = std::make_shared(root_path, new_visitor); if (IsVectorField(field)) { knowhere::VecIndexPtr new_index; status = BuildKnowhereIndex(field_name, index_info, new_index); @@ -512,25 +518,32 @@ ExecutionEngineImpl::BuildIndex() { return status; } segment_writer_ptr->SetVectorIndex(field_name, new_index); - rc.RecordSection("build index"); + rc.RecordSection("build structured index"); + + // serialze index files + status = segment_writer_ptr->WriteVectorIndex(field_name); + if (!status.ok()) { + return status; + } + rc.RecordSection("serialize vector index"); } else { knowhere::IndexPtr index_ptr; segment_ptr->GetStructuredIndex(field_name, index_ptr); segment_writer_ptr->SetStructuredIndex(field_name, index_ptr); - rc.RecordSection("build index"); - } + rc.RecordSection("build structured index"); - // serialze index files - status = segment_writer_ptr->WriteVectorIndex(field_name); - if (!status.ok()) { - return status; + // serialze index files + status = segment_writer_ptr->WriteStructuredIndex(field_name); + if (!status.ok()) { + return status; + } + rc.RecordSection("serialize structured index"); } - rc.RecordSection("serialize index"); - - // finish transaction - build_op->Push(); } + // finish transaction + build_op->Push(); + return Status::OK(); } @@ -543,16 +556,26 @@ ExecutionEngineImpl::CreateSnapshotIndexFile(AddSegmentFileOperation& operation, auto& field = field_visitor->GetField(); auto element_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX); - auto& index_element = element_visitor->GetElement(); if (element_visitor == nullptr) { return Status(DB_ERROR, "Could not build index: index not specified"); // no index specified } + auto& index_element = element_visitor->GetElement(); + index_info.index_name_ = index_element->GetName(); + auto params = index_element->GetParams(); + if (params.find(engine::PARAM_INDEX_METRIC_TYPE) != params.end()) { + index_info.metric_name_ = params[engine::PARAM_INDEX_METRIC_TYPE]; + } + if (params.find(engine::PARAM_INDEX_EXTRA_PARAMS) != params.end()) { + index_info.extra_params_ = params[engine::PARAM_INDEX_EXTRA_PARAMS]; + } + snapshot::SegmentFilePtr seg_file = element_visitor->GetFile(); if (seg_file != nullptr) { // index already build? - std::string file_path = - engine::snapshot::GetResPath(segment_reader_->GetRootPath(), seg_file); + std::string file_path = engine::snapshot::GetResPath( + segment_reader_->GetCollectionsPath(), seg_file); + file_path += codec::VectorIndexFormat::FilePostfix(); if (CommonUtil::IsFileExist(file_path)) { return Status(DB_ERROR, "Could not build index: Index file already exist"); // index already build } @@ -563,6 +586,7 @@ ExecutionEngineImpl::CreateSnapshotIndexFile(AddSegmentFileOperation& operation, sf_context.field_element_name = index_element->GetName(); sf_context.collection_id = segment->GetCollectionId(); sf_context.partition_id = segment->GetPartitionId(); + sf_context.segment_id = segment->GetID(); auto status = operation->CommitNewSegmentFile(sf_context, seg_file); if (!status.ok()) { @@ -589,6 +613,7 @@ ExecutionEngineImpl::CreateSnapshotIndexFile(AddSegmentFileOperation& operation, sf_context.field_element_name = compress_element->GetName(); sf_context.collection_id = segment->GetCollectionId(); sf_context.partition_id = segment->GetPartitionId(); + sf_context.segment_id = segment->GetID(); auto status = operation->CommitNewSegmentFile(sf_context, seg_file); if (!status.ok()) { @@ -617,8 +642,8 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col } // build index by knowhere - auto to_index = CreateVecIndex(index_info.index_name_); - if (!to_index) { + new_index = CreateVecIndex(index_info.index_name_); + if (!new_index) { throw Exception(DB_ERROR, "Unsupported index type"); } @@ -628,24 +653,20 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col auto field_visitor = segment_visitor->GetFieldVisitor(field_name); auto& field = field_visitor->GetField(); - auto element_json = index_info.extra_params_; - auto metric_type = element_json[engine::PARAM_INDEX_METRIC_TYPE]; - auto index_params = element_json[engine::PARAM_INDEX_EXTRA_PARAMS]; - auto field_json = field->GetParams(); auto dimension = field_json[milvus::knowhere::meta::DIM]; auto segment_commit = snapshot->GetSegmentCommitBySegmentId(segment->GetID()); auto row_count = segment_commit->GetRowCount(); - milvus::json conf = index_params; + milvus::json conf = index_info.extra_params_; conf[knowhere::meta::DIM] = dimension; conf[knowhere::meta::ROWS] = row_count; conf[knowhere::meta::DEVICEID] = gpu_num_; - conf[knowhere::Metric::TYPE] = metric_type; + conf[knowhere::Metric::TYPE] = index_info.metric_name_; LOG_ENGINE_DEBUG_ << "Index params: " << conf.dump(); - auto adapter = knowhere::AdapterMgr::GetInstance().GetAdapter(to_index->index_type()); - if (!adapter->CheckTrain(conf, to_index->index_mode())) { + auto adapter = knowhere::AdapterMgr::GetInstance().GetAdapter(new_index->index_type()); + if (!adapter->CheckTrain(conf, new_index->index_mode())) { throw Exception(DB_ERROR, "Illegal index params"); } LOG_ENGINE_DEBUG_ << "Index config: " << conf.dump(); @@ -655,28 +676,28 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col if (from_index) { auto dataset = knowhere::GenDatasetWithIds(row_count, dimension, from_index->GetRawVectors(), from_index->GetRawIds()); - to_index->BuildAll(dataset, conf); + new_index->BuildAll(dataset, conf); uids = from_index->GetUids(); blacklist = from_index->GetBlacklist(); } else if (bin_from_index) { auto dataset = knowhere::GenDatasetWithIds(row_count, dimension, bin_from_index->GetRawVectors(), bin_from_index->GetRawIds()); - to_index->BuildAll(dataset, conf); + new_index->BuildAll(dataset, conf); uids = bin_from_index->GetUids(); blacklist = bin_from_index->GetBlacklist(); } #ifdef MILVUS_GPU_VERSION /* for GPU index, need copy back to CPU */ - if (to_index->index_mode() == knowhere::IndexMode::MODE_GPU) { - auto device_index = std::dynamic_pointer_cast(to_index); - to_index = device_index->CopyGpuToCpu(conf); + if (new_index->index_mode() == knowhere::IndexMode::MODE_GPU) { + auto device_index = std::dynamic_pointer_cast(new_index); + new_index = device_index->CopyGpuToCpu(conf); } #endif - to_index->SetUids(uids); + new_index->SetUids(uids); if (blacklist != nullptr) { - to_index->SetBlacklist(blacklist); + new_index->SetBlacklist(blacklist); } return Status::OK(); diff --git a/core/src/scheduler/job/BuildIndexJob.cpp b/core/src/scheduler/job/BuildIndexJob.cpp index d845954844c22f9e7b22eff7391422e7de1edbc8..6d1ee6d45808b8c507fcb473662e3f338b5a115a 100644 --- a/core/src/scheduler/job/BuildIndexJob.cpp +++ b/core/src/scheduler/job/BuildIndexJob.cpp @@ -19,25 +19,56 @@ namespace milvus { namespace scheduler { -BuildIndexJob::BuildIndexJob(engine::DBOptions options, const std::string& collection_name, - const engine::snapshot::IDS_TYPE& segment_ids) - : Job(JobType::BUILD), options_(std::move(options)), collection_name_(collection_name), segment_ids_(segment_ids) { +namespace { + +// each vector field in one group +// all structured fields put into one group +void +WhichFieldsToBuild(const engine::snapshot::ScopedSnapshotT& snapshot, std::vector& field_groups) { + auto field_names = snapshot->GetFieldNames(); + engine::TargetFields structured_fields; + for (auto& field_name : field_names) { + auto field = snapshot->GetField(field_name); + engine::FIELD_TYPE ftype = static_cast(field->GetFtype()); + bool is_vector = (ftype == engine::FIELD_TYPE::VECTOR_FLOAT || ftype == engine::FIELD_TYPE::VECTOR_BINARY); + auto elements = snapshot->GetFieldElementsByField(field_name); + for (auto& element : elements) { + if (element->GetFtype() == engine::FieldElementType::FET_INDEX) { + // index has been defined + if (is_vector) { + engine::TargetFields fields = {field_name}; + field_groups.emplace_back(fields); + } else { + structured_fields.insert(field_name); + } + break; + } + } + } + + if (!structured_fields.empty()) { + field_groups.push_back(structured_fields); + } } -JobTasks -BuildIndexJob::CreateTasks() { - engine::TargetFieldGroups target_groups; - BuildIndexTask::GroupFieldsForIndex(collection_name_, target_groups); +} // namespace + +BuildIndexJob::BuildIndexJob(const engine::snapshot::ScopedSnapshotT& snapshot, engine::DBOptions options, + const engine::snapshot::IDS_TYPE& segment_ids) + : Job(JobType::BUILD), snapshot_(snapshot), options_(std::move(options)), segment_ids_(segment_ids) { +} - std::vector tasks; +void +BuildIndexJob::OnCreateTasks(JobTasks& tasks) { + std::vector field_groups; + WhichFieldsToBuild(snapshot_, field_groups); for (auto& id : segment_ids_) { - for (auto& group : target_groups) { - auto task = std::make_shared(options_, collection_name_, id, group, nullptr); + for (auto& group : field_groups) { + auto task = std::make_shared(snapshot_, options_, id, group, nullptr); task->job_ = this; tasks.emplace_back(task); } } - return tasks; } json diff --git a/core/src/scheduler/job/BuildIndexJob.h b/core/src/scheduler/job/BuildIndexJob.h index 7bb34f98701393a631158483315db15b9cd4c95c..3bf74175efb38579674e0bf6b0be842ff1b6786d 100644 --- a/core/src/scheduler/job/BuildIndexJob.h +++ b/core/src/scheduler/job/BuildIndexJob.h @@ -31,37 +31,32 @@ namespace scheduler { class BuildIndexJob : public Job { public: - explicit BuildIndexJob(engine::DBOptions options, const std::string& collection_name, + explicit BuildIndexJob(const engine::snapshot::ScopedSnapshotT&, engine::DBOptions options, const engine::snapshot::IDS_TYPE& segment_ids); ~BuildIndexJob() = default; public: - JobTasks - CreateTasks() override; - json Dump() const override; - public: engine::DBOptions options() const { return options_; } - const std::string& - collection_name() { - return collection_name_; - } - const engine::snapshot::IDS_TYPE& segment_ids() { return segment_ids_; } + protected: + void + OnCreateTasks(JobTasks& tasks) override; + private: + engine::snapshot::ScopedSnapshotT snapshot_; engine::DBOptions options_; - std::string collection_name_; engine::snapshot::IDS_TYPE segment_ids_; }; diff --git a/core/src/scheduler/job/Job.cpp b/core/src/scheduler/job/Job.cpp index 1d8aaaad5a0b4e2578f2b705a4472898b65ad4d3..0bdd1946578b1dba7e6cdda35b242fb3d95e62f1 100644 --- a/core/src/scheduler/job/Job.cpp +++ b/core/src/scheduler/job/Job.cpp @@ -33,6 +33,18 @@ Job::Dump() const { return ret; } +JobTasks +Job::CreateTasks() { + std::unique_lock lock(mutex_); + tasks_.clear(); + OnCreateTasks(tasks_); + tasks_created_ = true; + if (tasks_.empty()) { + cv_.notify_all(); + } + return tasks_; +} + void Job::TaskDone(Task* task) { if (task == nullptr) { @@ -52,17 +64,17 @@ Job::TaskDone(Task* task) { auto json = task->Dump(); std::string task_desc = json.dump(); - LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] task %s finish", "scheduler job", id(), task_desc.c_str()); + LOG_SERVER_DEBUG_ << LogOut("scheduler job [%ld] task %s finish", id(), task_desc.c_str()); } void Job::WaitFinish() { std::unique_lock lock(mutex_); - cv_.wait(lock, [this] { return tasks_.empty(); }); + cv_.wait(lock, [this] { return tasks_created_ && tasks_.empty(); }); + auto json = Dump(); std::string job_desc = json.dump(); - - LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] %s all done", "scheduler job", id(), job_desc.c_str()); + LOG_SERVER_DEBUG_ << LogOut("scheduler job [%ld] %s all done", id(), job_desc.c_str()); } } // namespace scheduler diff --git a/core/src/scheduler/job/Job.h b/core/src/scheduler/job/Job.h index 4466eb6b5a3defe3b456baf85ce1aef4f3d0976a..f41acf9d215115009172ae4c2bf38e018eec74b9 100644 --- a/core/src/scheduler/job/Job.h +++ b/core/src/scheduler/job/Job.h @@ -55,8 +55,8 @@ class Job : public interface::dumpable { json Dump() const override; - virtual JobTasks - CreateTasks() = 0; + JobTasks + CreateTasks(); void TaskDone(Task* task); @@ -72,6 +72,9 @@ class Job : public interface::dumpable { protected: explicit Job(JobType type); + virtual void + OnCreateTasks(JobTasks& tasks) = 0; + protected: Status status_; std::mutex mutex_; @@ -82,6 +85,7 @@ class Job : public interface::dumpable { JobType type_; JobTasks tasks_; + bool tasks_created_ = false; }; using JobPtr = std::shared_ptr; diff --git a/core/src/scheduler/job/SearchJob.cpp b/core/src/scheduler/job/SearchJob.cpp index ac159ae6e55709a0f8be94cb6f1d2aaa7711cc31..f58ff74aba606ce52cd6d53b5ed6a956167a9a76 100644 --- a/core/src/scheduler/job/SearchJob.cpp +++ b/core/src/scheduler/job/SearchJob.cpp @@ -21,15 +21,13 @@ SearchJob::SearchJob(const server::ContextPtr& context, engine::DBOptions option GetSegmentsFromQuery(query_ptr, segment_ids_); } -JobTasks -SearchJob::CreateTasks() { - std::vector tasks; +void +SearchJob::OnCreateTasks(JobTasks& tasks) { for (auto& id : segment_ids_) { auto task = std::make_shared(context_, options_, query_ptr_, id, nullptr); task->job_ = this; tasks.emplace_back(task); } - return tasks; } json diff --git a/core/src/scheduler/job/SearchJob.h b/core/src/scheduler/job/SearchJob.h index 19b19dd39fb9be5b60c9378ed118665a92d45cd5..05d075751bfbe2f37df749294f1c5491fc9360f5 100644 --- a/core/src/scheduler/job/SearchJob.h +++ b/core/src/scheduler/job/SearchJob.h @@ -42,13 +42,9 @@ class SearchJob : public Job { SearchJob(const server::ContextPtr& context, engine::DBOptions options, const query::QueryPtr& query_ptr); public: - JobTasks - CreateTasks() override; - json Dump() const override; - public: const server::ContextPtr& GetContext() const { return context_; @@ -74,6 +70,10 @@ class SearchJob : public Job { return segment_ids_; } + protected: + void + OnCreateTasks(JobTasks& tasks) override; + private: void GetSegmentsFromQuery(const query::QueryPtr& query_ptr, engine::snapshot::IDS_TYPE& segment_ids); diff --git a/core/src/scheduler/task/BuildIndexTask.cpp b/core/src/scheduler/task/BuildIndexTask.cpp index 4e41f92c2e52f87c7c29edfc15069d76db0d2fc0..03117948771674726beea139ecb91824033808be 100644 --- a/core/src/scheduler/task/BuildIndexTask.cpp +++ b/core/src/scheduler/task/BuildIndexTask.cpp @@ -23,12 +23,12 @@ namespace milvus { namespace scheduler { -BuildIndexTask::BuildIndexTask(const engine::DBOptions& options, const std::string& collection_name, +BuildIndexTask::BuildIndexTask(const engine::snapshot::ScopedSnapshotT& snapshot, const engine::DBOptions& options, engine::snapshot::ID_TYPE segment_id, const engine::TargetFields& target_fields, TaskLabelPtr label) : Task(TaskType::BuildIndexTask, std::move(label)), + snapshot_(snapshot), options_(options), - collection_name_(collection_name), segment_id_(segment_id), target_fields_(target_fields) { CreateExecEngine(); @@ -37,13 +37,13 @@ BuildIndexTask::BuildIndexTask(const engine::DBOptions& options, const std::stri void BuildIndexTask::CreateExecEngine() { if (execution_engine_ == nullptr) { - execution_engine_ = engine::EngineFactory::Build(options_.meta_.path_, collection_name_, segment_id_); + execution_engine_ = engine::EngineFactory::Build(snapshot_, options_.meta_.path_, segment_id_); } } Status BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) { - TimeRecorder rc("BuildIndexTask::Load"); + TimeRecorder rc("BuildIndexTask::OnLoad"); Status stat = Status::OK(); std::string error_msg; std::string type_str; @@ -61,7 +61,6 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) { error_msg = "Wrong load type"; stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); } - fiu_do_on("XSSBuildIndexTask.Load.throw_std_exception", throw std::exception()); } catch (std::exception& ex) { // typical error: out of disk space or permission denied error_msg = "Failed to load to_index file: " + std::string(ex.what()); @@ -80,7 +79,6 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) { } LOG_ENGINE_ERROR_ << s.message(); - return s; } @@ -89,7 +87,7 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) { Status BuildIndexTask::OnExecute() { - TimeRecorderAuto rc("XSSBuildIndexTask::Execute " + std::to_string(segment_id_)); + TimeRecorderAuto rc("BuildIndexTask::OnExecute " + std::to_string(segment_id_)); if (execution_engine_ == nullptr) { return Status(DB_ERROR, "execution engine is null"); @@ -97,7 +95,7 @@ BuildIndexTask::OnExecute() { auto status = execution_engine_->BuildIndex(); if (!status.ok()) { - LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.ToString(); + LOG_ENGINE_ERROR_ << "Failed to build index: " << status.ToString(); execution_engine_ = nullptr; return status; } @@ -105,10 +103,5 @@ BuildIndexTask::OnExecute() { return Status::OK(); } -void -BuildIndexTask::GroupFieldsForIndex(const std::string& collection_name, engine::TargetFieldGroups& groups) { - engine::EngineFactory::GroupFieldsForIndex(collection_name, groups); -} - } // namespace scheduler } // namespace milvus diff --git a/core/src/scheduler/task/BuildIndexTask.h b/core/src/scheduler/task/BuildIndexTask.h index 95d0ba4940b9bab82199a7d4ec5410e2de0c7fb4..ce9ac071fb5065d11bf47a14353c76834b857286 100644 --- a/core/src/scheduler/task/BuildIndexTask.h +++ b/core/src/scheduler/task/BuildIndexTask.h @@ -24,7 +24,7 @@ namespace scheduler { class BuildIndexTask : public Task { public: - explicit BuildIndexTask(const engine::DBOptions& options, const std::string& collection_name, + explicit BuildIndexTask(const engine::snapshot::ScopedSnapshotT& snapshot, const engine::DBOptions& options, engine::snapshot::ID_TYPE segment_id, const engine::TargetFields& target_fields, TaskLabelPtr label); @@ -32,7 +32,6 @@ class BuildIndexTask : public Task { Dump() const override { json ret{ {"type", type_}, - {"collection_name", collection_name_}, {"segment_id", segment_id_}, }; return ret; @@ -44,16 +43,13 @@ class BuildIndexTask : public Task { Status OnExecute() override; - static void - GroupFieldsForIndex(const std::string& collection_name, engine::TargetFieldGroups& groups); - private: void CreateExecEngine(); public: - const engine::DBOptions& options_; - std::string collection_name_; + engine::snapshot::ScopedSnapshotT snapshot_; + engine::DBOptions options_; engine::snapshot::ID_TYPE segment_id_; // structured field could not be processed with vector field in a task diff --git a/core/src/scheduler/task/SearchTask.cpp b/core/src/scheduler/task/SearchTask.cpp index 125ad4fa27f7ab380024ebc62bfa5972b98c4f8c..7407a6aa452117f772bfcad03c45eb5d632eb890 100644 --- a/core/src/scheduler/task/SearchTask.cpp +++ b/core/src/scheduler/task/SearchTask.cpp @@ -42,19 +42,20 @@ SearchTask::SearchTask(const server::ContextPtr& context, const engine::DBOption void SearchTask::CreateExecEngine() { if (execution_engine_ == nullptr && query_ptr_ != nullptr) { - execution_engine_ = engine::EngineFactory::Build(options_.meta_.path_, query_ptr_->collection_id, segment_id_); + engine::snapshot::ScopedSnapshotT latest_ss; + engine::snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, query_ptr_->collection_id); + execution_engine_ = engine::EngineFactory::Build(latest_ss, options_.meta_.path_, segment_id_); } } Status SearchTask::OnLoad(LoadType type, uint8_t device_id) { - TimeRecorder rc(LogOut("[%s][%ld]", "search", segment_id_)); + TimeRecorder rc("SearchTask::OnLoad " + std::to_string(segment_id_)); Status stat = Status::OK(); std::string error_msg; std::string type_str; try { - fiu_do_on("XSearchTask.Load.throw_std_exception", throw std::exception()); if (type == LoadType::DISK2CPU) { engine::ExecutionEngineContext context; context.query_ptr_ = query_ptr_; @@ -73,7 +74,7 @@ SearchTask::OnLoad(LoadType type, uint8_t device_id) { } catch (std::exception& ex) { // typical error: out of disk space or permition denied error_msg = "Failed to load index file: " + std::string(ex.what()); - LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Encounter exception: %s", "search", 0, error_msg.c_str()); + LOG_ENGINE_ERROR_ << LogOut("Search task encounter exception: %s", error_msg.c_str()); stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); } diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index dfdf6258fee5136a6e24d3ddbe59de8046d9158e..9e2d2c09cffd87c707bc9b90e769a0daf5608340 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -44,10 +44,10 @@ SegmentReader::SegmentReader(const std::string& dir_root, const engine::SegmentV Status SegmentReader::Initialize() { - dir_root_ += engine::COLLECTIONS_FOLDER; + dir_collections_ = dir_root_ + engine::COLLECTIONS_FOLDER; std::string directory = - engine::snapshot::GetResPath(dir_root_, segment_visitor_->GetSegment()); + engine::snapshot::GetResPath(dir_collections_, segment_visitor_->GetSegment()); storage::IOReaderPtr reader_ptr = std::make_shared(); storage::IOWriterPtr writer_ptr = std::make_shared(); @@ -113,7 +113,7 @@ SegmentReader::LoadField(const std::string& field_name, std::vector& ra auto field_visitor = segment_visitor_->GetFieldVisitor(field_name); auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW); std::string file_path = - engine::snapshot::GetResPath(dir_root_, raw_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, raw_visitor->GetFile()); auto& ss_codec = codec::Codec::instance(); ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw); @@ -138,8 +138,8 @@ SegmentReader::LoadFields() { if (!status.ok() || raw_data.empty()) { auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW); - std::string file_path = - engine::snapshot::GetResPath(dir_root_, element_visitor->GetFile()); + std::string file_path = engine::snapshot::GetResPath( + dir_collections_, element_visitor->GetFile()); STATUS_CHECK(LoadField(file_path, raw_data)); } } @@ -154,7 +154,7 @@ SegmentReader::LoadEntities(const std::string& field_name, const std::vectorGetFieldVisitor(field_name); auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW); std::string file_path = - engine::snapshot::GetResPath(dir_root_, raw_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, raw_visitor->GetFile()); int64_t field_width = 0; segment_ptr_->GetFixedFieldWidth(field_name, field_width); @@ -263,7 +263,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data); } else if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW)) { auto file_path = - engine::snapshot::GetResPath(dir_root_, visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data); } }; @@ -295,7 +295,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex // read index file std::string file_path = - engine::snapshot::GetResPath(dir_root_, index_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, index_visitor->GetFile()); ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, file_path, index_data); auto index_name = index_visitor->GetElement()->GetName(); @@ -310,7 +310,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex if (index_name == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8NR || index_name == knowhere::IndexEnum::INDEX_HNSW_SQ8NM) { if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8)) { - file_path = engine::snapshot::GetResPath(dir_root_, visitor->GetFile()); + file_path = + engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data); } } @@ -347,9 +348,9 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde // read field index auto index_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX); - if (index_visitor) { + if (index_visitor == nullptr || index_visitor->GetFile() != nullptr) { std::string file_path = - engine::snapshot::GetResPath(dir_root_, index_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, index_visitor->GetFile()); ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr); segment_ptr_->SetStructuredIndex(field_name, index_ptr); @@ -376,7 +377,7 @@ SegmentReader::LoadVectorIndice() { } std::string file_path = - engine::snapshot::GetResPath(dir_root_, element_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, element_visitor->GetFile()); if (engine::IsVectorField(field)) { knowhere::VecIndexPtr index_ptr; STATUS_CHECK(LoadVectorIndex(name, index_ptr)); @@ -400,7 +401,7 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME); auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER); std::string file_path = - engine::snapshot::GetResPath(dir_root_, visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); auto& ss_codec = codec::Codec::instance(); ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr); @@ -427,7 +428,7 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME); auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS); std::string file_path = - engine::snapshot::GetResPath(dir_root_, visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); if (!boost::filesystem::exists(file_path)) { return Status::OK(); // file doesn't exist } @@ -459,7 +460,7 @@ SegmentReader::ReadDeletedDocsSize(size_t& size) { auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME); auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS); std::string file_path = - engine::snapshot::GetResPath(dir_root_, visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); if (!boost::filesystem::exists(file_path)) { return Status::OK(); // file doesn't exist } @@ -496,7 +497,7 @@ SegmentReader::GetSegmentID(int64_t& id) { std::string SegmentReader::GetSegmentPath() { std::string seg_path = - engine::snapshot::GetResPath(dir_root_, segment_visitor_->GetSegment()); + engine::snapshot::GetResPath(dir_collections_, segment_visitor_->GetSegment()); return seg_path; } diff --git a/core/src/segment/SegmentReader.h b/core/src/segment/SegmentReader.h index 3320b28b45318e5e6f121f89671f59e3d81b51c7..ab7d52350b113f046c055fb42c1c9a8febcac082 100644 --- a/core/src/segment/SegmentReader.h +++ b/core/src/segment/SegmentReader.h @@ -84,6 +84,11 @@ class SegmentReader { return dir_root_; } + std::string + GetCollectionsPath() const { + return dir_collections_; + } + engine::SegmentVisitorPtr GetSegmentVisitor() const { return segment_visitor_; @@ -97,7 +102,9 @@ class SegmentReader { engine::SegmentVisitorPtr segment_visitor_; storage::FSHandlerPtr fs_ptr_; engine::SegmentPtr segment_ptr_; + std::string dir_root_; + std::string dir_collections_; }; using SegmentReaderPtr = std::shared_ptr; diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index a1ad8b33c0301e4d4fa55b90aef60ebc4a7cdce0..dec0596ec7de93b1bfc7585237edf8df31f87258 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -44,10 +44,10 @@ SegmentWriter::SegmentWriter(const std::string& dir_root, const engine::SegmentV Status SegmentWriter::Initialize() { - dir_root_ += engine::COLLECTIONS_FOLDER; + dir_collections_ = dir_root_ + engine::COLLECTIONS_FOLDER; std::string directory = - engine::snapshot::GetResPath(dir_root_, segment_visitor_->GetSegment()); + engine::snapshot::GetResPath(dir_collections_, segment_visitor_->GetSegment()); storage::IOReaderPtr reader_ptr = std::make_shared(); storage::IOWriterPtr writer_ptr = std::make_shared(); @@ -136,7 +136,7 @@ SegmentWriter::WriteFields() { auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW); std::string file_path = - engine::snapshot::GetResPath(dir_root_, element_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, element_visitor->GetFile()); STATUS_CHECK(WriteField(file_path, raw_data)); } @@ -158,7 +158,7 @@ SegmentWriter::WriteBloomFilter() { auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME); auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER); std::string uid_blf_path = - engine::snapshot::GetResPath(dir_root_, uid_blf_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, uid_blf_visitor->GetFile()); auto& ss_codec = codec::Codec::instance(); segment::IdBloomFilterPtr bloom_filter_ptr; @@ -212,7 +212,7 @@ SegmentWriter::WriteDeletedDocs() { auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME); auto del_doc_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS); std::string file_path = - engine::snapshot::GetResPath(dir_root_, del_doc_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, del_doc_visitor->GetFile()); return WriteDeletedDocs(file_path, segment_ptr_->GetDeletedDocs()); } @@ -344,13 +344,13 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) { fs_ptr_->operation_ptr_->CreateDirectory(); std::string file_path = - engine::snapshot::GetResPath(dir_root_, element_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, element_visitor->GetFile()); ss_codec.GetVectorIndexFormat()->WriteIndex(fs_ptr_, file_path, index); element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8); if (element_visitor != nullptr) { - file_path = - engine::snapshot::GetResPath(dir_root_, element_visitor->GetFile()); + file_path = engine::snapshot::GetResPath(dir_collections_, + element_visitor->GetFile()); ss_codec.GetVectorIndexFormat()->WriteCompress(fs_ptr_, file_path, index); } } catch (std::exception& e) { @@ -396,7 +396,7 @@ SegmentWriter::WriteStructuredIndex(const std::string& field_name) { segment_ptr_->GetFieldType(field_name, field_type); std::string file_path = - engine::snapshot::GetResPath(dir_root_, element_visitor->GetFile()); + engine::snapshot::GetResPath(dir_collections_, element_visitor->GetFile()); ss_codec.GetStructuredIndexFormat()->Write(fs_ptr_, file_path, field_type, index); } catch (std::exception& e) { std::string err_msg = "Failed to write vector index: " + std::string(e.what()); @@ -431,7 +431,7 @@ SegmentWriter::GetSegmentID(int64_t& id) { std::string SegmentWriter::GetSegmentPath() { std::string seg_path = - engine::snapshot::GetResPath(dir_root_, segment_visitor_->GetSegment()); + engine::snapshot::GetResPath(dir_collections_, segment_visitor_->GetSegment()); return seg_path; } diff --git a/core/src/segment/SegmentWriter.h b/core/src/segment/SegmentWriter.h index 3fbd23a234b72739689fbc90fc68db62227bd998..7d7af27f98dffad685c7f21450193396e3edc083 100644 --- a/core/src/segment/SegmentWriter.h +++ b/core/src/segment/SegmentWriter.h @@ -81,6 +81,16 @@ class SegmentWriter { std::string GetSegmentPath(); + std::string + GetRootPath() const { + return dir_root_; + } + + engine::SegmentVisitorPtr + GetSegmentVisitor() const { + return segment_visitor_; + } + private: Status Initialize(); @@ -101,7 +111,9 @@ class SegmentWriter { engine::SegmentVisitorPtr segment_visitor_; storage::FSHandlerPtr fs_ptr_; engine::SegmentPtr segment_ptr_; + std::string dir_root_; + std::string dir_collections_; }; using SegmentWriterPtr = std::shared_ptr; diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index b799f01fbed19c747f403d9ede41630e3d033c32..0e7031b9f41a4c6534b0c6ca6491f6a1e5b5588d 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -493,10 +493,23 @@ TEST_F(DBTest, IndexTest) { status = db_->Flush(); ASSERT_TRUE(status.ok()); -// milvus::engine::CollectionIndex index; -// index.index_name_ = "IVFLAT"; -// index.metric_name_ = "L2"; -// index.extra_params_["nlist"] = 2048; -// status = db_->CreateIndex(dummy_context_, collection_name, "vector", index); -// ASSERT_TRUE(status.ok()); + { + milvus::engine::CollectionIndex index; + index.index_name_ = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT; + index.metric_name_ = milvus::knowhere::Metric::L2; + index.extra_params_["nlist"] = 2048; + status = db_->CreateIndex(dummy_context_, collection_name, "vector", index); + ASSERT_TRUE(status.ok()); + } + + { +// milvus::engine::CollectionIndex index; +// index.index_name_ = "SORTED"; +// status = db_->CreateIndex(dummy_context_, collection_name, "field_0", index); +// ASSERT_TRUE(status.ok()); +// status = db_->CreateIndex(dummy_context_, collection_name, "field_1", index); +// ASSERT_TRUE(status.ok()); +// status = db_->CreateIndex(dummy_context_, collection_name, "field_2", index); +// ASSERT_TRUE(status.ok()); + } } diff --git a/core/unittest/db/utils.cpp b/core/unittest/db/utils.cpp index dc38c85d403cd4e94b143e483944b255ae2e3f4e..8bc190ce6710d14c71ee1c271df107194bf6b83e 100644 --- a/core/unittest/db/utils.cpp +++ b/core/unittest/db/utils.cpp @@ -250,6 +250,8 @@ DBTest::SetUp() { void DBTest::TearDown() { + db_ = nullptr; // db must be stopped before JobMgr and Snapshot + milvus::scheduler::JobMgrInst::GetInstance()->Stop(); milvus::scheduler::SchedInst::GetInstance()->Stop(); milvus::scheduler::CPUBuilderInst::GetInstance()->Stop(); @@ -257,7 +259,6 @@ DBTest::TearDown() { milvus::scheduler::ResMgrInst::GetInstance()->Clear(); BaseTest::SnapshotStop(); - db_ = nullptr; auto options = GetOptions(); boost::filesystem::remove_all(options.meta_.path_); @@ -330,13 +331,14 @@ SchedulerTest::SetUp() { void SchedulerTest::TearDown() { + db_ = nullptr; // db must be stopped before JobMgr and Snapshot + milvus::scheduler::JobMgrInst::GetInstance()->Stop(); milvus::scheduler::SchedInst::GetInstance()->Stop(); milvus::scheduler::CPUBuilderInst::GetInstance()->Stop(); milvus::scheduler::ResMgrInst::GetInstance()->Stop(); milvus::scheduler::ResMgrInst::GetInstance()->Clear(); - db_ = nullptr; BaseTest::SnapshotStop(); BaseTest::TearDown(); }