From d473c7ad2bf31655af95016d34b45cafd721a4c0 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 15 Jul 2020 14:32:09 +0800 Subject: [PATCH] snapshot integrate (#2854) * add test_segment Signed-off-by: yudong.cai * update interface GetResPath Signed-off-by: yudong.cai * retry ci Signed-off-by: yudong.cai * update SSSegmentWriter Signed-off-by: yudong.cai --- CHANGELOG.md | 2 +- core/src/db/SSDBImpl.cpp | 3 +- core/src/db/SnapshotHandlers.cpp | 27 ++++--- core/src/db/SnapshotHandlers.h | 4 +- core/src/db/insert/SSMemManagerImpl.cpp | 6 +- core/src/db/insert/SSMemSegment.cpp | 8 +- core/src/db/snapshot/CompoundOperations.cpp | 5 +- core/src/db/snapshot/Event.h | 3 +- core/src/db/snapshot/ResourceHelper.h | 40 ++++++---- core/src/db/snapshot/Snapshot.cpp | 2 +- core/src/segment/SSSegmentReader.cpp | 33 ++++---- core/src/segment/SSSegmentReader.h | 3 +- core/src/segment/SSSegmentWriter.cpp | 51 ++++++------- core/src/segment/SSSegmentWriter.h | 3 +- core/src/segment/Segment.cpp | 2 - core/src/segment/Segment.h | 2 - core/src/storage/disk/DiskOperation.cpp | 3 +- core/unittest/ssdb/test_segment.cpp | 83 +++++++++++++++++---- core/unittest/ssdb/utils.h | 25 +++++++ 19 files changed, 204 insertions(+), 101 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bb852a4..da4fc075 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ Please mark all changes in change log and use the issue from GitHub - \#2692 Milvus hangs during multi-thread concurrent search - \#2739 Fix mishards start failed - \#2752 Milvus formats vectors data to double-precision and return to http client -- \#2767 fix a bug of getting wrong nprobe limitation in knowhere on GPU version +- \#2767 Fix a bug of getting wrong nprobe limitation in knowhere on GPU version - \#2776 Fix too many data copies during creating IVF index - \#2813 To implemente RNSG IP diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index ebfc2dce..9a334557 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -445,7 +445,8 @@ SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_ snapshot::ScopedSnapshotT ss; STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); - auto handler = std::make_shared(nullptr, ss, id_array, field_names); + std::string dir_root = options_.meta_.path_; + auto handler = std::make_shared(nullptr, ss, dir_root, id_array, field_names); handler->Iterate(); STATUS_CHECK(handler->GetStatus()); diff --git a/core/src/db/SnapshotHandlers.cpp b/core/src/db/SnapshotHandlers.cpp index a5e8ef7e..95bbaa98 100644 --- a/core/src/db/SnapshotHandlers.cpp +++ b/core/src/db/SnapshotHandlers.cpp @@ -108,9 +108,17 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm /////////////////////////////////////////////////////////////////////////////// GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr& context, - engine::snapshot::ScopedSnapshotT ss, const IDNumbers& ids, + engine::snapshot::ScopedSnapshotT ss, + const std::string& dir_root, const IDNumbers& ids, const std::vector& field_names) - : BaseT(ss), context_(context), ids_(ids), field_names_(field_names), vector_data_(), attr_type_(), attr_data_() { + : BaseT(ss), + context_(context), + dir_root_(dir_root), + ids_(ids), + field_names_(field_names), + vector_data_(), + attr_type_(), + attr_data_() { for (auto& field_name : field_names_) { auto field_ptr = ss_->GetField(field_name); auto field_type = field_ptr->GetFtype(); @@ -126,25 +134,26 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) { if (segment_visitor == nullptr) { return Status(DB_ERROR, "Fail to build segment visitor with id " + std::to_string(segment->GetID())); } - segment::SSSegmentReader segment_reader(segment_visitor); + segment::SSSegmentReader segment_reader(dir_root_, segment_visitor); - /* load UID's bloom filter file */ auto uid_field_visitor = segment_visitor->GetFieldVisitor(DEFAULT_UID_NAME); - auto uid_bf_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER); - std::string uid_bf_path = snapshot::GetResPath(uid_bf_visitor->GetFile()); + + /* load UID's bloom filter file */ + auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER); + std::string uid_blf_path = snapshot::GetResPath(dir_root_, uid_blf_visitor->GetFile()); segment::IdBloomFilterPtr id_bloom_filter_ptr; - STATUS_CHECK(segment_reader.LoadBloomFilter(uid_bf_path, id_bloom_filter_ptr)); + STATUS_CHECK(segment_reader.LoadBloomFilter(uid_blf_path, id_bloom_filter_ptr)); /* load UID's raw data */ auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_RAW); - std::string uid_raw_path = snapshot::GetResPath(uid_raw_visitor->GetFile()); + std::string uid_raw_path = snapshot::GetResPath(dir_root_, uid_raw_visitor->GetFile()); std::vector uids; STATUS_CHECK(segment_reader.LoadUids(uid_raw_path, uids)); /* load UID's deleted docs */ auto uid_del_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS); - std::string uid_del_path = snapshot::GetResPath(uid_del_visitor->GetFile()); + std::string uid_del_path = snapshot::GetResPath(dir_root_, uid_del_visitor->GetFile()); segment::DeletedDocsPtr deleted_docs_ptr; STATUS_CHECK(segment_reader.LoadDeletedDocs(uid_del_path, deleted_docs_ptr)); diff --git a/core/src/db/SnapshotHandlers.h b/core/src/db/SnapshotHandlers.h index 0f6cd12d..a6e28c97 100644 --- a/core/src/db/SnapshotHandlers.h +++ b/core/src/db/SnapshotHandlers.h @@ -65,13 +65,15 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler { using ResourceT = snapshot::Segment; using BaseT = snapshot::IterateHandler; - GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const IDNumbers& ids, + GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, + const std::string& dir_root, const IDNumbers& ids, const std::vector& field_names); Status Handle(const typename ResourceT::Ptr&) override; const server::ContextPtr context_; + const std::string dir_root_; const engine::IDNumbers ids_; const std::vector field_names_; std::vector vector_data_; diff --git a/core/src/db/insert/SSMemManagerImpl.cpp b/core/src/db/insert/SSMemManagerImpl.cpp index 43f662ab..ea386960 100644 --- a/core/src/db/insert/SSMemManagerImpl.cpp +++ b/core/src/db/insert/SSMemManagerImpl.cpp @@ -17,6 +17,7 @@ #include "SSVectorSource.h" #include "db/Constants.h" #include "db/snapshot/Snapshots.h" +#include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "utils/Log.h" namespace milvus { @@ -128,17 +129,16 @@ SSMemManagerImpl::ValidateChunk(int64_t collection_id, int64_t partition_id, con return Status(DB_ERROR, err_msg + name); } break; - case meta::hybrid::DataType::VECTOR: case meta::hybrid::DataType::VECTOR_FLOAT: case meta::hybrid::DataType::VECTOR_BINARY: { json params = field->GetParams(); - if (params.find(VECTOR_DIMENSION_PARAM) == params.end()) { + if (params.find(knowhere::meta::DIM) == params.end()) { std::string msg = "Vector field params must contain: dimension"; LOG_SERVER_ERROR_ << msg; return Status(DB_ERROR, msg); } - int64_t dimension = params[VECTOR_DIMENSION_PARAM]; + int64_t dimension = params[knowhere::meta::DIM]; int64_t row_size = (ftype == meta::hybrid::DataType::VECTOR_BINARY) ? dimension / 8 : dimension * sizeof(float); if (data_size != chunk->count_ * row_size) { diff --git a/core/src/db/insert/SSMemSegment.cpp b/core/src/db/insert/SSMemSegment.cpp index d4a23602..f4310581 100644 --- a/core/src/db/insert/SSMemSegment.cpp +++ b/core/src/db/insert/SSMemSegment.cpp @@ -23,6 +23,7 @@ #include "db/meta/MetaTypes.h" #include "db/snapshot/Operations.h" #include "db/snapshot/Snapshots.h" +#include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "metrics/Metrics.h" #include "segment/SegmentReader.h" #include "utils/Log.h" @@ -109,7 +110,7 @@ SSMemSegment::CreateSegment() { auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files); // create segment writer - segment_writer_ptr_ = std::make_shared(visitor); + segment_writer_ptr_ = std::make_shared(options_.meta_.path_, visitor); return Status::OK(); } @@ -152,17 +153,16 @@ SSMemSegment::GetSingleEntitySize(size_t& single_size) { case meta::hybrid::DataType::INT64: single_size += sizeof(uint64_t); break; - case meta::hybrid::DataType::VECTOR: case meta::hybrid::DataType::VECTOR_FLOAT: case meta::hybrid::DataType::VECTOR_BINARY: { json params = field->GetParams(); - if (params.find(VECTOR_DIMENSION_PARAM) == params.end()) { + if (params.find(knowhere::meta::DIM) == params.end()) { std::string msg = "Vector field params must contain: dimension"; LOG_SERVER_ERROR_ << msg; return Status(DB_ERROR, msg); } - int64_t dimension = params[VECTOR_DIMENSION_PARAM]; + int64_t dimension = params[knowhere::meta::DIM]; if (ftype == meta::hybrid::DataType::VECTOR_BINARY) { single_size += (dimension / 8); } else { diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index e0c453c3..3f762926 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -295,6 +295,7 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg auto ctx = context; ctx.segment_id = context_.new_segment->GetID(); ctx.partition_id = context_.new_segment->GetPartitionId(); + ctx.collection_id = GetStartedSS()->GetCollectionId(); auto new_sf_op = std::make_shared(ctx, GetStartedSS()); STATUS_CHECK(new_sf_op->Push()); STATUS_CHECK(new_sf_op->GetResource(created)); @@ -613,8 +614,8 @@ CreateCollectionOperation::DoExecute(Store& store) { auto& field_schema = field_kv.first; auto& field_elements = field_kv.second; FieldPtr field; - status = - store.CreateResource(Field(field_schema->GetName(), field_idx, field_schema->GetFtype()), field); + status = store.CreateResource( + Field(field_schema->GetName(), field_idx, field_schema->GetFtype(), field_schema->GetParams()), field); auto f_ctx_p = ResourceContextBuilder().SetOp(meta::oUpdate).CreatePtr(); AddStepWithLsn(*field, c_context_.lsn, f_ctx_p); MappingT element_ids = {}; diff --git a/core/src/db/snapshot/Event.h b/core/src/db/snapshot/Event.h index 9b91f91a..fd06c782 100644 --- a/core/src/db/snapshot/Event.h +++ b/core/src/db/snapshot/Event.h @@ -49,7 +49,7 @@ class ResourceGCEvent : public Event { STATUS_CHECK((*sd_op)(store)); /* TODO: physically clean resource */ - std::string res_path = GetResPath(res_); + std::string res_path = GetResPath(dir_root_, res_); /* if (!boost::filesystem::exists(res_path)) { */ /* return Status::OK(); */ /* } */ @@ -68,6 +68,7 @@ class ResourceGCEvent : public Event { private: class ResourceT::Ptr res_; + std::string dir_root_; }; } // namespace snapshot diff --git a/core/src/db/snapshot/ResourceHelper.h b/core/src/db/snapshot/ResourceHelper.h index 544342fe..e42aeb6f 100644 --- a/core/src/db/snapshot/ResourceHelper.h +++ b/core/src/db/snapshot/ResourceHelper.h @@ -13,57 +13,65 @@ #include #include -#include #include "db/snapshot/Resources.h" #include "utils/Status.h" namespace milvus::engine::snapshot { +static const char* COLLECTION_PREFIX = "C_"; +static const char* PARTITION_PREFIX = "P_"; +static const char* SEGMENT_PREFIX = "S_"; +static const char* SEGMENT_FILE_PREFIX = "F_"; + template inline std::string -GetResPath(const typename ResourceT::Ptr& res_ptr) { +GetResPath(const std::string& root, const typename ResourceT::Ptr& res_ptr) { return std::string(); } template <> inline std::string -GetResPath(const Collection::Ptr& res_ptr) { +GetResPath(const std::string& root, const Collection::Ptr& res_ptr) { std::stringstream ss; - ss << res_ptr->GetID(); + ss << root << "/"; + ss << COLLECTION_PREFIX << res_ptr->GetID(); return ss.str(); } template <> inline std::string -GetResPath(const Partition::Ptr& res_ptr) { +GetResPath(const std::string& root, const Partition::Ptr& res_ptr) { std::stringstream ss; - ss << res_ptr->GetCollectionId() << "/"; - ss << res_ptr->GetID(); + ss << root << "/"; + ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/"; + ss << PARTITION_PREFIX << res_ptr->GetID(); return ss.str(); } template <> inline std::string -GetResPath(const Segment::Ptr& res_ptr) { +GetResPath(const std::string& root, const Segment::Ptr& res_ptr) { std::stringstream ss; - ss << res_ptr->GetCollectionId() << "/"; - ss << res_ptr->GetPartitionId() << "/"; - ss << res_ptr->GetID(); + ss << root << "/"; + ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/"; + ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/"; + ss << SEGMENT_PREFIX << res_ptr->GetID(); return ss.str(); } template <> inline std::string -GetResPath(const SegmentFile::Ptr& res_ptr) { +GetResPath(const std::string& root, const SegmentFile::Ptr& res_ptr) { std::stringstream ss; - ss << res_ptr->GetCollectionId() << "/"; - ss << res_ptr->GetPartitionId() << "/"; - ss << res_ptr->GetSegmentId() << "/"; - ss << res_ptr->GetID(); + ss << root << "/"; + ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/"; + ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/"; + ss << SEGMENT_PREFIX << res_ptr->GetSegmentId() << "/"; + ss << SEGMENT_FILE_PREFIX << res_ptr->GetID(); return ss.str(); } diff --git a/core/src/db/snapshot/Snapshot.cpp b/core/src/db/snapshot/Snapshot.cpp index 39a8dbeb..8113fe09 100644 --- a/core/src/db/snapshot/Snapshot.cpp +++ b/core/src/db/snapshot/Snapshot.cpp @@ -234,7 +234,7 @@ Snapshot::ToString() const { ss << to_matrix_string(fc_m, row_element_size, 2); for (auto& fe_id : fc_m) { auto fe = GetResource(fe_id); - ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName(); + ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName() << " CID=" << fe->GetCollectionId(); } } diff --git a/core/src/segment/SSSegmentReader.cpp b/core/src/segment/SSSegmentReader.cpp index f1e307c7..8db3e0c4 100644 --- a/core/src/segment/SSSegmentReader.cpp +++ b/core/src/segment/SSSegmentReader.cpp @@ -32,9 +32,11 @@ namespace milvus { namespace segment { -SSSegmentReader::SSSegmentReader(const engine::SegmentVisitorPtr& segment_visitor) : segment_visitor_(segment_visitor) { +SSSegmentReader::SSSegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor) + : dir_root_(dir_root), segment_visitor_(segment_visitor) { auto& segment_ptr = segment_visitor_->GetSegment(); - std::string directory = engine::snapshot::GetResPath(segment_ptr); + std::string directory = + engine::snapshot::GetResPath(dir_root_, segment_visitor->GetSegment()); storage::IOReaderPtr reader_ptr = std::make_shared(); storage::IOWriterPtr writer_ptr = std::make_shared(); @@ -55,36 +57,39 @@ SSSegmentReader::Load() { try { // auto& ss_codec = codec::SSCodec::instance(); - auto& field_visitors_map = segment_visitor_->GetFieldVisitors(); auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME); /* load UID's raw data */ auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW); std::string uid_raw_path = - engine::snapshot::GetResPath(uid_raw_visitor->GetFile()); - std::vector uids; + engine::snapshot::GetResPath(dir_root_, uid_raw_visitor->GetFile()); STATUS_CHECK(LoadUids(uid_raw_path, segment_ptr_->vectors_ptr_->GetMutableUids())); /* load UID's deleted docs */ auto uid_del_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS); std::string uid_del_path = - engine::snapshot::GetResPath(uid_del_visitor->GetFile()); - segment::DeletedDocsPtr deleted_docs_ptr; + engine::snapshot::GetResPath(dir_root_, uid_del_visitor->GetFile()); STATUS_CHECK(LoadDeletedDocs(uid_del_path, segment_ptr_->deleted_docs_ptr_)); /* load other data */ + Status s; + auto& field_visitors_map = segment_visitor_->GetFieldVisitors(); for (auto& f_kv : field_visitors_map) { - auto& field_visitor = f_kv.second; - auto& field = field_visitor->GetField(); - for (auto& file_kv : field_visitor->GetElementVistors()) { - auto& field_element_visitor = file_kv.second; + auto& fv = f_kv.second; + auto& field = fv->GetField(); + for (auto& file_kv : fv->GetElementVistors()) { + auto& fev = file_kv.second; + std::string file_path = + engine::snapshot::GetResPath(dir_root_, fev->GetFile()); + if (!s.ok()) { + LOG_ENGINE_WARNING_ << "Cannot get resource path"; + } - auto& segment_file = field_element_visitor->GetFile(); + auto& segment_file = fev->GetFile(); if (segment_file == nullptr) { continue; } - auto file_path = engine::snapshot::GetResPath(segment_file); - auto& field_element = field_element_visitor->GetElement(); + auto& field_element = fev->GetElement(); if ((field->GetFtype() == engine::FieldType::VECTOR_FLOAT || field->GetFtype() == engine::FieldType::VECTOR_BINARY) && diff --git a/core/src/segment/SSSegmentReader.h b/core/src/segment/SSSegmentReader.h index a11b13da..d8ab4b12 100644 --- a/core/src/segment/SSSegmentReader.h +++ b/core/src/segment/SSSegmentReader.h @@ -32,7 +32,7 @@ namespace segment { class SSSegmentReader { public: - explicit SSSegmentReader(const engine::SegmentVisitorPtr& segment_visitor); + explicit SSSegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor); // TODO(zhiru) Status @@ -70,6 +70,7 @@ class SSSegmentReader { engine::SegmentVisitorPtr segment_visitor_; storage::FSHandlerPtr fs_ptr_; SegmentPtr segment_ptr_; + std::string dir_root_; }; using SSSegmentReaderPtr = std::shared_ptr; diff --git a/core/src/segment/SSSegmentWriter.cpp b/core/src/segment/SSSegmentWriter.cpp index ac299ea3..37dd2727 100644 --- a/core/src/segment/SSSegmentWriter.cpp +++ b/core/src/segment/SSSegmentWriter.cpp @@ -26,6 +26,7 @@ #include "codecs/snapshot/SSCodec.h" #include "db/Utils.h" #include "db/snapshot/ResourceHelper.h" +#include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "storage/disk/DiskIOReader.h" #include "storage/disk/DiskIOWriter.h" #include "storage/disk/DiskOperation.h" @@ -35,19 +36,22 @@ namespace milvus { namespace segment { -SSSegmentWriter::SSSegmentWriter(const engine::SegmentVisitorPtr& segment_visitor) : segment_visitor_(segment_visitor) { +SSSegmentWriter::SSSegmentWriter(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor) + : dir_root_(dir_root), segment_visitor_(segment_visitor) { Initialize(); } Status SSSegmentWriter::Initialize() { - auto& segment_ptr = segment_visitor_->GetSegment(); - std::string directory = engine::snapshot::GetResPath(segment_ptr); + std::string directory = + engine::snapshot::GetResPath(dir_root_, segment_visitor_->GetSegment()); storage::IOReaderPtr reader_ptr = std::make_shared(); storage::IOWriterPtr writer_ptr = std::make_shared(); storage::OperationPtr operation_ptr = std::make_shared(directory); fs_ptr_ = std::make_shared(reader_ptr, writer_ptr, operation_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + segment_ptr_ = std::make_shared(); const engine::SegmentVisitor::IdMapT& field_map = segment_visitor_->GetFieldVisitors(); @@ -55,17 +59,16 @@ SSSegmentWriter::Initialize() { const engine::snapshot::FieldPtr& field = iter.second->GetField(); std::string name = field->GetName(); engine::FIELD_TYPE ftype = static_cast(field->GetFtype()); - if (ftype == engine::FIELD_TYPE::VECTOR || ftype == engine::FIELD_TYPE::VECTOR || - ftype == engine::FIELD_TYPE::VECTOR) { + if (ftype == engine::FIELD_TYPE::VECTOR_FLOAT || ftype == engine::FIELD_TYPE::VECTOR_BINARY) { json params = field->GetParams(); - if (params.find(engine::VECTOR_DIMENSION_PARAM) == params.end()) { + if (params.find(knowhere::meta::DIM) == params.end()) { std::string msg = "Vector field params must contain: dimension"; LOG_SERVER_ERROR_ << msg; return Status(DB_ERROR, msg); } - uint64_t field_width = 0; - uint64_t dimension = params[engine::VECTOR_DIMENSION_PARAM]; + int64_t field_width = 0; + int64_t dimension = params[knowhere::meta::DIM]; if (ftype == engine::FIELD_TYPE::VECTOR_BINARY) { field_width += (dimension / 8); } else { @@ -103,19 +106,22 @@ SSSegmentWriter::Serialize() { segment_ptr_->GetFixedFieldData(name, raw_data); auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW); - std::string file_path = engine::snapshot::GetResPath(element_visitor->GetFile()); + std::string file_path = + engine::snapshot::GetResPath(dir_root_, element_visitor->GetFile()); STATUS_CHECK(WriteField(file_path, raw_data)); } - /* write UID's deleted docs */ + /* write empty UID's deleted docs */ auto uid_del_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS); - std::string uid_del_path = engine::snapshot::GetResPath(uid_del_visitor->GetFile()); - STATUS_CHECK(WriteDeletedDocs(uid_del_path, segment_ptr_->GetDeletedDocs())); + std::string uid_del_path = + engine::snapshot::GetResPath(dir_root_, uid_del_visitor->GetFile()); + STATUS_CHECK(WriteDeletedDocs(uid_del_path)); - /* write UID's bloom filter */ - auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER); - std::string uid_blf_path = engine::snapshot::GetResPath(uid_blf_visitor->GetFile()); - STATUS_CHECK(WriteBloomFilter(uid_blf_path, segment_ptr_->GetBloomFilter())); + /* don't write UID's bloom filter */ + // auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER); + // std::string uid_blf_path = + // engine::snapshot::GetResPath(dir_root_, uid_blf_visitor->GetFile()); + // STATUS_CHECK(WriteBloomFilter(uid_blf_path, segment_ptr_->GetBloomFilter())); return Status::OK(); } @@ -124,10 +130,9 @@ Status SSSegmentWriter::WriteField(const std::string& file_path, const engine::FIXED_FIELD_DATA& raw) { try { auto& ss_codec = codec::SSCodec::instance(); - fs_ptr_->operation_ptr_->CreateDirectory(); ss_codec.GetBlockFormat()->write(fs_ptr_, file_path, raw); } catch (std::exception& e) { - std::string err_msg = "Failed to write vectors: " + std::string(e.what()); + std::string err_msg = "Failed to write field: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; engine::utils::SendExitSignal(); @@ -176,11 +181,8 @@ SSSegmentWriter::WriteBloomFilter(const std::string& file_path) { Status SSSegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilterPtr& id_bloom_filter_ptr) { try { - TimeRecorder recorder("SSSegmentWriter::WriteBloomFilter"); auto& ss_codec = codec::SSCodec::instance(); - fs_ptr_->operation_ptr_->CreateDirectory(); ss_codec.GetIdBloomFilterFormat()->write(fs_ptr_, file_path, id_bloom_filter_ptr); - recorder.RecordSection("finish writing bloom filter"); } catch (std::exception& e) { std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; @@ -194,11 +196,7 @@ SSSegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFil Status SSSegmentWriter::WriteDeletedDocs(const std::string& file_path) { DeletedDocsPtr deleted_docs_ptr = std::make_shared(); - auto status = WriteDeletedDocs(file_path, deleted_docs_ptr); - if (!status.ok()) { - return status; - } - + STATUS_CHECK(WriteDeletedDocs(file_path, deleted_docs_ptr)); segment_ptr_->SetDeletedDocs(deleted_docs_ptr); return Status::OK(); } @@ -207,7 +205,6 @@ Status SSSegmentWriter::WriteDeletedDocs(const std::string& file_path, const DeletedDocsPtr& deleted_docs) { try { auto& ss_codec = codec::SSCodec::instance(); - fs_ptr_->operation_ptr_->CreateDirectory(); ss_codec.GetDeletedDocsFormat()->write(fs_ptr_, file_path, deleted_docs); } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); diff --git a/core/src/segment/SSSegmentWriter.h b/core/src/segment/SSSegmentWriter.h index 037f7d6c..fa3f85e9 100644 --- a/core/src/segment/SSSegmentWriter.h +++ b/core/src/segment/SSSegmentWriter.h @@ -34,7 +34,7 @@ namespace segment { class SSSegmentWriter { public: - explicit SSSegmentWriter(const engine::SegmentVisitorPtr& segment_visitor); + explicit SSSegmentWriter(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor); Status AddChunk(const engine::DataChunkPtr& chunk_ptr); @@ -86,6 +86,7 @@ class SSSegmentWriter { engine::SegmentVisitorPtr segment_visitor_; storage::FSHandlerPtr fs_ptr_; engine::SegmentPtr segment_ptr_; + std::string dir_root_; }; using SSSegmentWriterPtr = std::shared_ptr; diff --git a/core/src/segment/Segment.cpp b/core/src/segment/Segment.cpp index d072e8e4..8166b109 100644 --- a/core/src/segment/Segment.cpp +++ b/core/src/segment/Segment.cpp @@ -23,8 +23,6 @@ namespace milvus { namespace engine { -const char* VECTOR_DIMENSION_PARAM = "dimension"; // hard code - Status Segment::AddField(const std::string& field_name, FIELD_TYPE field_type, uint64_t field_width) { if (field_types_.find(field_name) != field_types_.end()) { diff --git a/core/src/segment/Segment.h b/core/src/segment/Segment.h index e7abcb9b..42dfe7a7 100644 --- a/core/src/segment/Segment.h +++ b/core/src/segment/Segment.h @@ -31,8 +31,6 @@ namespace milvus { namespace engine { -extern const char* VECTOR_DIMENSION_PARAM; - using FIELD_TYPE = engine::meta::hybrid::DataType; using FIELD_TYPE_MAP = std::unordered_map; using FIELD_WIDTH_MAP = std::unordered_map; diff --git a/core/src/storage/disk/DiskOperation.cpp b/core/src/storage/disk/DiskOperation.cpp index 8ce5af10..eb3ea0b3 100644 --- a/core/src/storage/disk/DiskOperation.cpp +++ b/core/src/storage/disk/DiskOperation.cpp @@ -34,7 +34,8 @@ DiskOperation::CreateDirectory() { bool is_dir = boost::filesystem::is_directory(dir_path_); fiu_do_on("DiskOperation.CreateDirectory.is_directory", is_dir = false); if (!is_dir) { - auto ret = boost::filesystem::create_directory(dir_path_); + /* create directories recursively */ + auto ret = boost::filesystem::create_directories(dir_path_); fiu_do_on("DiskOperation.CreateDirectory.create_directory", ret = false); if (!ret) { std::string err_msg = "Failed to create directory: " + dir_path_; diff --git a/core/unittest/ssdb/test_segment.cpp b/core/unittest/ssdb/test_segment.cpp index 3b271936..82ce7d94 100644 --- a/core/unittest/ssdb/test_segment.cpp +++ b/core/unittest/ssdb/test_segment.cpp @@ -20,9 +20,11 @@ #include "db/Types.h" #include "db/snapshot/IterateHandler.h" #include "db/snapshot/Resources.h" +#include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "segment/SSSegmentReader.h" #include "segment/SSSegmentWriter.h" #include "segment/Types.h" +#include "utils/Json.h" using SegmentVisitor = milvus::engine::SegmentVisitor; @@ -46,8 +48,9 @@ CreateCollection(std::shared_ptr db, const std::string& collection_nam field_id++; /* field vector */ - auto vector_field = std::make_shared("vector", 0, milvus::engine::FieldType::VECTOR_FLOAT, - milvus::engine::snapshot::JEmpty, field_id); + milvus::json vector_param = {{milvus::knowhere::meta::DIM, 4}}; + auto vector_field = std::make_shared("vector", 0, milvus::engine::FieldType::VECTOR_FLOAT, vector_param, + field_id); auto vector_field_element_index = std::make_shared(collection_id, field_id, milvus::engine::DEFAULT_INDEX_NAME, milvus::engine::FieldElementType::FET_INDEX); @@ -63,6 +66,8 @@ TEST_F(SSSegmentTest, SegmentTest) { auto next_lsn = [&]() -> decltype(lsn) { return ++lsn; }; + + std::string db_root = "/tmp/milvus_test/db/table"; std::string c1 = "c1"; auto status = CreateCollection(db_, c1, next_lsn()); ASSERT_TRUE(status.ok()); @@ -76,27 +81,77 @@ TEST_F(SSSegmentTest, SegmentTest) { SegmentFileContext sf_context; SFContextBuilder(sf_context, ss); + std::vector contexts; + SFContextsBuilder(contexts, ss); + + + // std::cout << ss->ToString() << std::endl; + auto& partitions = ss->GetResources(); + ID_TYPE partition_id; for (auto& kv : partitions) { - ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok()); + /* select the first partition */ + partition_id = kv.first; + break; } - status = Snapshots::GetInstance().GetSnapshot(ss, c1); - ASSERT_TRUE(status.ok()); - std::vector raw_uids = {123}; std::vector raw_vectors = {1, 2, 3, 4}; - auto& segments = ss->GetResources(); - for (auto& kv : segments) { - auto segment = kv.second; - auto visitor = SegmentVisitor::Build(ss, segment->GetID()); - milvus::segment::SSSegmentWriter segment_writer(visitor); -// status = segment_writer.AddVectors("test", raw_vectors, raw_uids); + { + /* commit new segment */ + OperationContext context; + context.lsn = next_lsn(); + context.prev_partition = ss->GetResource(partition_id); + auto op = std::make_shared(context, ss); + SegmentPtr new_seg; + status = op->CommitNewSegment(new_seg); + ASSERT_TRUE(status.ok()); + + /* commit new segment file */ + for (auto& cctx : contexts) { + SegmentFilePtr seg_file; + auto nsf_context = cctx; + nsf_context.segment_id = new_seg->GetID(); + nsf_context.partition_id = new_seg->GetPartitionId(); + status = op->CommitNewSegmentFile(nsf_context, seg_file); + } + + /* build segment visitor */ + auto ctx = op->GetContext(); + ASSERT_TRUE(ctx.new_segment); + auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files); + ASSERT_TRUE(visitor); + ASSERT_EQ(visitor->GetSegment(), new_seg); + ASSERT_FALSE(visitor->GetSegment()->IsActive()); + // std::cout << visitor->ToString() << std::endl; + // std::cout << ss->ToString() << std::endl; + + /* write data */ + milvus::segment::SSSegmentWriter segment_writer(db_root, visitor); + +// status = segment_writer.AddChunk("test", raw_vectors, raw_uids); +// ASSERT_TRUE(status.ok()) +// +// status = segment_writer.Serialize(); // ASSERT_TRUE(status.ok()); - // status = segment_writer.Serialize(); - ASSERT_TRUE(status.ok()); + /* read data */ +// milvus::segment::SSSegmentReader segment_reader(db_root, visitor); +// +// status = segment_reader.Load(); +// ASSERT_TRUE(status.ok()); +// +// milvus::segment::SegmentPtr segment_ptr; +// status = segment_reader.GetSegment(segment_ptr); +// ASSERT_TRUE(status.ok()); +// +// auto& out_uids = segment_ptr->vectors_ptr_->GetUids(); +// ASSERT_EQ(raw_uids.size(), out_uids.size()); +// ASSERT_EQ(raw_uids[0], out_uids[0]); +// auto& out_vectors = segment_ptr->vectors_ptr_->GetData(); +// ASSERT_EQ(raw_vectors.size(), out_vectors.size()); +// ASSERT_EQ(raw_vectors[0], out_vectors[0]); } status = db_->DropCollection(c1); diff --git a/core/unittest/ssdb/utils.h b/core/unittest/ssdb/utils.h index 735b2167..61faf3e9 100644 --- a/core/unittest/ssdb/utils.h +++ b/core/unittest/ssdb/utils.h @@ -101,6 +101,31 @@ SFContextBuilder(SegmentFileContext& ctx, ScopedSnapshotT sss) { ctx.partition_id = sss->GetResources().begin()->second->GetPartitionId(); } +inline void +SFContextsBuilder(std::vector& contexts, ScopedSnapshotT sss) { + auto fields = sss->GetResources(); + for (auto& field_kv : fields) { + for (auto& kv : sss->GetResources()) { + if (kv.second->GetFieldId() != field_kv.first) { + continue; + } + SegmentFileContext ctx; + ctx.field_name = field_kv.second->GetName(); + ctx.field_element_name = kv.second->GetName(); + contexts.push_back(ctx); + } + } + auto& segments = sss->GetResources(); + if (segments.size() == 0) { + return; + } + + for (auto& ctx : contexts) { + ctx.segment_id = sss->GetResources().begin()->second->GetID(); + ctx.partition_id = sss->GetResources().begin()->second->GetPartitionId(); + } +} + struct PartitionCollector : public IteratePartitionHandler { using ResourceT = Partition; using BaseT = IteratePartitionHandler; -- GitLab