diff --git a/core/src/cache/CpuCacheMgr.cpp b/core/src/cache/CpuCacheMgr.cpp index 6601b48448bdc35b2a5a511bc56d470100ff24d0..b71e59a95050e28b602221f46d330b986b58b161 100644 --- a/core/src/cache/CpuCacheMgr.cpp +++ b/core/src/cache/CpuCacheMgr.cpp @@ -38,17 +38,6 @@ CpuCacheMgr::GetInstance() { return s_mgr; } -DataObjPtr -CpuCacheMgr::GetDataObj(const std::string& key) { - DataObjPtr obj = GetItem(key); - return obj; -} - -void -CpuCacheMgr::SetDataObj(const std::string& key, const milvus::cache::DataObjPtr& data) { - CacheMgr::InsertItem(key, data); -} - void CpuCacheMgr::ConfigUpdate(const std::string& name) { SetCapacity(config.cache.cache_size()); diff --git a/core/src/cache/CpuCacheMgr.h b/core/src/cache/CpuCacheMgr.h index c5dd691d2cdbacd98a88686abba53992ad7570ff..455479f41c25c107f8b9308e11dbd3651fefbd92 100644 --- a/core/src/cache/CpuCacheMgr.h +++ b/core/src/cache/CpuCacheMgr.h @@ -31,12 +31,6 @@ class CpuCacheMgr : public CacheMgr, public ConfigObserver { static CpuCacheMgr& GetInstance(); - DataObjPtr - GetDataObj(const std::string& key); - - void - SetDataObj(const std::string& key, const DataObjPtr& data); - public: void ConfigUpdate(const std::string& name) override; diff --git a/core/src/cache/GpuCacheMgr.cpp b/core/src/cache/GpuCacheMgr.cpp index 305530996108927071c00555b305634b8ea47cb6..f1486a575956692bd09088e3a5b36463a6f24f6c 100644 --- a/core/src/cache/GpuCacheMgr.cpp +++ b/core/src/cache/GpuCacheMgr.cpp @@ -51,17 +51,6 @@ GpuCacheMgr::GetInstance(int64_t gpu_id) { return instance_[gpu_id]; } -DataObjPtr -GpuCacheMgr::GetDataObj(const std::string& key) { - DataObjPtr obj = GetItem(key); - return obj; -} - -void -GpuCacheMgr::SetDataObj(const std::string& key, const milvus::cache::DataObjPtr& data) { - CacheMgr::InsertItem(key, data); -} - bool GpuCacheMgr::Reserve(const int64_t size) { return CacheMgr::Reserve(size); diff --git a/core/src/cache/GpuCacheMgr.h b/core/src/cache/GpuCacheMgr.h index 
b49b4a6bea6845b2812d447134e7784a24d1be44..f2dd4c899bccf1df6bc0868e0824a12edceb3aac 100644 --- a/core/src/cache/GpuCacheMgr.h +++ b/core/src/cache/GpuCacheMgr.h @@ -36,12 +36,6 @@ class GpuCacheMgr : public CacheMgr, public ConfigObserver { static GpuCacheMgrPtr GetInstance(int64_t gpu_id); - DataObjPtr - GetDataObj(const std::string& key); - - void - SetDataObj(const std::string& key, const DataObjPtr& data); - bool Reserve(const int64_t size); diff --git a/core/src/codecs/DeletedDocsFormat.cpp b/core/src/codecs/DeletedDocsFormat.cpp index b061f13816f3fc446b00d231b486615a829ed163..7dd772391139b2d581a410d78460a1f87bd25b0d 100644 --- a/core/src/codecs/DeletedDocsFormat.cpp +++ b/core/src/codecs/DeletedDocsFormat.cpp @@ -98,7 +98,7 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& } auto deleted_docs_list = deleted_docs->GetDeletedDocs(); - size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetSize(); + size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetCount(); if (!deleted_docs_list.empty()) { delete_ids.insert(delete_ids.end(), deleted_docs_list.begin(), deleted_docs_list.end()); } diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index b3d00fee095f2595a21fdea5b01e37cf256ae2e1..a5c16f58521dcd0ef675a9c97a7b23acfa891243 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -516,10 +516,6 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_ } // generate id - DataChunkPtr new_chunk = std::make_shared(); - new_chunk->fixed_fields_ = data_chunk->fixed_fields_; - new_chunk->variable_fields_ = data_chunk->variable_fields_; - new_chunk->count_ = data_chunk->count_; if (auto_increment) { SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance(); IDNumbers ids; @@ -527,7 +523,7 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_ BinaryDataPtr id_data = std::make_shared(); 
id_data->data_.resize(ids.size() * sizeof(int64_t)); memcpy(id_data->data_.data(), ids.data(), ids.size() * sizeof(int64_t)); - new_chunk->fixed_fields_[engine::DEFAULT_UID_NAME] = id_data; + data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME] = id_data; } if (options_.wal_enable_) { @@ -549,8 +545,8 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_ record.lsn = 0; record.collection_id = collection_name; record.partition_tag = partition_name; - record.data_chunk = new_chunk; - record.length = new_chunk->count_; + record.data_chunk = data_chunk; + record.length = data_chunk->count_; record.type = wal::MXLogType::Entity; STATUS_CHECK(ExecWalRecord(record)); @@ -614,7 +610,6 @@ DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_pt snapshot::ScopedSnapshotT ss; STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id)); - auto ss_id = ss->GetID(); /* collect all valid segment */ std::vector segment_visitors; @@ -659,9 +654,11 @@ DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_pt scheduler::SearchJobPtr job = std::make_shared(nullptr, ss, options_, query_ptr, segment_ids); + cache::CpuCacheMgr::GetInstance().PrintInfo(); // print cache info before query /* put search job to scheduler and wait job finish */ scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitFinish(); + cache::CpuCacheMgr::GetInstance().PrintInfo(); // print cache info after query if (!job->status().ok()) { return job->status(); @@ -858,7 +855,7 @@ DBImpl::Compact(const std::shared_ptr& context, const std::stri continue; } - auto deleted_count = deleted_docs->GetSize(); + auto deleted_count = deleted_docs->GetCount(); if (deleted_count / (row_count + deleted_count) < threshold) { continue; // no need to compact } diff --git a/core/src/db/SnapshotUtils.cpp b/core/src/db/SnapshotUtils.cpp index f1e96a658a839388b6d90a9a3e57f1c72c843408..0ab1fbb0d5deda5648b838d7d4f494a51ae5fd19 
100644 --- a/core/src/db/SnapshotUtils.cpp +++ b/core/src/db/SnapshotUtils.cpp @@ -36,6 +36,7 @@ const char* JSON_FIELD_ELEMENT = "field_element"; const char* JSON_PARTITION_TAG = "tag"; const char* JSON_FILES = "files"; const char* JSON_INDEX_NAME = "index_name"; +const char* JSON_INDEX_TYPE = "index_type"; const char* JSON_DATA_SIZE = "data_size"; const char* JSON_PATH = "path"; @@ -165,6 +166,7 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) { size_t total_row_count = 0; size_t total_data_size = 0; + // get partition information std::unordered_map partitions; auto partition_names = ss->GetPartitionNames(); for (auto& name : partition_names) { @@ -183,11 +185,13 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) { partitions.insert(std::make_pair(partition->GetID(), json_partition)); } + // just ensure segments listed in id order snapshot::IDS_TYPE segment_ids; auto handler = std::make_shared(ss, segment_ids); handler->Iterate(); std::sort(segment_ids.begin(), segment_ids.end()); + // get segment information and construct segment json nodes std::unordered_map> json_partition_segments; for (auto id : segment_ids) { auto segment_commit = ss->GetSegmentCommitBySegmentId(id); @@ -214,7 +218,15 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) { json_file[JSON_PATH] = engine::snapshot::GetResPath("", pair.second->GetFile()); json_file[JSON_FIELD] = field->GetName(); - json_file[JSON_FIELD_ELEMENT] = element->GetName(); + + // if the element is index, print index name/type + // else print element name + if (element->GetFtype() == engine::FieldElementType::FET_INDEX) { + json_file[JSON_INDEX_NAME] = element->GetName(); + json_file[JSON_INDEX_TYPE] = element->GetTypeName(); + } else { + json_file[JSON_FIELD_ELEMENT] = element->GetName(); + } } json_files.push_back(json_file); } @@ -228,6 +240,7 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) { 
json_partition_segments[segment_commit->GetPartitionId()].push_back(json_segment); } + // construct partition json nodes milvus::json json_partitions; for (auto pair : partitions) { milvus::json json_segments; diff --git a/core/src/db/Utils.cpp b/core/src/db/Utils.cpp index c48a92b5d9fc50836fc67ac0ccf9e6c812b1ab1f..407497027e91c511ee251d120fe43be2af7e962a 100644 --- a/core/src/db/Utils.cpp +++ b/core/src/db/Utils.cpp @@ -128,6 +128,24 @@ SendExitSignal() { kill(pid, SIGUSR2); } +void +GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids) { + ids.clear(); // output stays empty unless the chunk carries a non-null DEFAULT_UID_NAME field + if (chunk == nullptr) { + return; + } + + auto pair = chunk->fixed_fields_.find(engine::DEFAULT_UID_NAME); + if (pair == chunk->fixed_fields_.end() || pair->second == nullptr) { + return; + } + + if (pair->second->data_.size() >= sizeof(engine::IDNumber)) { // need at least one whole id + ids.resize(pair->second->data_.size() / sizeof(engine::IDNumber)); // whole ids only; trailing bytes are dropped + memcpy(ids.data(), pair->second->data_.data(), ids.size() * sizeof(engine::IDNumber)); + } +} + } // namespace utils } // namespace engine } // namespace milvus diff --git a/core/src/db/Utils.h b/core/src/db/Utils.h index 176ad94f14be4324b30532ab13a6c613f4464543..03b6d075b6f6ac5a6ce75cce50a26b3b0dfdf8ed 100644 --- a/core/src/db/Utils.h +++ b/core/src/db/Utils.h @@ -20,11 +20,6 @@ namespace milvus { namespace engine { -namespace snapshot { -class Segment; -class Partition; -class Collection; -} // namespace snapshot namespace utils { int64_t @@ -58,6 +53,9 @@ ParseMetaUri(const std::string& uri, MetaUriInfo& info); void SendExitSignal(); +void +GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids); + } // namespace utils } // namespace engine } // namespace milvus diff --git a/core/src/db/insert/MemCollection.cpp b/core/src/db/insert/MemCollection.cpp index 7cc966bc4196cd5ea15ce1a1d2d811eec663ad14..24d0b7aec03514171b29c8d32fa7313a8afd8199 100644 --- a/core/src/db/insert/MemCollection.cpp +++ b/core/src/db/insert/MemCollection.cpp @@ -238,7 +238,7 @@ MemCollection::ApplyDeletes()
{ } } - segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetSize() - pre_del_ids.size(), true); + segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true); segment_writer->WriteDeletedDocs(del_docs_path, delete_docs); segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter); diff --git a/core/src/segment/DeletedDocs.cpp b/core/src/segment/DeletedDocs.cpp index 26ca61c0c320b963e6167fbb7c445b488f5863a6..67a01b36575f6000728824546afa9fff82804ab0 100644 --- a/core/src/segment/DeletedDocs.cpp +++ b/core/src/segment/DeletedDocs.cpp @@ -39,9 +39,14 @@ DeletedDocs::GetDeletedDocs() const { //} size_t -DeletedDocs::GetSize() const { +DeletedDocs::GetCount() const { return deleted_doc_offsets_.size(); } +int64_t +DeletedDocs::Size() { + return deleted_doc_offsets_.size() * sizeof(offset_t); +} + } // namespace segment } // namespace milvus diff --git a/core/src/segment/DeletedDocs.h b/core/src/segment/DeletedDocs.h index 288d4e958760c0b56a77eada5edb720100005f63..eb066a030d3e5bae5358547b78aa8aecbf0c338e 100644 --- a/core/src/segment/DeletedDocs.h +++ b/core/src/segment/DeletedDocs.h @@ -20,12 +20,14 @@ #include #include +#include "cache/DataObj.h" + namespace milvus { namespace segment { using offset_t = int32_t; -class DeletedDocs { +class DeletedDocs : public cache::DataObj { public: explicit DeletedDocs(const std::vector& deleted_doc_offsets); @@ -42,7 +44,10 @@ class DeletedDocs { // GetName() const; size_t - GetSize() const; + GetCount() const; + + int64_t + Size() override; // void // GetBitset(faiss::ConcurrentBitsetPtr& bitset); diff --git a/core/src/segment/IdBloomFilter.cpp b/core/src/segment/IdBloomFilter.cpp index 890faa00549b2fb6383c6b2938b09e861f6d8c5c..ed0f77b2556c446764d2d43b82ec4c0def79dcaa 100644 --- a/core/src/segment/IdBloomFilter.cpp +++ b/core/src/segment/IdBloomFilter.cpp @@ -76,9 +76,9 @@ IdBloomFilter::Remove(doc_id_t uid) { // return name_; //} -size_t +int64_t 
IdBloomFilter::Size() { - return bloom_filter_->num_bytes; + return bloom_filter_ ? bloom_filter_->num_bytes : 0; } } // namespace segment diff --git a/core/src/segment/IdBloomFilter.h b/core/src/segment/IdBloomFilter.h index 6c3c891a4f8485c958fcc8cfdef8a738bcd35203..35ad3b657343d663134daadcc37c7736e3d78b4a 100644 --- a/core/src/segment/IdBloomFilter.h +++ b/core/src/segment/IdBloomFilter.h @@ -20,6 +20,7 @@ #include #include +#include "cache/DataObj.h" #include "dablooms/dablooms.h" #include "utils/Status.h" @@ -28,7 +29,7 @@ namespace segment { using doc_id_t = int64_t; -class IdBloomFilter { +class IdBloomFilter : public cache::DataObj { public: explicit IdBloomFilter(scaling_bloom_t* bloom_filter); @@ -46,8 +47,8 @@ class IdBloomFilter { Status Remove(doc_id_t uid); - size_t - Size(); + int64_t + Size() override; // const std::string& // GetName() const; diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index 2ec47f22fecf6ec42bcff53554f4aa596ce95133..a5409044c160753b231b3f0fe128ada40a0d0cb1 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -21,6 +21,7 @@ #include #include +#include "cache/CpuCacheMgr.h" #include "codecs/Codec.h" #include "db/SnapshotUtils.h" #include "db/Types.h" @@ -115,8 +116,16 @@ SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& r std::string file_path = engine::snapshot::GetResPath(dir_collections_, raw_visitor->GetFile()); - auto& ss_codec = codec::Codec::instance(); - ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw); + // if the data is in cache, no need to read file + auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path); + if (data_obj == nullptr) { + auto& ss_codec = codec::Codec::instance(); + ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw); + + cache::CpuCacheMgr::GetInstance().InsertItem(file_path, raw); // put into cache + } else { + raw = std::static_pointer_cast(data_obj); + } 
field_map.insert(std::make_pair(field_name, raw)); } catch (std::exception& e) { @@ -265,62 +274,80 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex knowhere::BinarySet index_data; knowhere::BinaryPtr raw_data, compress_data; - auto read_raw = [&]() -> void { - engine::BinaryDataPtr fixed_data; - auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data); - if (status.ok()) { - ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data); - } else if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW)) { - auto file_path = - engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); - ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data); - } - }; - - // if index not specified, or index file not created, return IDMAP + // if index not specified, or index file not created, return a temp index(IDMAP type) auto index_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX); if (index_visitor == nullptr || index_visitor->GetFile() == nullptr) { - auto& json = field->GetParams(); - if (json.find(knowhere::meta::DIM) == json.end()) { - return Status(DB_ERROR, "Vector field dimension undefined"); + auto temp_index_path = engine::snapshot::GetResPath(dir_collections_, segment); + temp_index_path += "/"; + std::string temp_index_name = field_name + ".idmap"; + temp_index_path += temp_index_name; + + // if the data is in cache, no need to read file + auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(temp_index_path); + if (data_obj != nullptr) { + index_ptr = std::static_pointer_cast(data_obj); + segment_ptr_->SetVectorIndex(field_name, index_ptr); + } else { + auto& json = field->GetParams(); + if (json.find(knowhere::meta::DIM) == json.end()) { + return Status(DB_ERROR, "Vector field dimension undefined"); + } + int64_t dimension = json[knowhere::meta::DIM]; + engine::BinaryDataPtr raw; + STATUS_CHECK(LoadField(field_name, raw)); + + 
auto dataset = knowhere::GenDataset(segment_commit->GetRowCount(), dimension, raw->data_.data()); + + // construct IDMAP index + knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance(); + index_ptr = vec_index_factory.CreateVecIndex(knowhere::IndexEnum::INDEX_FAISS_IDMAP, + knowhere::IndexMode::MODE_CPU); + milvus::json conf{{knowhere::meta::DIM, dimension}}; + conf[engine::PARAM_INDEX_METRIC_TYPE] = knowhere::Metric::L2; + index_ptr->Train(knowhere::DatasetPtr(), conf); + index_ptr->AddWithoutIds(dataset, conf); + index_ptr->SetUids(uids); + index_ptr->SetBlacklist(concurrent_bitset_ptr); + segment_ptr_->SetVectorIndex(field_name, index_ptr); } - int64_t dimension = json[knowhere::meta::DIM]; - engine::BinaryDataPtr raw; - STATUS_CHECK(LoadField(field_name, raw)); - - auto dataset = knowhere::GenDataset(segment_commit->GetRowCount(), dimension, raw->data_.data()); - - knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance(); - index_ptr = - vec_index_factory.CreateVecIndex(knowhere::IndexEnum::INDEX_FAISS_IDMAP, knowhere::IndexMode::MODE_CPU); - milvus::json conf{{knowhere::meta::DIM, dimension}}; - conf[engine::PARAM_INDEX_METRIC_TYPE] = knowhere::Metric::L2; - index_ptr->Train(knowhere::DatasetPtr(), conf); - index_ptr->AddWithoutIds(dataset, conf); - index_ptr->SetUids(uids); - index_ptr->SetBlacklist(concurrent_bitset_ptr); - segment_ptr_->SetVectorIndex(field_name, index_ptr); + return Status::OK(); } // read index file - std::string file_path = + std::string index_file_path = engine::snapshot::GetResPath(dir_collections_, index_visitor->GetFile()); - ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, file_path, index_data); + // if the data is in cache, no need to read file + auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(index_file_path); + if (data_obj != nullptr) { + index_ptr = std::static_pointer_cast(data_obj); + segment_ptr_->SetVectorIndex(field_name, index_ptr); - auto 
index_type = index_visitor->GetElement()->GetTypeName(); + return Status::OK(); + } + + ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, index_file_path, index_data); // for some kinds index(IVF), read raw file + auto index_type = index_visitor->GetElement()->GetTypeName(); if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || index_type == knowhere::IndexEnum::INDEX_NSG || index_type == knowhere::IndexEnum::INDEX_HNSW) { - read_raw(); + engine::BinaryDataPtr fixed_data; + auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data); + if (status.ok()) { + ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data); + } else if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW)) { + auto file_path = + engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); + ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data); + } } // for some kinds index(SQ8), read compress file if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8NR || index_type == knowhere::IndexEnum::INDEX_HNSW_SQ8NM) { if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8)) { - file_path = + auto file_path = engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data); } @@ -331,6 +358,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex index_ptr->SetUids(uids); index_ptr->SetBlacklist(concurrent_bitset_ptr); segment_ptr_->SetVectorIndex(field_name, index_ptr); + + cache::CpuCacheMgr::GetInstance().InsertItem(index_file_path, index_ptr); // put into cache } catch (std::exception& e) { std::string err_msg = "Failed to load vector index: " + std::string(e.what()); LOG_ENGINE_ERROR_ << err_msg; @@ -361,7 +390,15 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde if (index_visitor && index_visitor->GetFile() != nullptr) { 
std::string file_path = engine::snapshot::GetResPath(dir_collections_, index_visitor->GetFile()); - ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr); + + // if the data is in cache, no need to read file + auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path); + if (data_obj == nullptr) { + ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr); + cache::CpuCacheMgr::GetInstance().InsertItem(file_path, index_ptr); // put into cache + } else { + index_ptr = std::static_pointer_cast(data_obj); + } segment_ptr_->SetStructuredIndex(field_name, index_ptr); } @@ -412,12 +449,22 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER); std::string file_path = engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); + if (!boost::filesystem::exists(file_path + codec::IdBloomFilterFormat::FilePostfix())) { + return Status::OK(); // file doesn't exist + } - auto& ss_codec = codec::Codec::instance(); - ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr); + // if the data is in cache, no need to read file + auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path); + if (data_obj == nullptr) { + auto& ss_codec = codec::Codec::instance(); + ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr); + } else { + id_bloom_filter_ptr = std::static_pointer_cast(data_obj); + } if (id_bloom_filter_ptr) { segment_ptr_->SetBloomFilter(id_bloom_filter_ptr); + cache::CpuCacheMgr::GetInstance().InsertItem(file_path, id_bloom_filter_ptr); // put into cache } } catch (std::exception& e) { std::string err_msg = "Failed to load bloom filter: " + std::string(e.what()); @@ -439,15 +486,22 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { auto visitor = 
uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS); std::string file_path = engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); - if (!boost::filesystem::exists(file_path)) { + if (!boost::filesystem::exists(file_path + codec::DeletedDocsFormat::FilePostfix())) { return Status::OK(); // file doesn't exist } - auto& ss_codec = codec::Codec::instance(); - ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr); + // if the data is in cache, no need to read file + auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path); + if (data_obj == nullptr) { + auto& ss_codec = codec::Codec::instance(); + ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr); + } else { + deleted_docs_ptr = std::static_pointer_cast(data_obj); + } if (deleted_docs_ptr) { segment_ptr_->SetDeletedDocs(deleted_docs_ptr); + cache::CpuCacheMgr::GetInstance().InsertItem(file_path, deleted_docs_ptr); // put into cache } } catch (std::exception& e) { std::string err_msg = "Failed to load deleted docs: " + std::string(e.what()); @@ -463,7 +517,7 @@ SegmentReader::ReadDeletedDocsSize(size_t& size) { size = 0; auto deleted_docs_ptr = segment_ptr_->GetDeletedDocs(); if (deleted_docs_ptr != nullptr) { - size = deleted_docs_ptr->GetSize(); + size = deleted_docs_ptr->GetCount(); return Status::OK(); // already exist } @@ -471,7 +525,7 @@ SegmentReader::ReadDeletedDocsSize(size_t& size) { auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS); std::string file_path = engine::snapshot::GetResPath(dir_collections_, visitor->GetFile()); - if (!boost::filesystem::exists(file_path)) { + if (!boost::filesystem::exists(file_path + codec::DeletedDocsFormat::FilePostfix())) { return Status::OK(); // file doesn't exist } diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 4a8a8764ab820a076b1f0a197ae72b1b688e5cb6..a9d582e5dd17452c0ed4d79678a250e7721af14c 
100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -223,9 +223,9 @@ BuildEntities2(uint64_t n, uint64_t batch_index, milvus::engine::DataChunkPtr& d } milvus::engine::BinaryDataPtr raw = std::make_shared(); + data_chunk->fixed_fields_["float_vector"] = raw; raw->data_.resize(vectors.float_data_.size() * sizeof(float)); memcpy(raw->data_.data(), vectors.float_data_.data(), vectors.float_data_.size() * sizeof(float)); - data_chunk->fixed_fields_["float_vector"] = raw; std::vector value_1; value_1.resize(n); @@ -236,9 +236,9 @@ BuildEntities2(uint64_t n, uint64_t batch_index, milvus::engine::DataChunkPtr& d { milvus::engine::BinaryDataPtr raw = std::make_shared(); + data_chunk->fixed_fields_["int64"] = raw; raw->data_.resize(value_1.size() * sizeof(int64_t)); memcpy(raw->data_.data(), value_1.data(), value_1.size() * sizeof(int64_t)); - data_chunk->fixed_fields_["int64"] = raw; } } } // namespace @@ -508,6 +508,8 @@ TEST_F(DBTest, MergeTest) { status = db_->Insert(collection_name, "", data_chunk); ASSERT_TRUE(status.ok()); + data_chunk->fixed_fields_.erase(milvus::engine::DEFAULT_UID_NAME); // clear auto-generated id + status = db_->Flush(); ASSERT_TRUE(status.ok()); } @@ -568,6 +570,42 @@ TEST_F(DBTest, MergeTest) { ASSERT_EQ(expect_file_paths.size(), segment_file_paths.size()); } +TEST_F(DBTest, CompactTest) { + std::string collection_name = "COMPACT_TEST"; + auto status = CreateCollection2(db_, collection_name, 0); + ASSERT_TRUE(status.ok()); + + const uint64_t entity_count = 10000; + milvus::engine::DataChunkPtr data_chunk; + BuildEntities(entity_count, 0, data_chunk); + + status = db_->Insert(collection_name, "", data_chunk); + ASSERT_TRUE(status.ok()); + + status = db_->Flush(); + ASSERT_TRUE(status.ok()); + + milvus::engine::IDNumbers entity_ids; + milvus::engine::utils::GetIDFromChunk(data_chunk, entity_ids); + ASSERT_EQ(entity_ids.size(), entity_count); + +// int64_t delete_count = 10; +// entity_ids.resize(delete_count); +// 
status = db_->DeleteEntityByID(collection_name, entity_ids); +// ASSERT_TRUE(status.ok()); +// +// status = db_->Flush(); +// ASSERT_TRUE(status.ok()); +// +// status = db_->Compact(dummy_context_, collection_name); +// ASSERT_TRUE(status.ok()); +// +// int64_t row_count = 0; +// status = db_->CountEntities(collection_name, row_count); +// ASSERT_TRUE(status.ok()); +// ASSERT_EQ(row_count, entity_count - delete_count); +} + TEST_F(DBTest, IndexTest) { std::string collection_name = "INDEX_TEST"; auto status = CreateCollection2(db_, collection_name, 0); @@ -656,6 +694,8 @@ TEST_F(DBTest, StatsTest) { status = db_->Insert(collection_name, "", data_chunk); ASSERT_TRUE(status.ok()); + data_chunk->fixed_fields_.erase(milvus::engine::DEFAULT_UID_NAME); // clear auto-generated id + status = db_->Insert(collection_name, partition_name, data_chunk); ASSERT_TRUE(status.ok());