未验证 提交 d473c7ad 编写于 作者: C Cai Yudong 提交者: GitHub

snapshot integrate (#2854)

* add test_segment
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update interface GetResPath
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* retry ci
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update SSSegmentWriter
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 4cb83837
......@@ -21,7 +21,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#2692 Milvus hangs during multi-thread concurrent search
- \#2739 Fix mishards start failed
- \#2752 Milvus formats vectors data to double-precision and return to http client
- \#2767 fix a bug of getting wrong nprobe limitation in knowhere on GPU version
- \#2767 Fix a bug of getting wrong nprobe limitation in knowhere on GPU version
- \#2776 Fix too many data copies during creating IVF index
- \#2813 To implemente RNSG IP
......
......@@ -445,7 +445,8 @@ SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
auto handler = std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, id_array, field_names);
std::string dir_root = options_.meta_.path_;
auto handler = std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, dir_root, id_array, field_names);
handler->Iterate();
STATUS_CHECK(handler->GetStatus());
......
......@@ -108,9 +108,17 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm
///////////////////////////////////////////////////////////////////////////////
GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr<milvus::server::Context>& context,
engine::snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
engine::snapshot::ScopedSnapshotT ss,
const std::string& dir_root, const IDNumbers& ids,
const std::vector<std::string>& field_names)
: BaseT(ss), context_(context), ids_(ids), field_names_(field_names), vector_data_(), attr_type_(), attr_data_() {
: BaseT(ss),
context_(context),
dir_root_(dir_root),
ids_(ids),
field_names_(field_names),
vector_data_(),
attr_type_(),
attr_data_() {
for (auto& field_name : field_names_) {
auto field_ptr = ss_->GetField(field_name);
auto field_type = field_ptr->GetFtype();
......@@ -126,25 +134,26 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
if (segment_visitor == nullptr) {
return Status(DB_ERROR, "Fail to build segment visitor with id " + std::to_string(segment->GetID()));
}
segment::SSSegmentReader segment_reader(segment_visitor);
segment::SSSegmentReader segment_reader(dir_root_, segment_visitor);
/* load UID's bloom filter file */
auto uid_field_visitor = segment_visitor->GetFieldVisitor(DEFAULT_UID_NAME);
auto uid_bf_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
std::string uid_bf_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_bf_visitor->GetFile());
/* load UID's bloom filter file */
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
std::string uid_blf_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_blf_visitor->GetFile());
segment::IdBloomFilterPtr id_bloom_filter_ptr;
STATUS_CHECK(segment_reader.LoadBloomFilter(uid_bf_path, id_bloom_filter_ptr));
STATUS_CHECK(segment_reader.LoadBloomFilter(uid_blf_path, id_bloom_filter_ptr));
/* load UID's raw data */
auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_RAW);
std::string uid_raw_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_raw_visitor->GetFile());
std::string uid_raw_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_raw_visitor->GetFile());
std::vector<segment::doc_id_t> uids;
STATUS_CHECK(segment_reader.LoadUids(uid_raw_path, uids));
/* load UID's deleted docs */
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS);
std::string uid_del_path = snapshot::GetResPath<snapshot::SegmentFile>(uid_del_visitor->GetFile());
std::string uid_del_path = snapshot::GetResPath<snapshot::SegmentFile>(dir_root_, uid_del_visitor->GetFile());
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(segment_reader.LoadDeletedDocs(uid_del_path, deleted_docs_ptr));
......
......@@ -65,13 +65,15 @@ struct SegmentsToSearchCollector : public snapshot::IterateHandler<snapshot::Seg
struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler<snapshot::Segment> {
using ResourceT = snapshot::Segment;
using BaseT = snapshot::IterateHandler<ResourceT>;
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
GetEntityByIdSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
const std::string& dir_root, const IDNumbers& ids,
const std::vector<std::string>& field_names);
Status
Handle(const typename ResourceT::Ptr&) override;
const server::ContextPtr context_;
const std::string dir_root_;
const engine::IDNumbers ids_;
const std::vector<std::string> field_names_;
std::vector<engine::VectorsData> vector_data_;
......
......@@ -17,6 +17,7 @@
#include "SSVectorSource.h"
#include "db/Constants.h"
#include "db/snapshot/Snapshots.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "utils/Log.h"
namespace milvus {
......@@ -128,17 +129,16 @@ SSMemManagerImpl::ValidateChunk(int64_t collection_id, int64_t partition_id, con
return Status(DB_ERROR, err_msg + name);
}
break;
case meta::hybrid::DataType::VECTOR:
case meta::hybrid::DataType::VECTOR_FLOAT:
case meta::hybrid::DataType::VECTOR_BINARY: {
json params = field->GetParams();
if (params.find(VECTOR_DIMENSION_PARAM) == params.end()) {
if (params.find(knowhere::meta::DIM) == params.end()) {
std::string msg = "Vector field params must contain: dimension";
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}
int64_t dimension = params[VECTOR_DIMENSION_PARAM];
int64_t dimension = params[knowhere::meta::DIM];
int64_t row_size =
(ftype == meta::hybrid::DataType::VECTOR_BINARY) ? dimension / 8 : dimension * sizeof(float);
if (data_size != chunk->count_ * row_size) {
......
......@@ -23,6 +23,7 @@
#include "db/meta/MetaTypes.h"
#include "db/snapshot/Operations.h"
#include "db/snapshot/Snapshots.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "metrics/Metrics.h"
#include "segment/SegmentReader.h"
#include "utils/Log.h"
......@@ -109,7 +110,7 @@ SSMemSegment::CreateSegment() {
auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files);
// create segment writer
segment_writer_ptr_ = std::make_shared<segment::SSSegmentWriter>(visitor);
segment_writer_ptr_ = std::make_shared<segment::SSSegmentWriter>(options_.meta_.path_, visitor);
return Status::OK();
}
......@@ -152,17 +153,16 @@ SSMemSegment::GetSingleEntitySize(size_t& single_size) {
case meta::hybrid::DataType::INT64:
single_size += sizeof(uint64_t);
break;
case meta::hybrid::DataType::VECTOR:
case meta::hybrid::DataType::VECTOR_FLOAT:
case meta::hybrid::DataType::VECTOR_BINARY: {
json params = field->GetParams();
if (params.find(VECTOR_DIMENSION_PARAM) == params.end()) {
if (params.find(knowhere::meta::DIM) == params.end()) {
std::string msg = "Vector field params must contain: dimension";
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}
int64_t dimension = params[VECTOR_DIMENSION_PARAM];
int64_t dimension = params[knowhere::meta::DIM];
if (ftype == meta::hybrid::DataType::VECTOR_BINARY) {
single_size += (dimension / 8);
} else {
......
......@@ -295,6 +295,7 @@ NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, Seg
auto ctx = context;
ctx.segment_id = context_.new_segment->GetID();
ctx.partition_id = context_.new_segment->GetPartitionId();
ctx.collection_id = GetStartedSS()->GetCollectionId();
auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
STATUS_CHECK(new_sf_op->Push());
STATUS_CHECK(new_sf_op->GetResource(created));
......@@ -613,8 +614,8 @@ CreateCollectionOperation::DoExecute(Store& store) {
auto& field_schema = field_kv.first;
auto& field_elements = field_kv.second;
FieldPtr field;
status =
store.CreateResource<Field>(Field(field_schema->GetName(), field_idx, field_schema->GetFtype()), field);
status = store.CreateResource<Field>(
Field(field_schema->GetName(), field_idx, field_schema->GetFtype(), field_schema->GetParams()), field);
auto f_ctx_p = ResourceContextBuilder<Field>().SetOp(meta::oUpdate).CreatePtr();
AddStepWithLsn(*field, c_context_.lsn, f_ctx_p);
MappingT element_ids = {};
......
......@@ -49,7 +49,7 @@ class ResourceGCEvent : public Event {
STATUS_CHECK((*sd_op)(store));
/* TODO: physically clean resource */
std::string res_path = GetResPath<ResourceT>(res_);
std::string res_path = GetResPath<ResourceT>(dir_root_, res_);
/* if (!boost::filesystem::exists(res_path)) { */
/* return Status::OK(); */
/* } */
......@@ -68,6 +68,7 @@ class ResourceGCEvent : public Event {
private:
class ResourceT::Ptr res_;
std::string dir_root_;
};
} // namespace snapshot
......
......@@ -13,57 +13,65 @@
#include <memory>
#include <string>
#include <vector>
#include "db/snapshot/Resources.h"
#include "utils/Status.h"
namespace milvus::engine::snapshot {
static const char* COLLECTION_PREFIX = "C_";
static const char* PARTITION_PREFIX = "P_";
static const char* SEGMENT_PREFIX = "S_";
static const char* SEGMENT_FILE_PREFIX = "F_";
template <class ResourceT>
inline std::string
GetResPath(const typename ResourceT::Ptr& res_ptr) {
GetResPath(const std::string& root, const typename ResourceT::Ptr& res_ptr) {
return std::string();
}
template <>
inline std::string
GetResPath<Collection>(const Collection::Ptr& res_ptr) {
GetResPath<Collection>(const std::string& root, const Collection::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetID();
return ss.str();
}
template <>
inline std::string
GetResPath<Partition>(const Partition::Ptr& res_ptr) {
GetResPath<Partition>(const std::string& root, const Partition::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetID();
return ss.str();
}
template <>
inline std::string
GetResPath<Segment>(const Segment::Ptr& res_ptr) {
GetResPath<Segment>(const std::string& root, const Segment::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetPartitionId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/";
ss << SEGMENT_PREFIX << res_ptr->GetID();
return ss.str();
}
template <>
inline std::string
GetResPath<SegmentFile>(const SegmentFile::Ptr& res_ptr) {
GetResPath<SegmentFile>(const std::string& root, const SegmentFile::Ptr& res_ptr) {
std::stringstream ss;
ss << res_ptr->GetCollectionId() << "/";
ss << res_ptr->GetPartitionId() << "/";
ss << res_ptr->GetSegmentId() << "/";
ss << res_ptr->GetID();
ss << root << "/";
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/";
ss << SEGMENT_PREFIX << res_ptr->GetSegmentId() << "/";
ss << SEGMENT_FILE_PREFIX << res_ptr->GetID();
return ss.str();
}
......
......@@ -234,7 +234,7 @@ Snapshot::ToString() const {
ss << to_matrix_string(fc_m, row_element_size, 2);
for (auto& fe_id : fc_m) {
auto fe = GetResource<FieldElement>(fe_id);
ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName();
ss << "\n\tFieldElement: id=" << fe_id << ",name=" << fe->GetName() << " CID=" << fe->GetCollectionId();
}
}
......
......@@ -32,9 +32,11 @@
namespace milvus {
namespace segment {
SSSegmentReader::SSSegmentReader(const engine::SegmentVisitorPtr& segment_visitor) : segment_visitor_(segment_visitor) {
SSSegmentReader::SSSegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor)
: dir_root_(dir_root), segment_visitor_(segment_visitor) {
auto& segment_ptr = segment_visitor_->GetSegment();
std::string directory = engine::snapshot::GetResPath<engine::snapshot::Segment>(segment_ptr);
std::string directory =
engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_root_, segment_visitor->GetSegment());
storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
......@@ -55,36 +57,39 @@ SSSegmentReader::Load() {
try {
// auto& ss_codec = codec::SSCodec::instance();
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
/* load UID's raw data */
auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string uid_raw_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_raw_visitor->GetFile());
std::vector<segment::doc_id_t> uids;
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_raw_visitor->GetFile());
STATUS_CHECK(LoadUids(uid_raw_path, segment_ptr_->vectors_ptr_->GetMutableUids()));
/* load UID's deleted docs */
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string uid_del_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_del_visitor->GetFile());
segment::DeletedDocsPtr deleted_docs_ptr;
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_del_visitor->GetFile());
STATUS_CHECK(LoadDeletedDocs(uid_del_path, segment_ptr_->deleted_docs_ptr_));
/* load other data */
Status s;
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
for (auto& f_kv : field_visitors_map) {
auto& field_visitor = f_kv.second;
auto& field = field_visitor->GetField();
for (auto& file_kv : field_visitor->GetElementVistors()) {
auto& field_element_visitor = file_kv.second;
auto& fv = f_kv.second;
auto& field = fv->GetField();
for (auto& file_kv : fv->GetElementVistors()) {
auto& fev = file_kv.second;
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, fev->GetFile());
if (!s.ok()) {
LOG_ENGINE_WARNING_ << "Cannot get resource path";
}
auto& segment_file = field_element_visitor->GetFile();
auto& segment_file = fev->GetFile();
if (segment_file == nullptr) {
continue;
}
auto file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(segment_file);
auto& field_element = field_element_visitor->GetElement();
auto& field_element = fev->GetElement();
if ((field->GetFtype() == engine::FieldType::VECTOR_FLOAT ||
field->GetFtype() == engine::FieldType::VECTOR_BINARY) &&
......
......@@ -32,7 +32,7 @@ namespace segment {
class SSSegmentReader {
public:
explicit SSSegmentReader(const engine::SegmentVisitorPtr& segment_visitor);
explicit SSSegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor);
// TODO(zhiru)
Status
......@@ -70,6 +70,7 @@ class SSSegmentReader {
engine::SegmentVisitorPtr segment_visitor_;
storage::FSHandlerPtr fs_ptr_;
SegmentPtr segment_ptr_;
std::string dir_root_;
};
using SSSegmentReaderPtr = std::shared_ptr<SSSegmentReader>;
......
......@@ -26,6 +26,7 @@
#include "codecs/snapshot/SSCodec.h"
#include "db/Utils.h"
#include "db/snapshot/ResourceHelper.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
......@@ -35,19 +36,22 @@
namespace milvus {
namespace segment {
SSSegmentWriter::SSSegmentWriter(const engine::SegmentVisitorPtr& segment_visitor) : segment_visitor_(segment_visitor) {
SSSegmentWriter::SSSegmentWriter(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor)
: dir_root_(dir_root), segment_visitor_(segment_visitor) {
Initialize();
}
Status
SSSegmentWriter::Initialize() {
auto& segment_ptr = segment_visitor_->GetSegment();
std::string directory = engine::snapshot::GetResPath<engine::snapshot::Segment>(segment_ptr);
std::string directory =
engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_root_, segment_visitor_->GetSegment());
storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
storage::OperationPtr operation_ptr = std::make_shared<storage::DiskOperation>(directory);
fs_ptr_ = std::make_shared<storage::FSHandler>(reader_ptr, writer_ptr, operation_ptr);
fs_ptr_->operation_ptr_->CreateDirectory();
segment_ptr_ = std::make_shared<engine::Segment>();
const engine::SegmentVisitor::IdMapT& field_map = segment_visitor_->GetFieldVisitors();
......@@ -55,17 +59,16 @@ SSSegmentWriter::Initialize() {
const engine::snapshot::FieldPtr& field = iter.second->GetField();
std::string name = field->GetName();
engine::FIELD_TYPE ftype = static_cast<engine::FIELD_TYPE>(field->GetFtype());
if (ftype == engine::FIELD_TYPE::VECTOR || ftype == engine::FIELD_TYPE::VECTOR ||
ftype == engine::FIELD_TYPE::VECTOR) {
if (ftype == engine::FIELD_TYPE::VECTOR_FLOAT || ftype == engine::FIELD_TYPE::VECTOR_BINARY) {
json params = field->GetParams();
if (params.find(engine::VECTOR_DIMENSION_PARAM) == params.end()) {
if (params.find(knowhere::meta::DIM) == params.end()) {
std::string msg = "Vector field params must contain: dimension";
LOG_SERVER_ERROR_ << msg;
return Status(DB_ERROR, msg);
}
uint64_t field_width = 0;
uint64_t dimension = params[engine::VECTOR_DIMENSION_PARAM];
int64_t field_width = 0;
int64_t dimension = params[knowhere::meta::DIM];
if (ftype == engine::FIELD_TYPE::VECTOR_BINARY) {
field_width += (dimension / 8);
} else {
......@@ -103,19 +106,22 @@ SSSegmentWriter::Serialize() {
segment_ptr_->GetFixedFieldData(name, raw_data);
auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(element_visitor->GetFile());
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, element_visitor->GetFile());
STATUS_CHECK(WriteField(file_path, raw_data));
}
/* write UID's deleted docs */
/* write empty UID's deleted docs */
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string uid_del_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_del_visitor->GetFile());
STATUS_CHECK(WriteDeletedDocs(uid_del_path, segment_ptr_->GetDeletedDocs()));
std::string uid_del_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_del_visitor->GetFile());
STATUS_CHECK(WriteDeletedDocs(uid_del_path));
/* write UID's bloom filter */
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
std::string uid_blf_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_blf_visitor->GetFile());
STATUS_CHECK(WriteBloomFilter(uid_blf_path, segment_ptr_->GetBloomFilter()));
/* don't write UID's bloom filter */
// auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
// std::string uid_blf_path =
// engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_root_, uid_blf_visitor->GetFile());
// STATUS_CHECK(WriteBloomFilter(uid_blf_path, segment_ptr_->GetBloomFilter()));
return Status::OK();
}
......@@ -124,10 +130,9 @@ Status
SSSegmentWriter::WriteField(const std::string& file_path, const engine::FIXED_FIELD_DATA& raw) {
try {
auto& ss_codec = codec::SSCodec::instance();
fs_ptr_->operation_ptr_->CreateDirectory();
ss_codec.GetBlockFormat()->write(fs_ptr_, file_path, raw);
} catch (std::exception& e) {
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
std::string err_msg = "Failed to write field: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
engine::utils::SendExitSignal();
......@@ -176,11 +181,8 @@ SSSegmentWriter::WriteBloomFilter(const std::string& file_path) {
Status
SSSegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilterPtr& id_bloom_filter_ptr) {
try {
TimeRecorder recorder("SSSegmentWriter::WriteBloomFilter");
auto& ss_codec = codec::SSCodec::instance();
fs_ptr_->operation_ptr_->CreateDirectory();
ss_codec.GetIdBloomFilterFormat()->write(fs_ptr_, file_path, id_bloom_filter_ptr);
recorder.RecordSection("finish writing bloom filter");
} catch (std::exception& e) {
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -194,11 +196,7 @@ SSSegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFil
Status
SSSegmentWriter::WriteDeletedDocs(const std::string& file_path) {
DeletedDocsPtr deleted_docs_ptr = std::make_shared<DeletedDocs>();
auto status = WriteDeletedDocs(file_path, deleted_docs_ptr);
if (!status.ok()) {
return status;
}
STATUS_CHECK(WriteDeletedDocs(file_path, deleted_docs_ptr));
segment_ptr_->SetDeletedDocs(deleted_docs_ptr);
return Status::OK();
}
......@@ -207,7 +205,6 @@ Status
SSSegmentWriter::WriteDeletedDocs(const std::string& file_path, const DeletedDocsPtr& deleted_docs) {
try {
auto& ss_codec = codec::SSCodec::instance();
fs_ptr_->operation_ptr_->CreateDirectory();
ss_codec.GetDeletedDocsFormat()->write(fs_ptr_, file_path, deleted_docs);
} catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
......
......@@ -34,7 +34,7 @@ namespace segment {
class SSSegmentWriter {
public:
explicit SSSegmentWriter(const engine::SegmentVisitorPtr& segment_visitor);
explicit SSSegmentWriter(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor);
Status
AddChunk(const engine::DataChunkPtr& chunk_ptr);
......@@ -86,6 +86,7 @@ class SSSegmentWriter {
engine::SegmentVisitorPtr segment_visitor_;
storage::FSHandlerPtr fs_ptr_;
engine::SegmentPtr segment_ptr_;
std::string dir_root_;
};
using SSSegmentWriterPtr = std::shared_ptr<SSSegmentWriter>;
......
......@@ -23,8 +23,6 @@
namespace milvus {
namespace engine {
const char* VECTOR_DIMENSION_PARAM = "dimension"; // hard code
Status
Segment::AddField(const std::string& field_name, FIELD_TYPE field_type, uint64_t field_width) {
if (field_types_.find(field_name) != field_types_.end()) {
......
......@@ -31,8 +31,6 @@
namespace milvus {
namespace engine {
extern const char* VECTOR_DIMENSION_PARAM;
using FIELD_TYPE = engine::meta::hybrid::DataType;
using FIELD_TYPE_MAP = std::unordered_map<std::string, engine::meta::hybrid::DataType>;
using FIELD_WIDTH_MAP = std::unordered_map<std::string, uint64_t>;
......
......@@ -34,7 +34,8 @@ DiskOperation::CreateDirectory() {
bool is_dir = boost::filesystem::is_directory(dir_path_);
fiu_do_on("DiskOperation.CreateDirectory.is_directory", is_dir = false);
if (!is_dir) {
auto ret = boost::filesystem::create_directory(dir_path_);
/* create directories recursively */
auto ret = boost::filesystem::create_directories(dir_path_);
fiu_do_on("DiskOperation.CreateDirectory.create_directory", ret = false);
if (!ret) {
std::string err_msg = "Failed to create directory: " + dir_path_;
......
......@@ -20,9 +20,11 @@
#include "db/Types.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/Resources.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/SSSegmentReader.h"
#include "segment/SSSegmentWriter.h"
#include "segment/Types.h"
#include "utils/Json.h"
using SegmentVisitor = milvus::engine::SegmentVisitor;
......@@ -46,8 +48,9 @@ CreateCollection(std::shared_ptr<SSDBImpl> db, const std::string& collection_nam
field_id++;
/* field vector */
auto vector_field = std::make_shared<Field>("vector", 0, milvus::engine::FieldType::VECTOR_FLOAT,
milvus::engine::snapshot::JEmpty, field_id);
milvus::json vector_param = {{milvus::knowhere::meta::DIM, 4}};
auto vector_field = std::make_shared<Field>("vector", 0, milvus::engine::FieldType::VECTOR_FLOAT, vector_param,
field_id);
auto vector_field_element_index = std::make_shared<FieldElement>(collection_id, field_id,
milvus::engine::DEFAULT_INDEX_NAME, milvus::engine::FieldElementType::FET_INDEX);
......@@ -63,6 +66,8 @@ TEST_F(SSSegmentTest, SegmentTest) {
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
std::string db_root = "/tmp/milvus_test/db/table";
std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok());
......@@ -76,27 +81,77 @@ TEST_F(SSSegmentTest, SegmentTest) {
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
std::vector<SegmentFileContext> contexts;
SFContextsBuilder(contexts, ss);
// std::cout << ss->ToString() << std::endl;
auto& partitions = ss->GetResources<Partition>();
ID_TYPE partition_id;
for (auto& kv : partitions) {
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context).ok());
/* select the first partition */
partition_id = kv.first;
break;
}
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
std::vector<milvus::segment::doc_id_t> raw_uids = {123};
std::vector<uint8_t> raw_vectors = {1, 2, 3, 4};
auto& segments = ss->GetResources<Segment>();
for (auto& kv : segments) {
auto segment = kv.second;
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
milvus::segment::SSSegmentWriter segment_writer(visitor);
// status = segment_writer.AddVectors("test", raw_vectors, raw_uids);
{
/* commit new segment */
OperationContext context;
context.lsn = next_lsn();
context.prev_partition = ss->GetResource<Partition>(partition_id);
auto op = std::make_shared<NewSegmentOperation>(context, ss);
SegmentPtr new_seg;
status = op->CommitNewSegment(new_seg);
ASSERT_TRUE(status.ok());
/* commit new segment file */
for (auto& cctx : contexts) {
SegmentFilePtr seg_file;
auto nsf_context = cctx;
nsf_context.segment_id = new_seg->GetID();
nsf_context.partition_id = new_seg->GetPartitionId();
status = op->CommitNewSegmentFile(nsf_context, seg_file);
}
/* build segment visitor */
auto ctx = op->GetContext();
ASSERT_TRUE(ctx.new_segment);
auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files);
ASSERT_TRUE(visitor);
ASSERT_EQ(visitor->GetSegment(), new_seg);
ASSERT_FALSE(visitor->GetSegment()->IsActive());
// std::cout << visitor->ToString() << std::endl;
// std::cout << ss->ToString() << std::endl;
/* write data */
milvus::segment::SSSegmentWriter segment_writer(db_root, visitor);
// status = segment_writer.AddChunk("test", raw_vectors, raw_uids);
// ASSERT_TRUE(status.ok())
//
// status = segment_writer.Serialize();
// ASSERT_TRUE(status.ok());
// status = segment_writer.Serialize();
ASSERT_TRUE(status.ok());
/* read data */
// milvus::segment::SSSegmentReader segment_reader(db_root, visitor);
//
// status = segment_reader.Load();
// ASSERT_TRUE(status.ok());
//
// milvus::segment::SegmentPtr segment_ptr;
// status = segment_reader.GetSegment(segment_ptr);
// ASSERT_TRUE(status.ok());
//
// auto& out_uids = segment_ptr->vectors_ptr_->GetUids();
// ASSERT_EQ(raw_uids.size(), out_uids.size());
// ASSERT_EQ(raw_uids[0], out_uids[0]);
// auto& out_vectors = segment_ptr->vectors_ptr_->GetData();
// ASSERT_EQ(raw_vectors.size(), out_vectors.size());
// ASSERT_EQ(raw_vectors[0], out_vectors[0]);
}
status = db_->DropCollection(c1);
......
......@@ -101,6 +101,31 @@ SFContextBuilder(SegmentFileContext& ctx, ScopedSnapshotT sss) {
ctx.partition_id = sss->GetResources<Segment>().begin()->second->GetPartitionId();
}
inline void
SFContextsBuilder(std::vector<SegmentFileContext>& contexts, ScopedSnapshotT sss) {
auto fields = sss->GetResources<Field>();
for (auto& field_kv : fields) {
for (auto& kv : sss->GetResources<FieldElement>()) {
if (kv.second->GetFieldId() != field_kv.first) {
continue;
}
SegmentFileContext ctx;
ctx.field_name = field_kv.second->GetName();
ctx.field_element_name = kv.second->GetName();
contexts.push_back(ctx);
}
}
auto& segments = sss->GetResources<Segment>();
if (segments.size() == 0) {
return;
}
for (auto& ctx : contexts) {
ctx.segment_id = sss->GetResources<Segment>().begin()->second->GetID();
ctx.partition_id = sss->GetResources<Segment>().begin()->second->GetPartitionId();
}
}
struct PartitionCollector : public IteratePartitionHandler {
using ResourceT = Partition;
using BaseT = IteratePartitionHandler;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册