From 2be217bcf4ae7065da7990c828480bbafe2573de Mon Sep 17 00:00:00 2001 From: Letian Jiang Date: Thu, 3 Mar 2022 17:33:56 +0800 Subject: [PATCH] Specify appropriate timestamp in initPKBloomFilter (#15823) (#15826) Signed-off-by: Letian Jiang --- internal/datanode/data_sync_service.go | 7 ++++--- internal/datanode/segment_replica.go | 16 +++++++-------- internal/datanode/segment_replica_test.go | 24 +++++++++++------------ 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index a96ab3c20..6d289c4e0 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -170,7 +170,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro pos: *us.GetDmlPosition(), } } - if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), us.GetNumOfRows(), us.Statslogs, cp); err != nil { + if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), + us.GetNumOfRows(), us.Statslogs, cp, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil { return err } } @@ -192,8 +193,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro zap.Int64("SegmentID", fs.GetID()), zap.Int64("NumOfRows", fs.GetNumOfRows()), ) - if err := dsService.replica.addFlushedSegment(fs.GetID(), fs.CollectionID, - fs.PartitionID, fs.GetInsertChannel(), fs.GetNumOfRows(), fs.Statslogs); err != nil { + if err := dsService.replica.addFlushedSegment(fs.GetID(), fs.CollectionID, fs.PartitionID, fs.GetInsertChannel(), + fs.GetNumOfRows(), fs.Statslogs, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil { return err } } diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index 6a1953b12..2d669c6ca 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -52,9 +52,9 @@ type Replica interface { listAllSegmentIDs() []UniqueID addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error - addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, cp *segmentCheckPoint) error + addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, cp *segmentCheckPoint, recoverTs Timestamp) error filterSegments(channelName string, partitionID UniqueID) []*Segment - addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog) error + addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, recoverTs Timestamp) error listNewSegmentsStartPositions() []*datapb.SegmentStartPosition listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition) @@ -322,7 +322,7 @@ func (replica *SegmentReplica) filterSegments(channelName string, partitionID Un // addNormalSegment adds a *NotNew* and *NotFlushed* segment. Before add, please make sure there's no // such segment by `hasSegment` -func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, cp *segmentCheckPoint) error { +func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, cp *segmentCheckPoint, recoverTs Timestamp) error { if collID != replica.collectionID { log.Warn("Mismatch collection", zap.Int64("input ID", collID), @@ -352,7 +352,7 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu seg.checkPoint = *cp seg.endPos = &cp.pos } - err := replica.initPKBloomFilter(seg, statsBinlogs) + err := replica.initPKBloomFilter(seg, statsBinlogs, recoverTs) if err != nil { return err } @@ -370,7 +370,7 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu // addFlushedSegment adds a *Flushed* segment. Before add, please make sure there's no // such segment by `hasSegment` -func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog) error { +func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, recoverTs Timestamp) error { if collID != replica.collectionID { log.Warn("Mismatch collection", @@ -399,7 +399,7 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq maxPK: math.MinInt64, // use min value represents no value } - err := replica.initPKBloomFilter(seg, statsBinlogs) + err := replica.initPKBloomFilter(seg, statsBinlogs, recoverTs) if err != nil { return err } @@ -414,9 +414,9 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq return nil } -func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*datapb.FieldBinlog) error { +func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error { log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs))) - schema, err := replica.getCollectionSchema(s.collectionID, 0) + schema, err := replica.getCollectionSchema(s.collectionID, ts) if err != nil { return err } diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go index 44782eeec..97d8b9a90 100644 --- a/internal/datanode/segment_replica_test.go +++ b/internal/datanode/segment_replica_test.go @@ -340,7 +340,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { sr.minIOKV = &mockMinioKV{} assert.Nil(t, err) require.False(t, sr.hasSegment(test.inSegID, true)) - err = sr.addNormalSegment(test.inSegID, test.inCollID, 1, "", 0, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{}) + err = sr.addNormalSegment(test.inSegID, test.inCollID, 1, "", 0, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{}, 0) if test.isValidCase { assert.NoError(t, err) assert.True(t, sr.hasSegment(test.inSegID, true)) @@ -361,7 +361,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { segID := int64(101) require.False(t, sr.hasSegment(segID, true)) assert.NotPanics(t, func() { - err = sr.addNormalSegment(segID, 1, 10, "empty_dml_chan", 0, []*datapb.FieldBinlog{}, nil) + err = sr.addNormalSegment(segID, 1, 10, "empty_dml_chan", 0, []*datapb.FieldBinlog{}, nil, 0) assert.NoError(t, err) }) }) @@ -587,9 +587,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)} cp := &segmentCheckPoint{int64(10), *cpPos} - err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp) + err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0) assert.NotNil(t, err) - err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}) + err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, 0) assert.NotNil(t, err) }) @@ -600,9 +600,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)} cp := &segmentCheckPoint{int64(10), *cpPos} - err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp) + err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0) assert.NotNil(t, err) - err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}) + err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, 0) assert.NotNil(t, err) }) @@ -613,9 +613,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)} cp := &segmentCheckPoint{int64(10), *cpPos} - err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp) + err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0) assert.NotNil(t, err) - err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}) + err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, 0) assert.NotNil(t, err) }) @@ -678,7 +678,7 @@ func TestInnerFunctionSegment(t *testing.T) { cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)} cp := &segmentCheckPoint{int64(10), *cpPos} - err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp) + err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0) assert.NoError(t, err) assert.True(t, replica.hasSegment(1, true)) assert.Equal(t, 1, len(replica.normalSegments)) @@ -695,7 +695,7 @@ func TestInnerFunctionSegment(t *testing.T) { assert.False(t, seg.isNew.Load().(bool)) assert.False(t, seg.isFlushed.Load().(bool)) - err = replica.addNormalSegment(1, 100000, 2, "invalid", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{}) + err = replica.addNormalSegment(1, 100000, 2, "invalid", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{}, 0) assert.Error(t, err) replica.updateStatistics(1, 10) @@ -730,7 +730,7 @@ func TestInnerFunctionSegment(t *testing.T) { replica.updateSegmentCheckPoint(1) assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows) - err = replica.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}) + err = replica.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, 0) assert.Nil(t, err) totalSegments := replica.filterSegments("insert-01", common.InvalidPartitionID) @@ -776,7 +776,7 @@ func TestReplica_UpdatePKRange(t *testing.T) { err = replica.addNewSegment(1, collID, partID, chanName, startPos, endPos) assert.Nil(t, err) - err = replica.addNormalSegment(2, collID, partID, chanName, 100, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp) + err = replica.addNormalSegment(2, collID, partID, chanName, 100, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0) assert.Nil(t, err) segNew := replica.newSegments[1] -- GitLab