未验证 提交 36e0c115 编写于 作者: G groot 提交者: GitHub

cache structured field index (#3984)

* cache structured field index
Signed-off-by: groot <yihua.mo@zilliz.com>

* fix unittest
Signed-off-by: groot <yihua.mo@zilliz.com>
上级 504ee108
......@@ -47,20 +47,6 @@
namespace milvus {
namespace engine {
namespace {
// Builds a StructuredIndexSort<T> over the raw bytes of a field, viewing the
// buffer as a packed array of T.
//
// raw_data: holder of the field bytes; only raw_data->data_ is read.
// Returns the new index cast to knowhere::Index, or nullptr when raw_data is
// null (no error is reported in that case).
template <typename T>
knowhere::IndexPtr
CreateSortedIndex(engine::BinaryDataPtr& raw_data) {
    if (raw_data == nullptr) {
        return nullptr;
    }

    auto& bytes = raw_data->data_;
    // Integer division: trailing bytes that do not form a whole T are not counted.
    auto element_count = bytes.size() / sizeof(T);
    auto sorted_index = std::make_shared<knowhere::StructuredIndexSort<T>>(
        element_count, reinterpret_cast<const T*>(bytes.data()));
    return std::static_pointer_cast<knowhere::Index>(sorted_index);
}
} // namespace
ExecutionEngineImpl::ExecutionEngineImpl(const std::string& dir_root, const SegmentVisitorPtr& segment_visitor)
: gpu_enable_(config.gpu.enable()) {
......@@ -94,31 +80,6 @@ ExecutionEngineImpl::LoadForSearch(const query::QueryPtr& query_ptr) {
return Load(query_ptr->index_fields);
}
// Creates a sorted index for a structured field by dispatching on its data type.
//
// field_type: data type of the field; INT32, INT64, FLOAT and DOUBLE are handled.
// raw_data:   raw bytes of the field, forwarded to CreateSortedIndex<T>.
// index_ptr:  receives the created index; left untouched for unsupported types.
//
// Returns Status::OK() for a handled type, otherwise a DB_ERROR status.
// Note: a null raw_data still yields Status::OK() with index_ptr set to nullptr,
// because CreateSortedIndex<T> returns nullptr for null input.
Status
ExecutionEngineImpl::CreateStructuredIndex(const DataType field_type, engine::BinaryDataPtr& raw_data,
                                           knowhere::IndexPtr& index_ptr) {
    switch (field_type) {
        case engine::DataType::INT32:
            index_ptr = CreateSortedIndex<int32_t>(raw_data);
            return Status::OK();
        case engine::DataType::INT64:
            index_ptr = CreateSortedIndex<int64_t>(raw_data);
            return Status::OK();
        case engine::DataType::FLOAT:
            index_ptr = CreateSortedIndex<float>(raw_data);
            return Status::OK();
        case engine::DataType::DOUBLE:
            index_ptr = CreateSortedIndex<double>(raw_data);
            return Status::OK();
        default:
            return Status(DB_ERROR, "Field is not structured type");
    }
}
Status
ExecutionEngineImpl::Load(const TargetFields& field_names) {
TimeRecorderAuto rc("ExecutionEngineImpl::Load");
......@@ -173,13 +134,7 @@ ExecutionEngineImpl::Load(const TargetFields& field_names) {
STATUS_CHECK(segment_reader_->LoadStructuredIndex(name, index_ptr));
index_exist = (index_ptr != nullptr);
if (!index_exist) {
// for structured field, create a simple sorted index for it
// we also can do this in BuildIndex step, but for now we do this in Load step
BinaryDataPtr raw_data;
segment_reader_->LoadField(name, raw_data);
STATUS_CHECK(CreateStructuredIndex(field_type, raw_data, index_ptr));
segment_ptr->SetStructuredIndex(name, index_ptr);
index_exist = true;
LOG_ENGINE_ERROR_ << "Structure index doesn't exist";
}
}
......
......@@ -48,10 +48,6 @@ class ExecutionEngineImpl : public ExecutionEngine {
knowhere::VecIndexPtr
CreateVecIndex(const std::string& index_name, knowhere::IndexMode mode);
Status
CreateStructuredIndex(const engine::DataType field_type, engine::BinaryDataPtr& raw_data,
knowhere::IndexPtr& index_ptr);
Status
LoadForSearch(const query::QueryPtr& query_ptr);
......
......@@ -59,7 +59,7 @@ class StructuredIndexSort : public StructuredIndex<T> {
// Reports the size of the index payload in bytes: the number of entries in
// data_ multiplied by the size of one IndexStructure<T>.
//
// The block previously had two consecutive return statements, so the byte-size
// one was unreachable and the method returned only the entry count. The
// byte-size form is kept because it is what the unit test expectation
// `n * sizeof(IndexStructure<int>)` checks.
// NOTE(review): presumably the cache manager uses this value for capacity
// accounting - confirm against the cache implementation.
int64_t
Size() override {
    return static_cast<int64_t>(data_.size() * sizeof(IndexStructure<T>));
}
bool
......
......@@ -100,7 +100,7 @@ TEST(STRUCTUREDINDEXSORT_TEST, test_serialize_and_load) {
binaryset.Append("index_length", length_data, bin_length->size);
structuredIndexSort.Load(binaryset);
EXPECT_EQ(n, (int)structuredIndexSort.Size());
EXPECT_EQ(n * sizeof(milvus::knowhere::IndexStructure<int>), (int)structuredIndexSort.Size());
EXPECT_EQ(true, structuredIndexSort.IsBuilt());
std::sort(p, p + n);
const std::vector<milvus::knowhere::IndexStructure<int>> const_index_data = structuredIndexSort.GetData();
......
......@@ -26,6 +26,7 @@
#include "db/Types.h"
#include "db/Utils.h"
#include "db/snapshot/ResourceHelper.h"
#include "knowhere/index/structured_index/StructuredIndexSort.h"
#include "knowhere/index/vector_index/IndexBinaryIDMAP.h"
#include "knowhere/index/vector_index/IndexIDMAP.h"
#include "knowhere/index/vector_index/VecIndex.h"
......@@ -39,6 +40,45 @@
namespace milvus {
namespace segment {
namespace {
// Wraps the raw bytes of a structured field in a StructuredIndexSort<T>,
// treating the buffer as a contiguous array of T.
//
// raw_data: holder of the field bytes; only raw_data->data_ is read.
// Returns the index cast to knowhere::Index, or nullptr if raw_data is null.
template <typename T>
knowhere::IndexPtr
CreateSortedIndex(engine::BinaryDataPtr& raw_data) {
    if (raw_data == nullptr) {
        return nullptr;
    }

    auto& field_bytes = raw_data->data_;
    // Number of whole T values in the buffer; a partial trailing value is ignored.
    auto value_count = field_bytes.size() / sizeof(T);
    auto sorted_index = std::make_shared<knowhere::StructuredIndexSort<T>>(
        value_count, reinterpret_cast<const T*>(field_bytes.data()));
    return std::static_pointer_cast<knowhere::Index>(sorted_index);
}
// Builds a temporary sorted index for a structured field, choosing the element
// type from the field's data type.
//
// field_type: data type of the field; INT32, INT64, FLOAT and DOUBLE are handled.
// raw_data:   raw bytes of the field, forwarded to CreateSortedIndex<T>.
// index_ptr:  receives the created index; not modified for unsupported types.
//
// Returns Status::OK() for a handled type, otherwise a DB_ERROR status.
// Note: when raw_data is null the call still returns Status::OK() and index_ptr
// becomes nullptr, since CreateSortedIndex<T> returns nullptr for null input.
Status
CreateStructuredIndex(const engine::DataType field_type, engine::BinaryDataPtr& raw_data,
                      knowhere::IndexPtr& index_ptr) {
    switch (field_type) {
        case engine::DataType::INT32:
            index_ptr = CreateSortedIndex<int32_t>(raw_data);
            return Status::OK();
        case engine::DataType::INT64:
            index_ptr = CreateSortedIndex<int64_t>(raw_data);
            return Status::OK();
        case engine::DataType::FLOAT:
            index_ptr = CreateSortedIndex<float>(raw_data);
            return Status::OK();
        case engine::DataType::DOUBLE:
            index_ptr = CreateSortedIndex<double>(raw_data);
            return Status::OK();
        default:
            return Status(DB_ERROR, "Field is not structured type");
    }
}
} // namespace
SegmentReader::SegmentReader(const std::string& dir_root, const engine::SegmentVisitorPtr& segment_visitor,
bool initialize)
......@@ -100,7 +140,7 @@ SegmentReader::Load() {
segment::DeletedDocsPtr deleted_docs_ptr;
LoadDeletedDocs(deleted_docs_ptr);
STATUS_CHECK(LoadVectorIndice());
STATUS_CHECK(LoadIndice());
return Status::OK();
}
......@@ -372,13 +412,14 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
// if index not specified, or index file not created, return a temp index(IDMAP type)
auto index_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (flat || index_visitor == nullptr || index_visitor->GetFile() == nullptr) {
// if the data is in cache, no need to read file
std::string temp_index_path;
GetTempIndexPath(field_name, temp_index_path);
auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(temp_index_path);
if (data_obj != nullptr) {
// if the temp index is in cache, no need to create it
index_ptr = std::static_pointer_cast<knowhere::VecIndex>(data_obj);
segment_ptr_->SetVectorIndex(field_name, index_ptr);
recorder.RecordSection("get temp index from cache");
} else {
auto& json = field->GetParams();
if (json.find(knowhere::meta::DIM) == json.end()) {
......@@ -484,7 +525,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
Status
SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::IndexPtr& index_ptr) {
try {
TimeRecorderAuto recorder("SegmentReader::LoadStructuredIndex");
TimeRecorder recorder("SegmentReader::LoadStructuredIndex: " + field_name);
segment_ptr_->GetStructuredIndex(field_name, index_ptr);
if (index_ptr != nullptr) {
......@@ -502,9 +543,9 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde
return Status(DB_ERROR, "Field is not structured type");
}
// read field index
auto index_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (index_visitor && index_visitor->GetFile() != nullptr) {
// read field index
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, index_visitor->GetFile());
......@@ -513,12 +554,36 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde
if (data_obj == nullptr) {
STATUS_CHECK(ss_codec.GetStructuredIndexFormat()->Read(fs_ptr_, file_path, index_ptr));
cache::CpuCacheMgr::GetInstance().InsertItem(file_path, index_ptr); // put into cache
recorder.RecordSection("read from storage");
} else {
index_ptr = std::static_pointer_cast<knowhere::Index>(data_obj);
recorder.RecordSection("get from cache");
}
} else {
// if index not specified, or index file not created, return a temp index(SORTED type)
std::string temp_index_path;
GetTempIndexPath(field_name, temp_index_path);
auto data_obj = cache::CpuCacheMgr::GetInstance().GetItem(temp_index_path);
if (data_obj != nullptr) {
// if the temp index is in cache, no need to create it
index_ptr = std::static_pointer_cast<knowhere::VecIndex>(data_obj);
recorder.RecordSection("get temp index from cache");
} else {
// create temp index and put into cache
engine::DataType field_type = engine::DataType::NONE;
STATUS_CHECK(segment_ptr_->GetFieldType(field_name, field_type));
segment_ptr_->SetStructuredIndex(field_name, index_ptr);
engine::BinaryDataPtr raw_data;
LoadField(field_name, raw_data, false);
STATUS_CHECK(CreateStructuredIndex(field_type, raw_data, index_ptr));
cache::CpuCacheMgr::GetInstance().InsertItem(temp_index_path, index_ptr); // put into cache
recorder.RecordSection("create temp index");
}
}
segment_ptr_->SetStructuredIndex(field_name, index_ptr);
} catch (std::exception& e) {
std::string err_msg = "Failed to load vector index: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -529,7 +594,7 @@ SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::Inde
}
Status
SegmentReader::LoadVectorIndice() {
SegmentReader::LoadIndice() {
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
for (auto& iter : field_visitors_map) {
const engine::snapshot::FieldPtr& field = iter.second->GetField();
......@@ -540,8 +605,6 @@ SegmentReader::LoadVectorIndice() {
continue;
}
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, element_visitor->GetFile());
if (engine::IsVectorField(field)) {
knowhere::VecIndexPtr index_ptr;
STATUS_CHECK(LoadVectorIndex(name, index_ptr));
......@@ -695,7 +758,7 @@ SegmentReader::GetTempIndexPath(const std::string& field_name, std::string& path
auto segment = segment_visitor_->GetSegment();
path = engine::snapshot::GetResPath<engine::snapshot::Segment>(dir_collections_, segment);
path += "/";
std::string temp_index_name = field_name + ".idmap";
std::string temp_index_name = field_name + ".tmp.index";
path += temp_index_name;
return Status::OK();
......
......@@ -66,7 +66,7 @@ class SegmentReader {
LoadStructuredIndex(const std::string& field_name, knowhere::IndexPtr& index_ptr);
Status
LoadVectorIndice();
LoadIndice();
Status
LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册