未验证 提交 31173727 编写于 作者: F foxspy 提交者: GitHub

growing segment index memory opt & get vector bugfix (#25272)

Signed-off-by: Nxianliang <xianliang.li@zilliz.com>
上级 80e4de62
......@@ -231,7 +231,7 @@ queryNode:
# And this value should be a number greater than 1 and less than 32.
chunkRows: 1024 # The number of vectors in a chunk.
growing: # growing a vector index for growing segment to accelerate search
enableIndex: false
enableIndex: true
nlist: 128 # growing segment index nlist
nprobe: 16 # nprobe to search growing segment, based on your accuracy requirement, must smaller than nlist
loadMemoryUsageFactor: 3 # The multiply factor of calculating the memory usage while loading segments
......
......@@ -100,6 +100,8 @@ SearchOnGrowing(const segcore::SegmentGrowingImpl& segment,
results.unity_topK_ = topk;
results.total_nq_ = num_queries;
} else {
std::shared_lock<std::shared_mutex> read_chunk_mutex(
segment.get_chunk_mutex());
int32_t current_chunk_id = 0;
// step 3: brute force search where small indexing is unavailable
auto vec_ptr = record.get_field_data_base(vecfield_id);
......
......@@ -28,7 +28,10 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta,
: FieldIndexing(field_meta, segcore_config),
config_(std::make_unique<VecIndexConfig>(
segment_max_row_count, field_index_meta, segcore_config)),
build(false),
sync_with_index(false) {
index_ = std::make_unique<index::VectorMemIndex>(config_->GetIndexType(),
config_->GetMetricType());
}
void
......@@ -87,7 +90,7 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset,
auto per_chunk = source->get_size_per_chunk();
//append vector [vector_id_beg, vector_id_end] into index
//build index [vector_id_beg, build_threshold) when index not exist
if (!index_.get()) {
if (!build) {
idx_t vector_id_beg = index_cur_.load();
idx_t vector_id_end = get_build_threshold() - 1;
auto chunk_id_beg = vector_id_beg / per_chunk;
......@@ -122,13 +125,17 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset,
}
auto dataset = knowhere::GenDataSet(vec_num, dim, data_addr);
dataset->SetIsOwner(false);
auto indexing = std::make_unique<index::VectorMemIndex>(
config_->GetIndexType(), config_->GetMetricType());
indexing->BuildWithDataset(dataset, conf);
try {
index_->BuildWithDataset(dataset, conf);
} catch (SegcoreError& error) {
LOG_SEGCORE_ERROR_ << " growing index build error : "
<< error.what();
return;
}
index_cur_.fetch_add(vec_num);
index_ = std::move(indexing);
build = true;
}
//append rest data when index exist
//append rest data when index has built
idx_t vector_id_beg = index_cur_.load();
idx_t vector_id_end = reserved_offset + size - 1;
auto chunk_id_beg = vector_id_beg / per_chunk;
......@@ -188,6 +195,11 @@ VectorFieldIndexing::sync_data_with_index() const {
return sync_with_index.load();
}
bool
VectorFieldIndexing::has_raw_data() const {
return index_->HasRawData();
}
template <typename T>
void
ScalarFieldIndexing<T>::BuildIndexRange(int64_t ack_beg,
......
......@@ -66,6 +66,11 @@ class FieldIndexing {
virtual bool
sync_data_with_index() const = 0;
virtual bool
has_raw_data() const {
return true;
}
const FieldMeta&
get_field_meta() {
return field_meta_;
......@@ -192,6 +197,9 @@ class VectorFieldIndexing : public FieldIndexing {
bool
sync_data_with_index() const override;
bool
has_raw_data() const override;
idx_t
get_index_cursor() override;
......@@ -203,6 +211,7 @@ class VectorFieldIndexing : public FieldIndexing {
private:
std::atomic<idx_t> index_cur_ = 0;
std::atomic<bool> build;
std::atomic<bool> sync_with_index;
std::unique_ptr<VecIndexConfig> config_;
std::unique_ptr<index::VectorIndex> index_;
......@@ -323,6 +332,16 @@ class IndexingRecord {
}
return false;
}
bool
HasRawData(FieldId fieldId) const {
if (is_in(fieldId)) {
const FieldIndexing& indexing = get_field_indexing(fieldId);
return indexing.has_raw_data();
}
return false;
}
// concurrent
int64_t
get_finished_ack() const {
......
......@@ -53,6 +53,21 @@ SegmentGrowingImpl::mask_with_delete(BitsetType& bitset,
bitset |= delete_bitset;
}
void
SegmentGrowingImpl::try_remove_chunks(FieldId fieldId) {
//remove the chunk data to reduce memory consumption
if (indexing_record_.SyncDataWithIndex(fieldId)) {
auto vec_data_base =
dynamic_cast<segcore::ConcurrentVector<FloatVector>*>(
insert_record_.get_field_data_base(fieldId));
if (vec_data_base && vec_data_base->num_chunk() > 0 &&
chunk_mutex_.try_lock()) {
vec_data_base->clear();
chunk_mutex_.unlock();
}
}
}
void
SegmentGrowingImpl::Insert(int64_t reserved_offset,
int64_t size,
......@@ -89,6 +104,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
&insert_data->fields_data(data_offset),
field_meta);
}
//insert vector data into index
if (segcore_config_.get_enable_growing_segment_index()) {
indexing_record_.AppendingIndex(
reserved_offset,
......@@ -97,6 +113,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
&insert_data->fields_data(data_offset),
insert_record_);
}
try_remove_chunks(field_id);
}
// step 4: set pks to offset
......@@ -174,6 +191,7 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
offset += row_count;
}
}
try_remove_chunks(field_id);
if (field_id == primary_field_id) {
insert_record_.insert_pks(field_datas);
......@@ -392,10 +410,7 @@ SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id,
auto& vec = *vec_ptr;
std::vector<uint8_t> empty(element_sizeof, 0);
if (indexing_record_.SyncDataWithIndex(field_id)) {
indexing_record_.GetDataFromIndex(
field_id, seg_offsets, count, element_sizeof, output_raw);
} else {
auto copy_from_chunk = [&]() {
auto output_base = reinterpret_cast<char*>(output_raw);
for (int i = 0; i < count; ++i) {
auto dst = output_base + i * element_sizeof;
......@@ -406,7 +421,20 @@ SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id,
: (const uint8_t*)vec.get_element(offset));
memcpy(dst, src, element_sizeof);
}
};
//HasRawData interface guarantees that data can be fetched from growing segment
if (HasRawData(field_id.get())) {
//When data sync with index
if (indexing_record_.SyncDataWithIndex(field_id)) {
indexing_record_.GetDataFromIndex(
field_id, seg_offsets, count, element_sizeof, output_raw);
} else {
//Else copy from chunk
std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
copy_from_chunk();
}
}
AssertInfo(HasRawData(field_id.get()), "Growing segment loss raw data");
}
template <typename S, typename T>
......
......@@ -89,6 +89,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
return deleted_record_;
}
std::shared_mutex&
get_chunk_mutex() const {
return chunk_mutex_;
}
const SealedIndexingRecord&
get_sealed_indexing_record() const {
return sealed_indexing_record_;
......@@ -124,6 +129,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
return segcore_config_.get_chunk_rows();
}
void
try_remove_chunks(FieldId fieldId);
public:
int64_t
get_row_count() const override {
......@@ -228,6 +236,12 @@ class SegmentGrowingImpl : public SegmentGrowing {
bool
HasRawData(int64_t field_id) const override {
//growing index hold raw data when
// 1. growing index enabled and it holds raw data
// 2. growing index disabled then raw data held by chunk
if (indexing_record_.is_in(FieldId(field_id))) {
return indexing_record_.HasRawData(FieldId(field_id));
}
return true;
}
......@@ -255,6 +269,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
// inserted fields data and row_ids, timestamps
InsertRecord<false> insert_record_;
mutable std::shared_mutex chunk_mutex_;
// deleted pks
mutable DeletedRecord deleted_record_;
......
......@@ -11,7 +11,7 @@
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
set( KNOWHERE_VERSION 4eea3c1 )
set( KNOWHERE_VERSION 37d764a)
message(STATUS "Building knowhere-${KNOWHERE_SOURCE_VER} from source")
message(STATUS ${CMAKE_BUILD_TYPE})
......
......@@ -42,6 +42,7 @@ TEST(GrowingIndex, Correctness) {
IndexMetaPtr metaPtr =
std::make_shared<CollectionIndexMeta>(226985, std::move(filedMap));
auto segment = CreateGrowingSegment(schema, metaPtr);
auto segmentImplPtr = dynamic_cast<SegmentGrowingImpl*>(segment.get());
// std::string dsl = R"({
// "bool": {
......@@ -86,6 +87,17 @@ TEST(GrowingIndex, Correctness) {
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);
auto filed_data = segmentImplPtr->get_insert_record()
.get_field_data<milvus::FloatVector>(vec);
auto inserted = (i + 1) * per_batch;
//once index built, chunk data will be removed
if (i < 2) {
EXPECT_EQ(filed_data->num_chunk(),
upper_div(inserted, filed_data->get_size_per_chunk()));
} else {
EXPECT_EQ(filed_data->num_chunk(), 0);
}
auto plan = milvus::query::CreateSearchPlanByExpr(
*schema, plan_str.data(), plan_str.size());
......@@ -102,16 +114,37 @@ TEST(GrowingIndex, Correctness) {
}
}
TEST(GrowingIndex, GetVector) {
using Param = const char*;
class GrowingIndexGetVectorTest : public ::testing::TestWithParam<Param> {
void
SetUp() override {
auto param = GetParam();
metricType = param;
}
protected:
const char* metricType;
};
INSTANTIATE_TEST_CASE_P(IndexTypeParameters,
GrowingIndexGetVectorTest,
::testing::Values(knowhere::metric::L2,
knowhere::metric::COSINE,
knowhere::metric::IP));
TEST_P(GrowingIndexGetVectorTest, GetVector) {
auto schema = std::make_shared<Schema>();
auto pk = schema->AddDebugField("pk", DataType::INT64);
auto random = schema->AddDebugField("random", DataType::DOUBLE);
auto vec = schema->AddDebugField(
"embeddings", DataType::VECTOR_FLOAT, 128, knowhere::metric::L2);
"embeddings", DataType::VECTOR_FLOAT, 128, metricType);
schema->set_primary_field_id(pk);
std::map<std::string, std::string> index_params = {
{"index_type", "IVF_FLAT"}, {"metric_type", "L2"}, {"nlist", "128"}};
{"index_type", "IVF_FLAT"},
{"metric_type", metricType},
{"nlist", "128"}};
std::map<std::string, std::string> type_params = {{"dim", "128"}};
FieldIndexMeta fieldIndexMeta(
vec, std::move(index_params), std::move(type_params));
......
......@@ -330,7 +330,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, int64(8192), chunkRows)
enableGrowingIndex := Params.EnableGrowingSegmentIndex.GetAsBool()
assert.Equal(t, false, enableGrowingIndex)
assert.Equal(t, true, enableGrowingIndex)
params.Save("queryNode.segcore.growing.enableIndex", "true")
enableGrowingIndex = Params.EnableGrowingSegmentIndex.GetAsBool()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册