Unverified commit 362bf7a9, authored by groot, committed by GitHub

cache (#3124)

* cache
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* typo
Signed-off-by: yhmo <yihua.mo@zilliz.com>
Co-authored-by: Wang Xiangyu <xy.wang@zilliz.com>
Parent f18c95ee
......@@ -38,17 +38,6 @@ CpuCacheMgr::GetInstance() {
return s_mgr;
}
-DataObjPtr
-CpuCacheMgr::GetDataObj(const std::string& key) {
-    DataObjPtr obj = GetItem(key);
-    return obj;
-}
-
-void
-CpuCacheMgr::SetDataObj(const std::string& key, const milvus::cache::DataObjPtr& data) {
-    CacheMgr<DataObjPtr>::InsertItem(key, data);
-}
void
CpuCacheMgr::ConfigUpdate(const std::string& name) {
SetCapacity(config.cache.cache_size());
......
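With GetDataObj/SetDataObj removed, callers go through the generic CacheMgr<DataObjPtr> interface (GetItem/InsertItem) and downcast the stored object themselves, as the SegmentReader hunks further down show. A minimal, self-contained sketch of that pattern — DataObj and SimpleCacheMgr here are illustrative stand-ins, not the repo's classes:

```cpp
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>

// Stand-in for cache::DataObj: anything cached must report its byte size.
struct DataObj {
    virtual ~DataObj() = default;
    virtual int64_t Size() = 0;
};
using DataObjPtr = std::shared_ptr<DataObj>;

// Stand-in for CacheMgr<DataObjPtr>: a plain keyed store, no eviction.
class SimpleCacheMgr {
 public:
    DataObjPtr
    GetItem(const std::string& key) {
        auto it = items_.find(key);
        return (it == items_.end()) ? nullptr : it->second;
    }

    void
    InsertItem(const std::string& key, const DataObjPtr& obj) {
        items_[key] = obj;
    }

 private:
    std::unordered_map<std::string, DataObjPtr> items_;
};
```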
......@@ -31,12 +31,6 @@ class CpuCacheMgr : public CacheMgr<DataObjPtr>, public ConfigObserver {
static CpuCacheMgr&
GetInstance();
-    DataObjPtr
-    GetDataObj(const std::string& key);
-
-    void
-    SetDataObj(const std::string& key, const DataObjPtr& data);
public:
void
ConfigUpdate(const std::string& name) override;
......
......@@ -51,17 +51,6 @@ GpuCacheMgr::GetInstance(int64_t gpu_id) {
return instance_[gpu_id];
}
-DataObjPtr
-GpuCacheMgr::GetDataObj(const std::string& key) {
-    DataObjPtr obj = GetItem(key);
-    return obj;
-}
-
-void
-GpuCacheMgr::SetDataObj(const std::string& key, const milvus::cache::DataObjPtr& data) {
-    CacheMgr<DataObjPtr>::InsertItem(key, data);
-}
bool
GpuCacheMgr::Reserve(const int64_t size) {
return CacheMgr<DataObjPtr>::Reserve(size);
......
......@@ -36,12 +36,6 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr>, public ConfigObserver {
static GpuCacheMgrPtr
GetInstance(int64_t gpu_id);
-    DataObjPtr
-    GetDataObj(const std::string& key);
-
-    void
-    SetDataObj(const std::string& key, const DataObjPtr& data);
bool
Reserve(const int64_t size);
......
......@@ -98,7 +98,7 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
}
auto deleted_docs_list = deleted_docs->GetDeletedDocs();
-    size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetSize();
+    size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetCount();
if (!deleted_docs_list.empty()) {
delete_ids.insert(delete_ids.end(), deleted_docs_list.begin(), deleted_docs_list.end());
}
......
......@@ -516,10 +516,6 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
}
// generate id
-    DataChunkPtr new_chunk = std::make_shared<DataChunk>();
-    new_chunk->fixed_fields_ = data_chunk->fixed_fields_;
-    new_chunk->variable_fields_ = data_chunk->variable_fields_;
-    new_chunk->count_ = data_chunk->count_;
if (auto_increment) {
SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
IDNumbers ids;
......@@ -527,7 +523,7 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
BinaryDataPtr id_data = std::make_shared<BinaryData>();
id_data->data_.resize(ids.size() * sizeof(int64_t));
memcpy(id_data->data_.data(), ids.data(), ids.size() * sizeof(int64_t));
-        new_chunk->fixed_fields_[engine::DEFAULT_UID_NAME] = id_data;
+        data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME] = id_data;
}
if (options_.wal_enable_) {
......@@ -549,8 +545,8 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
record.lsn = 0;
record.collection_id = collection_name;
record.partition_tag = partition_name;
-        record.data_chunk = new_chunk;
-        record.length = new_chunk->count_;
+        record.data_chunk = data_chunk;
+        record.length = data_chunk->count_;
record.type = wal::MXLogType::Entity;
STATUS_CHECK(ExecWalRecord(record));
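The hunks above change Insert's contract: generated ids are written back into the caller's data_chunk instead of a private copy, so the caller can read them after the call (see GetIDFromChunk below) and must erase the uid field before re-inserting the same chunk, exactly as the updated tests do. A hedged sketch of the new behavior — FieldMap, kUidField, and the id values are illustrative; the real chunk stores raw bytes in BinaryData:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Illustrative stand-in for DataChunk::fixed_fields_.
using FieldMap = std::map<std::string, std::vector<int64_t>>;
const char* kUidField = "uid_field";  // hypothetical name; the engine uses engine::DEFAULT_UID_NAME

// Mimics the auto_increment path: ids land in the caller-owned chunk.
void
InsertSketch(FieldMap& fixed_fields) {
    fixed_fields[kUidField] = {1, 2, 3};  // generated ids, visible to the caller afterwards
}

int main() {
    FieldMap chunk;
    InsertSketch(chunk);
    assert(chunk.count(kUidField) == 1);  // caller can now fetch the generated ids
    chunk.erase(kUidField);               // required before re-inserting the same chunk
}
```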
......@@ -614,7 +610,6 @@ DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_pt
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id));
-    auto ss_id = ss->GetID();
/* collect all valid segment */
std::vector<SegmentVisitor::Ptr> segment_visitors;
......@@ -659,9 +654,11 @@ DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_pt
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(nullptr, ss, options_, query_ptr, segment_ids);
+    cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info before query
/* put search job to scheduler and wait job finish */
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitFinish();
+    cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info after query
if (!job->status().ok()) {
return job->status();
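The two PrintInfo calls bracket the search so cache behavior shows up directly in the log: once segment data is warm, the usage reported after the query should match what was reported before it. A hedged sketch of the kind of usage/capacity summary such a hook typically prints — the field names are assumptions, the real implementation lives in CacheMgr:

```cpp
#include <cstdint>
#include <cstdio>

struct CacheStats {
    int64_t item_count = 0;
    int64_t usage_bytes = 0;
    int64_t capacity_bytes = 0;
};

void
PrintInfoSketch(const CacheStats& s) {
    double percent = s.capacity_bytes ? 100.0 * s.usage_bytes / s.capacity_bytes : 0.0;
    std::printf("[Cache] items=%lld usage=%lld/%lld bytes (%.1f%%)\n",
                static_cast<long long>(s.item_count), static_cast<long long>(s.usage_bytes),
                static_cast<long long>(s.capacity_bytes), percent);
}
```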
......@@ -858,7 +855,7 @@ DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::stri
continue;
}
-        auto deleted_count = deleted_docs->GetSize();
+        auto deleted_count = deleted_docs->GetCount();
if (deleted_count / (row_count + deleted_count) < threshold) {
continue; // no need to compact
}
......
......@@ -36,6 +36,7 @@ const char* JSON_FIELD_ELEMENT = "field_element";
const char* JSON_PARTITION_TAG = "tag";
const char* JSON_FILES = "files";
const char* JSON_INDEX_NAME = "index_name";
+const char* JSON_INDEX_TYPE = "index_type";
const char* JSON_DATA_SIZE = "data_size";
const char* JSON_PATH = "path";
......@@ -165,6 +166,7 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
size_t total_row_count = 0;
size_t total_data_size = 0;
+    // get partition information
std::unordered_map<snapshot::ID_TYPE, milvus::json> partitions;
auto partition_names = ss->GetPartitionNames();
for (auto& name : partition_names) {
......@@ -183,11 +185,13 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
partitions.insert(std::make_pair(partition->GetID(), json_partition));
}
+    // just ensure segments listed in id order
snapshot::IDS_TYPE segment_ids;
auto handler = std::make_shared<SegmentsToSearchCollector>(ss, segment_ids);
handler->Iterate();
std::sort(segment_ids.begin(), segment_ids.end());
+    // get segment information and construct segment json nodes
std::unordered_map<snapshot::ID_TYPE, std::vector<milvus::json>> json_partition_segments;
for (auto id : segment_ids) {
auto segment_commit = ss->GetSegmentCommitBySegmentId(id);
......@@ -214,7 +218,15 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
json_file[JSON_PATH] =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>("", pair.second->GetFile());
json_file[JSON_FIELD] = field->GetName();
-                json_file[JSON_FIELD_ELEMENT] = element->GetName();
+                // if the element is index, print index name/type
+                // else print element name
+                if (element->GetFtype() == engine::FieldElementType::FET_INDEX) {
+                    json_file[JSON_INDEX_NAME] = element->GetName();
+                    json_file[JSON_INDEX_TYPE] = element->GetTypeName();
+                } else {
+                    json_file[JSON_FIELD_ELEMENT] = element->GetName();
+                }
}
json_files.push_back(json_file);
}
......@@ -228,6 +240,7 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
json_partition_segments[segment_commit->GetPartitionId()].push_back(json_segment);
}
+    // construct partition json nodes
milvus::json json_partitions;
for (auto pair : partitions) {
milvus::json json_segments;
......
......@@ -128,6 +128,24 @@ SendExitSignal() {
kill(pid, SIGUSR2);
}
+void
+GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids) {
+    ids.clear();
+    if (chunk == nullptr) {
+        return;
+    }
+
+    auto pair = chunk->fixed_fields_.find(engine::DEFAULT_UID_NAME);
+    if (pair == chunk->fixed_fields_.end() || pair->second == nullptr) {
+        return;
+    }
+
+    if (!pair->second->data_.empty()) {
+        ids.resize(pair->second->data_.size() / sizeof(engine::IDNumber));
+        memcpy((void*)(ids.data()), pair->second->data_.data(), pair->second->data_.size());
+    }
+}
} // namespace utils
} // namespace engine
} // namespace milvus
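GetIDFromChunk recovers the auto-generated ids from the uid field's flat byte buffer: resize the output to byte_count / sizeof(IDNumber), then memcpy. A self-contained round-trip illustration of the same arithmetic:

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

using IDNumber = int64_t;

int main() {
    // ids as they would be packed into BinaryData::data_ by Insert
    std::vector<IDNumber> original = {100, 101, 102};
    std::vector<uint8_t> buffer(original.size() * sizeof(IDNumber));
    std::memcpy(buffer.data(), original.data(), buffer.size());

    // the recovery step GetIDFromChunk performs
    std::vector<IDNumber> ids(buffer.size() / sizeof(IDNumber));
    std::memcpy(ids.data(), buffer.data(), buffer.size());

    std::cout << ids.front() << " .. " << ids.back() << std::endl;  // prints: 100 .. 102
}
```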
......@@ -20,11 +20,6 @@
namespace milvus {
namespace engine {
-namespace snapshot {
-class Segment;
-class Partition;
-class Collection;
-} // namespace snapshot
namespace utils {
int64_t
......@@ -58,6 +53,9 @@ ParseMetaUri(const std::string& uri, MetaUriInfo& info);
void
SendExitSignal();
+void
+GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids);
} // namespace utils
} // namespace engine
} // namespace milvus
......@@ -238,7 +238,7 @@ MemCollection::ApplyDeletes() {
}
}
-        segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetSize() - pre_del_ids.size(), true);
+        segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true);
segment_writer->WriteDeletedDocs(del_docs_path, delete_docs);
segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter);
......
......@@ -39,9 +39,14 @@ DeletedDocs::GetDeletedDocs() const {
//}
size_t
-DeletedDocs::GetSize() const {
+DeletedDocs::GetCount() const {
return deleted_doc_offsets_.size();
}
+int64_t
+DeletedDocs::Size() {
+    return deleted_doc_offsets_.size() * sizeof(offset_t);
+}
} // namespace segment
} // namespace milvus
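The rename separates two different quantities: GetCount() is the number of deleted offsets (used for compaction ratios and row-count deltas), while the new Size() reports the byte footprint the cache manager needs now that DeletedDocs is a cache::DataObj. In sketch form, with offset_t = int32_t as declared in DeletedDocs.h:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

using offset_t = int32_t;

struct DeletedDocsSketch {
    std::vector<offset_t> deleted_doc_offsets_;

    // entries: how many rows are marked deleted
    std::size_t
    GetCount() const {
        return deleted_doc_offsets_.size();
    }

    // bytes: what the object costs in cache capacity
    int64_t
    Size() const {
        return deleted_doc_offsets_.size() * sizeof(offset_t);
    }
};
```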
......@@ -20,12 +20,14 @@
#include <memory>
#include <vector>
+#include "cache/DataObj.h"
namespace milvus {
namespace segment {
using offset_t = int32_t;
-class DeletedDocs {
+class DeletedDocs : public cache::DataObj {
public:
explicit DeletedDocs(const std::vector<offset_t>& deleted_doc_offsets);
......@@ -42,7 +44,10 @@ class DeletedDocs {
// GetName() const;
size_t
-    GetSize() const;
+    GetCount() const;
+
+    int64_t
+    Size() override;
// void
// GetBitset(faiss::ConcurrentBitsetPtr& bitset);
......
......@@ -76,9 +76,9 @@ IdBloomFilter::Remove(doc_id_t uid) {
// return name_;
//}
-size_t
+int64_t
IdBloomFilter::Size() {
-    return bloom_filter_->num_bytes;
+    return bloom_filter_ ? bloom_filter_->num_bytes : 0;
}
} // namespace segment
......
......@@ -20,6 +20,7 @@
#include <memory>
#include <mutex>
+#include "cache/DataObj.h"
#include "dablooms/dablooms.h"
#include "utils/Status.h"
......@@ -28,7 +29,7 @@ namespace segment {
using doc_id_t = int64_t;
-class IdBloomFilter {
+class IdBloomFilter : public cache::DataObj {
public:
explicit IdBloomFilter(scaling_bloom_t* bloom_filter);
......@@ -46,8 +47,8 @@ class IdBloomFilter {
Status
Remove(doc_id_t uid);
-    size_t
-    Size();
+    int64_t
+    Size() override;
// const std::string&
// GetName() const;
......
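Deriving from cache::DataObj is what makes the bloom filter cacheable: the cache manager calls Size() for capacity accounting, so the signature must match the base exactly (int64_t, not size_t), and `override` lets the compiler enforce that. A sketch of the contract, with the DataObj shape assumed from the overrides in this commit:

```cpp
#include <cstdint>

// Assumed shape of cache::DataObj, inferred from its usage here.
struct DataObj {
    virtual ~DataObj() = default;
    virtual int64_t Size() = 0;  // bytes occupied; consulted by the cache manager
};

struct BloomFilterSketch : DataObj {
    int64_t num_bytes = 0;

    // Returning 0 when nothing is allocated mirrors the null-guard added in
    // IdBloomFilter::Size() above.
    int64_t
    Size() override {
        return num_bytes;
    }
};
```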
......@@ -21,6 +21,7 @@
#include <memory>
#include <utility>
+#include "cache/CpuCacheMgr.h"
#include "codecs/Codec.h"
#include "db/SnapshotUtils.h"
#include "db/Types.h"
......@@ -115,8 +116,16 @@ SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& r
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, raw_visitor->GetFile());
-        auto& ss_codec = codec::Codec::instance();
-        ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw);
+        // if the data is in cache, no need to read file
+        auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path);
+        if (data_obj == nullptr) {
+            auto& ss_codec = codec::Codec::instance();
+            ss_codec.GetBlockFormat()->Read(fs_ptr_, file_path, raw);
+            cache::CpuCacheMgr::GetInstance().InsertItem(file_path, raw);  // put into cache
+        } else {
+            raw = std::static_pointer_cast<engine::BinaryData>(data_obj);
+        }
field_map.insert(std::make_pair(field_name, raw));
} catch (std::exception& e) {
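This is the cache-aside pattern the rest of the commit repeats for vector indexes, structured indexes, bloom filters, and deleted docs: look the object up by its file path, read from disk only on a miss, then insert what was read. Keying by the snapshot-resolved file path is safe as long as segment files are immutable once written. A generic, self-contained sketch:

```cpp
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

// Illustrative cache type; the real code uses CacheMgr<DataObjPtr> with LRU eviction.
template <typename T>
using SimpleCache = std::unordered_map<std::string, std::shared_ptr<T>>;

template <typename T>
std::shared_ptr<T>
LoadThroughCache(SimpleCache<T>& cache, const std::string& file_path,
                 const std::function<std::shared_ptr<T>()>& read_from_disk) {
    auto it = cache.find(file_path);
    if (it != cache.end()) {
        return it->second;  // hit: the disk read is skipped entirely
    }
    auto obj = read_from_disk();  // miss: pay the I/O cost once
    if (obj != nullptr) {
        cache.emplace(file_path, obj);  // later loads of this file are served from memory
    }
    return obj;
}
```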
......@@ -265,62 +274,80 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
knowhere::BinarySet index_data;
knowhere::BinaryPtr raw_data, compress_data;
-    auto read_raw = [&]() -> void {
-        engine::BinaryDataPtr fixed_data;
-        auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data);
-        if (status.ok()) {
-            ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data);
-        } else if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW)) {
-            auto file_path =
-                engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
-            ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data);
-        }
-    };
-
-    // if index not specified, or index file not created, return IDMAP
+    // if index not specified, or index file not created, return a temp index(IDMAP type)
auto index_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (index_visitor == nullptr || index_visitor->GetFile() == nullptr) {
-        auto& json = field->GetParams();
-        if (json.find(knowhere::meta::DIM) == json.end()) {
-            return Status(DB_ERROR, "Vector field dimension undefined");
+        auto temp_index_path = engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_collections_, segment);
+        temp_index_path += "/";
+        std::string temp_index_name = field_name + ".idmap";
+        temp_index_path += temp_index_name;
+
+        // if the data is in cache, no need to read file
+        auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(temp_index_path);
+        if (data_obj != nullptr) {
+            index_ptr = std::static_pointer_cast<knowhere::VecIndex>(data_obj);
+            segment_ptr_->SetVectorIndex(field_name, index_ptr);
+        } else {
+            auto& json = field->GetParams();
+            if (json.find(knowhere::meta::DIM) == json.end()) {
+                return Status(DB_ERROR, "Vector field dimension undefined");
+            }
+
+            int64_t dimension = json[knowhere::meta::DIM];
+            engine::BinaryDataPtr raw;
+            STATUS_CHECK(LoadField(field_name, raw));
+
+            auto dataset = knowhere::GenDataset(segment_commit->GetRowCount(), dimension, raw->data_.data());
+
+            // construct IDMAP index
+            knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance();
+            index_ptr = vec_index_factory.CreateVecIndex(knowhere::IndexEnum::INDEX_FAISS_IDMAP,
+                                                         knowhere::IndexMode::MODE_CPU);
+            milvus::json conf{{knowhere::meta::DIM, dimension}};
+            conf[engine::PARAM_INDEX_METRIC_TYPE] = knowhere::Metric::L2;
+            index_ptr->Train(knowhere::DatasetPtr(), conf);
+            index_ptr->AddWithoutIds(dataset, conf);
+            index_ptr->SetUids(uids);
+            index_ptr->SetBlacklist(concurrent_bitset_ptr);
+            segment_ptr_->SetVectorIndex(field_name, index_ptr);
+        }
-        int64_t dimension = json[knowhere::meta::DIM];
-        engine::BinaryDataPtr raw;
-        STATUS_CHECK(LoadField(field_name, raw));
-
-        auto dataset = knowhere::GenDataset(segment_commit->GetRowCount(), dimension, raw->data_.data());
-        knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance();
-        index_ptr =
-            vec_index_factory.CreateVecIndex(knowhere::IndexEnum::INDEX_FAISS_IDMAP, knowhere::IndexMode::MODE_CPU);
-        milvus::json conf{{knowhere::meta::DIM, dimension}};
-        conf[engine::PARAM_INDEX_METRIC_TYPE] = knowhere::Metric::L2;
-        index_ptr->Train(knowhere::DatasetPtr(), conf);
-        index_ptr->AddWithoutIds(dataset, conf);
-        index_ptr->SetUids(uids);
-        index_ptr->SetBlacklist(concurrent_bitset_ptr);
-        segment_ptr_->SetVectorIndex(field_name, index_ptr);
return Status::OK();
}
// read index file
-    std::string file_path =
+    std::string index_file_path =
        engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, index_visitor->GetFile());
-    ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, file_path, index_data);
+
+    // if the data is in cache, no need to read file
+    auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(index_file_path);
+    if (data_obj != nullptr) {
+        index_ptr = std::static_pointer_cast<knowhere::VecIndex>(data_obj);
+        segment_ptr_->SetVectorIndex(field_name, index_ptr);
+        auto index_type = index_visitor->GetElement()->GetTypeName();
+        return Status::OK();
+    }
+    ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, index_file_path, index_data);
// for some kinds index(IVF), read raw file
auto index_type = index_visitor->GetElement()->GetTypeName();
if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || index_type == knowhere::IndexEnum::INDEX_NSG ||
index_type == knowhere::IndexEnum::INDEX_HNSW) {
-        read_raw();
+        engine::BinaryDataPtr fixed_data;
+        auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data);
+        if (status.ok()) {
+            ss_codec.GetVectorIndexFormat()->ConvertRaw(fixed_data, raw_data);
+        } else if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW)) {
+            auto file_path =
+                engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
+            ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data);
+        }
}
// for some kinds index(SQ8), read compress file
if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8NR ||
index_type == knowhere::IndexEnum::INDEX_HNSW_SQ8NM) {
if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8)) {
-            file_path =
+            auto file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data);
}
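The temp IDMAP index built above from raw vectors has no file of its own, so the code manufactures a cache key from the segment directory plus "<field>.idmap". Nothing is written to that path; the string only needs to be unique and stable per segment and field so repeated brute-force queries reuse the same in-memory index. A sketch of the key construction — the path layout shown is hypothetical:

```cpp
#include <iostream>
#include <string>

std::string
TempIndexCacheKey(const std::string& segment_dir, const std::string& field_name) {
    return segment_dir + "/" + field_name + ".idmap";
}

int main() {
    // e.g. a hypothetical segment directory resolved by snapshot::GetResPath
    std::cout << TempIndexCacheKey("/db/tables/1/segments/42", "vector") << std::endl;
    // prints: /db/tables/1/segments/42/vector.idmap
}
```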
......@@ -331,6 +358,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
index_ptr->SetUids(uids);
index_ptr->SetBlacklist(concurrent_bitset_ptr);
segment_ptr_->SetVectorIndex(field_name, index_ptr);
+        cache::CpuCacheMgr::GetInstance().InsertItem(index_file_path, index_ptr);  // put into cache
} catch (std::exception& e) {
std::string err_msg = "Failed to load vector index: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -361,7 +390,15 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde
if (index_visitor && index_visitor->GetFile() != nullptr) {
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, index_visitor->GetFile());
-        ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr);
+        // if the data is in cache, no need to read file
+        auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path);
+        if (data_obj == nullptr) {
+            ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr);
+            cache::CpuCacheMgr::GetInstance().InsertItem(file_path, index_ptr);  // put into cache
+        } else {
+            index_ptr = std::static_pointer_cast<knowhere::Index>(data_obj);
+        }
segment_ptr_->SetStructuredIndex(field_name, index_ptr);
}
......@@ -412,12 +449,22 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
if (!boost::filesystem::exists(file_path + codec::IdBloomFilterFormat::FilePostfix())) {
return Status::OK(); // file doesn't exist
}
-        auto& ss_codec = codec::Codec::instance();
-        ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr);
+        // if the data is in cache, no need to read file
+        auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path);
+        if (data_obj == nullptr) {
+            auto& ss_codec = codec::Codec::instance();
+            ss_codec.GetIdBloomFilterFormat()->Read(fs_ptr_, file_path, id_bloom_filter_ptr);
+        } else {
+            id_bloom_filter_ptr = std::static_pointer_cast<segment::IdBloomFilter>(data_obj);
+        }
if (id_bloom_filter_ptr) {
segment_ptr_->SetBloomFilter(id_bloom_filter_ptr);
+            cache::CpuCacheMgr::GetInstance().InsertItem(file_path, id_bloom_filter_ptr);  // put into cache
}
} catch (std::exception& e) {
std::string err_msg = "Failed to load bloom filter: " + std::string(e.what());
......@@ -439,15 +486,22 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
-        if (!boost::filesystem::exists(file_path)) {
+        if (!boost::filesystem::exists(file_path + codec::DeletedDocsFormat::FilePostfix())) {
return Status::OK(); // file doesn't exist
}
-        auto& ss_codec = codec::Codec::instance();
-        ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr);
+        // if the data is in cache, no need to read file
+        auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(file_path);
+        if (data_obj == nullptr) {
+            auto& ss_codec = codec::Codec::instance();
+            ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr);
+        } else {
+            deleted_docs_ptr = std::static_pointer_cast<segment::DeletedDocs>(data_obj);
+        }
if (deleted_docs_ptr) {
segment_ptr_->SetDeletedDocs(deleted_docs_ptr);
+            cache::CpuCacheMgr::GetInstance().InsertItem(file_path, deleted_docs_ptr);  // put into cache
}
} catch (std::exception& e) {
std::string err_msg = "Failed to load deleted docs: " + std::string(e.what());
......@@ -463,7 +517,7 @@ SegmentReader::ReadDeletedDocsSize(size_t& size) {
size = 0;
auto deleted_docs_ptr = segment_ptr_->GetDeletedDocs();
if (deleted_docs_ptr != nullptr) {
-        size = deleted_docs_ptr->GetSize();
+        size = deleted_docs_ptr->GetCount();
return Status::OK(); // already exist
}
......@@ -471,7 +525,7 @@ SegmentReader::ReadDeletedDocsSize(size_t& size) {
auto visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
-    if (!boost::filesystem::exists(file_path)) {
+    if (!boost::filesystem::exists(file_path + codec::DeletedDocsFormat::FilePostfix())) {
return Status::OK(); // file doesn't exist
}
......
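Both exists-checks above now test the full on-disk name: the codec appends a format-specific postfix to the logical path, so checking the bare path always reported "missing" and the load was skipped even when the file was present. A sketch of the corrected check; the postfix value is hypothetical, the real one comes from DeletedDocsFormat::FilePostfix():

```cpp
#include <filesystem>  // the repo uses boost::filesystem; std::filesystem behaves the same here
#include <string>

bool
DeletedDocsFileExists(const std::string& logical_path) {
    const std::string postfix = ".del";  // hypothetical; see DeletedDocsFormat::FilePostfix()
    return std::filesystem::exists(logical_path + postfix);
}
```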
......@@ -223,9 +223,9 @@ BuildEntities2(uint64_t n, uint64_t batch_index, milvus::engine::DataChunkPtr& d
}
milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
-    data_chunk->fixed_fields_["float_vector"] = raw;
raw->data_.resize(vectors.float_data_.size() * sizeof(float));
memcpy(raw->data_.data(), vectors.float_data_.data(), vectors.float_data_.size() * sizeof(float));
+    data_chunk->fixed_fields_["float_vector"] = raw;
std::vector<int64_t> value_1;
value_1.resize(n);
......@@ -236,9 +236,9 @@ BuildEntities2(uint64_t n, uint64_t batch_index, milvus::engine::DataChunkPtr& d
{
milvus::engine::BinaryDataPtr raw = std::make_shared<milvus::engine::BinaryData>();
-        data_chunk->fixed_fields_["int64"] = raw;
raw->data_.resize(value_1.size() * sizeof(int64_t));
memcpy(raw->data_.data(), value_1.data(), value_1.size() * sizeof(int64_t));
+        data_chunk->fixed_fields_["int64"] = raw;
}
}
} // namespace
......@@ -508,6 +508,8 @@ TEST_F(DBTest, MergeTest) {
status = db_->Insert(collection_name, "", data_chunk);
ASSERT_TRUE(status.ok());
+        data_chunk->fixed_fields_.erase(milvus::engine::DEFAULT_UID_NAME);  // clear auto-generated id
status = db_->Flush();
ASSERT_TRUE(status.ok());
}
......@@ -568,6 +570,42 @@ TEST_F(DBTest, MergeTest) {
ASSERT_EQ(expect_file_paths.size(), segment_file_paths.size());
}
+TEST_F(DBTest, CompactTest) {
+    std::string collection_name = "COMPACT_TEST";
+    auto status = CreateCollection2(db_, collection_name, 0);
+    ASSERT_TRUE(status.ok());
+
+    const uint64_t entity_count = 10000;
+    milvus::engine::DataChunkPtr data_chunk;
+    BuildEntities(entity_count, 0, data_chunk);
+
+    status = db_->Insert(collection_name, "", data_chunk);
+    ASSERT_TRUE(status.ok());
+
+    status = db_->Flush();
+    ASSERT_TRUE(status.ok());
+
+    milvus::engine::IDNumbers entity_ids;
+    milvus::engine::utils::GetIDFromChunk(data_chunk, entity_ids);
+    ASSERT_EQ(entity_ids.size(), entity_count);
+
+    // int64_t delete_count = 10;
+    // entity_ids.resize(delete_count);
+    // status = db_->DeleteEntityByID(collection_name, entity_ids);
+    // ASSERT_TRUE(status.ok());
+    //
+    // status = db_->Flush();
+    // ASSERT_TRUE(status.ok());
+    //
+    // status = db_->Compact(dummy_context_, collection_name);
+    // ASSERT_TRUE(status.ok());
+    //
+    // int64_t row_count = 0;
+    // status = db_->CountEntities(collection_name, row_count);
+    // ASSERT_TRUE(status.ok());
+    // ASSERT_EQ(row_count, entity_count - delete_count);
+}
TEST_F(DBTest, IndexTest) {
std::string collection_name = "INDEX_TEST";
auto status = CreateCollection2(db_, collection_name, 0);
......@@ -656,6 +694,8 @@ TEST_F(DBTest, StatsTest) {
status = db_->Insert(collection_name, "", data_chunk);
ASSERT_TRUE(status.ok());
+    data_chunk->fixed_fields_.erase(milvus::engine::DEFAULT_UID_NAME);  // clear auto-generated id
status = db_->Insert(collection_name, partition_name, data_chunk);
ASSERT_TRUE(status.ok());
......