Unverified commit 31a95400 authored by Letian Jiang, committed by GitHub

Support deltaLog loading on growing segment (#16985)

Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>
Parent 898533c5
......@@ -243,6 +243,22 @@ SegmentGrowingImpl::GetMemoryUsageInBytes() const {
return total_bytes;
}
void
SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
AssertInfo(info.row_count > 0, "The row count of deleted record is 0");
AssertInfo(info.primary_keys, "Deleted primary keys is null");
AssertInfo(info.timestamps, "Deleted timestamps is null");
auto primary_keys = reinterpret_cast<const idx_t*>(info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
int64_t size = info.row_count;
deleted_record_.uids_.set_data(0, primary_keys, size);
deleted_record_.timestamps_.set_data(0, timestamps, size);
deleted_record_.ack_responder_.AddSegment(0, size);
deleted_record_.reserved.fetch_add(size);
deleted_record_.record_size_ = size;
}
SpanBase
SegmentGrowingImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const {
auto vec = get_insert_record().get_field_data_base(field_offset);
......
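The hunk above is the core of the change: SegmentGrowingImpl now implements LoadDeletedRecord, copying the deleted primary keys and timestamps into its deleted_record_ starting at offset 0 and advancing the size counters. A minimal Go sketch of that bookkeeping, using illustrative names rather than the actual segcore types:

```go
package main

import (
	"errors"
	"fmt"
)

// deletedRecord is an illustrative stand-in for segcore's deleted_record_:
// parallel arrays of primary keys and delete timestamps plus a size counter.
type deletedRecord struct {
	uids       []int64
	timestamps []uint64
	size       int64
}

// loadDeletedRecord mirrors SegmentGrowingImpl::LoadDeletedRecord: validate
// the payload, copy both arrays in at offset 0, and record the row count.
func (d *deletedRecord) loadDeletedRecord(pks []int64, tss []uint64) error {
	if len(pks) == 0 {
		return errors.New("the row count of deleted record is 0")
	}
	if len(pks) != len(tss) {
		return errors.New("primary keys and timestamps length mismatch")
	}
	d.uids = append(d.uids[:0], pks...)
	d.timestamps = append(d.timestamps[:0], tss...)
	d.size = int64(len(pks))
	return nil
}

func main() {
	var rec deletedRecord
	err := rec.loadDeletedRecord([]int64{1, 2}, []uint64{100, 200})
	fmt.Println(err, rec.size) // <nil> 2
}
```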
......@@ -63,6 +63,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
Status
Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;
int64_t
GetMemoryUsageInBytes() const override;
......
......@@ -22,6 +22,7 @@
#include "common/Span.h"
#include "common/SystemProperty.h"
#include "common/Types.h"
#include "common/LoadInfo.h"
#include "knowhere/index/vector_index/VecIndex.h"
#include "query/Plan.h"
#include "query/PlanNode.h"
......@@ -61,6 +62,9 @@ class SegmentInterface {
virtual Status
Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
virtual void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0;
};
// internal API for DSL calculation
......
......@@ -204,11 +204,10 @@ CStatus
LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info) {
try {
auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
AssertInfo(segment != nullptr, "segment conversion failed");
AssertInfo(segment_interface != nullptr, "segment conversion failed");
auto load_info = LoadDeletedRecordInfo{deleted_record_info.timestamps, deleted_record_info.primary_keys,
deleted_record_info.row_count};
segment->LoadDeletedRecord(load_info);
segment_interface->LoadDeletedRecord(load_info);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(UnexpectedError, e.what());
......
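The wrapper change above is what opens the API to growing segments: LoadDeletedRecord was previously reachable only through a dynamic_cast to SegmentSealed, and now dispatches virtually through the base SegmentInterface, so both segment types take the same path. A rough Go analogue of that design move (hypothetical types, not the real querynode code):

```go
package main

import "fmt"

// segment mirrors the role of SegmentInterface after this commit:
// LoadDeletedRecord is part of the common segment contract, not a
// sealed-only API.
type segment interface {
	LoadDeletedRecord(pks []int64, tss []uint64) error
}

type growingSegment struct{}
type sealedSegment struct{}

func (g *growingSegment) LoadDeletedRecord(pks []int64, tss []uint64) error {
	return nil // growing-segment path, newly supported by this commit
}

func (s *sealedSegment) LoadDeletedRecord(pks []int64, tss []uint64) error {
	return nil // sealed-segment path, as before
}

// loadDeletedRecord plays the part of the C wrapper: no downcast needed,
// any segment implementation will do.
func loadDeletedRecord(seg segment, pks []int64, tss []uint64) error {
	return seg.LoadDeletedRecord(pks, tss)
}

func main() {
	fmt.Println(loadDeletedRecord(&growingSegment{}, []int64{1}, []uint64{100}))
	fmt.Println(loadDeletedRecord(&sealedSegment{}, []int64{1}, []uint64{100}))
}
```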
......@@ -18,7 +18,9 @@ package querynode
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"math/rand"
"strconv"
......@@ -66,6 +68,8 @@ const (
defaultDMLChannel = "query-node-unittest-DML-0"
defaultDeltaChannel = "query-node-unittest-delta-channel-0"
defaultSubName = "query-node-unittest-sub-name-0"
defaultLocalStorage = "/tmp/milvus_test/querynode"
)
const (
......@@ -1455,3 +1459,57 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType,
return fieldData
}
// saveDeltaLog saves delta logs into MinIO for testing purposes.
func saveDeltaLog(ctx context.Context,
collectionID UniqueID,
partitionID UniqueID,
segmentID UniqueID) ([]*datapb.FieldBinlog, error) {
binlogWriter := storage.NewDeleteBinlogWriter(schemapb.DataType_String, collectionID, partitionID, segmentID)
eventWriter, _ := binlogWriter.NextDeleteEventWriter()
dData := &storage.DeleteData{
Pks: []int64{1, 2},
Tss: []Timestamp{100, 200},
RowCount: 2,
}
sizeTotal := 0
for i := int64(0); i < dData.RowCount; i++ {
int64PkValue := dData.Pks[i]
ts := dData.Tss[i]
eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", int64PkValue, ts))
sizeTotal += binary.Size(int64PkValue)
sizeTotal += binary.Size(ts)
}
eventWriter.SetEventTimestamp(100, 200)
binlogWriter.SetEventTimeStamp(100, 200)
binlogWriter.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
binlogWriter.Finish()
buffer, _ := binlogWriter.GetBuffer()
blob := &storage.Blob{Key: "deltaLogPath1", Value: buffer}
kvs := make(map[string][]byte, 1)
// write delta log
pkFieldID := UniqueID(102)
fieldBinlog := make([]*datapb.FieldBinlog, 0)
log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", pkFieldID))
key := JoinIDPath(collectionID, partitionID, segmentID, pkFieldID)
key += "delta" // append suffix 'delta' to avoid conflicts against binlog
kvs[key] = blob.Value[:]
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
FieldID: pkFieldID,
Binlogs: []*datapb.Binlog{{LogPath: key}},
})
log.Debug("[query node unittest] save delta log file to MinIO/S3")
kv, err := genMinioKV(ctx)
if err != nil {
return nil, err
}
err = kv.MultiSaveBytes(kvs)
return fieldBinlog, err
}
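saveDeltaLog serializes each delete as a "pk,ts" string row in a delete binlog and stores the blob under a key path suffixed with "delta". Purely for illustration, a hypothetical helper that decodes one such entry (the query node itself decodes delta logs through its storage codec, not by hand):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseDeltaEntry decodes one "pk,ts" payload row of the kind saveDeltaLog
// writes above. Hypothetical helper for illustration only.
func parseDeltaEntry(entry string) (pk int64, ts uint64, err error) {
	parts := strings.SplitN(entry, ",", 2)
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("malformed delta entry: %q", entry)
	}
	if pk, err = strconv.ParseInt(parts[0], 10, 64); err != nil {
		return 0, 0, err
	}
	ts, err = strconv.ParseUint(parts[1], 10, 64)
	return pk, ts, err
}

func main() {
	pk, ts, err := parseDeltaEntry("1,100")
	fmt.Println(pk, ts, err) // 1 100 <nil>
}
```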
......@@ -652,10 +652,10 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []IntPrimaryKey, timestam
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
if s.segmentType != segmentTypeSealed {
errMsg := fmt.Sprintln("segmentLoadFieldData failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
return errors.New(errMsg)
}
// if s.segmentType != segmentTypeSealed {
// errMsg := fmt.Sprintln("segmentLoadFieldData failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
// return errors.New(errMsg)
// }
loadInfo := C.CLoadDeletedRecordInfo{
timestamps: unsafe.Pointer(&timestamps[0]),
primary_keys: unsafe.Pointer(&primaryKeys[0]),
......
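segmentLoadDeletedRecord hands C the address of the first element of each Go slice, so both slices must be non-empty and of equal length, or the CGo call reads garbage (and &slice[0] panics on an empty slice). A hedged sketch of a pre-flight check that makes those preconditions explicit (hypothetical helper, not part of the commit):

```go
package main

import (
	"errors"
	"fmt"
)

// checkDeleteArgs captures the preconditions segmentLoadDeletedRecord relies
// on: the C side reads row_count entries from both arrays via the pointers
// to their first elements. Hypothetical helper for illustration only.
func checkDeleteArgs(pks []int64, tss []uint64) error {
	if len(pks) == 0 {
		return errors.New("no deleted records to load")
	}
	if len(pks) != len(tss) {
		return errors.New("primary keys and timestamps must have equal length")
	}
	return nil
}

func main() {
	fmt.Println(checkDeleteArgs([]int64{1, 2}, []uint64{100, 200})) // <nil>
	fmt.Println(checkDeleteArgs(nil, nil))                          // error
}
```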
......@@ -374,11 +374,13 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
defer cancel()
schema := genSimpleInsertDataSchema()
fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
assert.NoError(t, err)
t.Run("test load growing and sealed segments", func(t *testing.T) {
deltaLogs, err := saveDeltaLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID)
assert.NoError(t, err)
t.Run("test load sealed segments", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
......@@ -406,6 +408,10 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
err = loader.loadSegment(req1, segmentTypeSealed)
assert.NoError(t, err)
segment1, err := loader.historicalReplica.getSegmentByID(segmentID1)
assert.NoError(t, err)
assert.Equal(t, segment1.getRowCount(), int64(100))
segmentID2 := UniqueID(101)
req2 := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
......@@ -420,20 +426,78 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
Deltalogs: deltaLogs,
},
},
}
err = loader.loadSegment(req2, segmentTypeGrowing)
err = loader.loadSegment(req2, segmentTypeSealed)
assert.NoError(t, err)
segment1, err := loader.historicalReplica.getSegmentByID(segmentID1)
segment2, err := loader.historicalReplica.getSegmentByID(segmentID2)
assert.NoError(t, err)
// Note: getRowCount does not return an accurate count yet; deleted rows are still included.
assert.Equal(t, segment2.getRowCount(), int64(100)) // the accurate count would be 98
})
segment2, err := loader.streamingReplica.getSegmentByID(segmentID2)
t.Run("test load growing segments", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
loader := node.loader
assert.NotNil(t, loader)
segmentID1 := UniqueID(100)
req1 := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: segmentID1,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
},
},
}
err = loader.loadSegment(req1, segmentTypeGrowing)
assert.NoError(t, err)
segment1, err := loader.streamingReplica.getSegmentByID(segmentID1)
assert.NoError(t, err)
assert.Equal(t, segment1.getRowCount(), int64(100))
segmentID2 := UniqueID(101)
req2 := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: segmentID2,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
Deltalogs: deltaLogs,
},
},
}
err = loader.loadSegment(req2, segmentTypeGrowing)
assert.NoError(t, err)
assert.Equal(t, segment1.getRowCount(), segment2.getRowCount())
segment2, err := loader.streamingReplica.getSegmentByID(segmentID2)
assert.NoError(t, err)
// Note: getRowCount does not return an accurate count yet; deleted rows are still included.
assert.Equal(t, segment2.getRowCount(), int64(100)) // the accurate count would be 98
})
}
......
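Both subtests pin getRowCount at 100 even though the delta log tombstones two rows; once deletes are reflected in the count, the expected value drops to 98, as the inline comments note. The arithmetic, spelled out as a small illustrative snippet:

```go
package main

import "fmt"

// expectedRowCount is the figure the "accurate count would be 98" comments
// refer to: rows loaded from binlogs minus rows tombstoned by delta logs.
func expectedRowCount(loaded, deleted int64) int64 {
	return loaded - deleted
}

func main() {
	// saveDeltaLog writes 2 delete rows; the binlogs hold 100 inserts.
	fmt.Println(expectedRowCount(100, 2)) // 98
}
```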