From daae05641a1302fd30476ae698f7fbb2f95c79d7 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Wed, 25 Aug 2021 10:29:52 +0800
Subject: [PATCH] Construct bloom filter when inserting (#7268)

Signed-off-by: Congqi Xia
---
 .../datanode/flow_graph_insert_buffer_node.go |  2 +
 internal/datanode/segment_replica.go          | 57 ++++++++++++++
 internal/datanode/segment_replica_test.go     | 77 +++++++++++++++++++
 3 files changed, 136 insertions(+)

diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index ac8984354..9658fa227 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -483,6 +483,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 
 			// store current endPositions as Segment->EndPostion
 			ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0])
+			// update segment pk filter
+			ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs())
 		}
 
 		if len(iMsg.insertMessages) > 0 {
diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go
index 712d91f57..23f404f05 100644
--- a/internal/datanode/segment_replica.go
+++ b/internal/datanode/segment_replica.go
@@ -13,12 +13,15 @@ package datanode
 
 import (
 	"context"
+	"encoding/binary"
 	"fmt"
+	"math"
 	"sync"
 	"sync/atomic"
 
 	"go.uber.org/zap"
 
+	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -26,6 +29,12 @@ import (
 	"github.com/milvus-io/milvus/internal/types"
 )
 
+const (
+	// TODO silverxia maybe need set from config
+	bloomFilterSize       uint    = 100000
+	maxBloomFalsePositive float64 = 0.005
+)
+
 type Replica interface {
 	getCollectionID() UniqueID
 	getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error)
@@ -37,6 +46,7 @@ type Replica interface {
 	listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint
 	updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
 	updateSegmentCheckPoint(segID UniqueID)
+	updateSegmentPKRange(segID UniqueID, rowIDs []int64)
 	hasSegment(segID UniqueID, countFlushed bool) bool
 
 	updateStatistics(segID UniqueID, numRows int64) error
@@ -58,6 +68,11 @@ type Segment struct {
 	checkPoint segmentCheckPoint
 	startPos   *internalpb.MsgPosition // TODO readonly
 	endPos     *internalpb.MsgPosition
+
+	pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
+	// TODO silverxia, needs to change to interface to support `string` type PK
+	minPK int64 // minimal pk value, shortcut for checking whether a pk is inside this segment
+	maxPK int64 // maximal pk value, same above
 }
 
 // SegmentReplica is the data replication of persistent data in datanode.
@@ -74,6 +89,20 @@ type SegmentReplica struct {
 	metaService  *metaService
 }
 
+func (s *Segment) updatePKRange(rowIDs []int64) {
+	buf := make([]byte, 8)
+	for _, rowID := range rowIDs {
+		binary.BigEndian.PutUint64(buf, uint64(rowID))
+		s.pkFilter.Add(buf)
+		if rowID > s.maxPK {
+			s.maxPK = rowID
+		}
+		if rowID < s.minPK {
+			s.minPK = rowID
+		}
+	}
+}
+
 var _ Replica = &SegmentReplica{}
 
 func newReplica(rc types.RootCoord, collID UniqueID) Replica {
@@ -176,6 +205,10 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
 		checkPoint: segmentCheckPoint{0, *startPos},
 		startPos:   startPos,
 		endPos:     endPos,
+
+		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
+		minPK:    math.MaxInt64, // use max value, represents no value
+		maxPK:    math.MinInt64, // use min value represents no value
 	}
 
 	seg.isNew.Store(true)
@@ -211,6 +244,11 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
 		checkPoint: *cp,
 		endPos:     &cp.pos,
+
+		//TODO silverxia, normal segments bloom filter and pk range should be loaded from serialized files
+		pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
+		minPK:    math.MaxInt64, // use max value, represents no value
+		maxPK:    math.MinInt64, // use min value represents no value
 	}
 
 	seg.isNew.Store(false)
@@ -278,6 +316,25 @@ func (replica *SegmentReplica) updateSegmentEndPosition(segID UniqueID, endPos *
 	log.Warn("No match segment", zap.Int64("ID", segID))
 }
 
+func (replica *SegmentReplica) updateSegmentPKRange(segID UniqueID, rowIDs []int64) {
+	replica.segMu.Lock()
+	defer replica.segMu.Unlock()
+
+	seg, ok := replica.newSegments[segID]
+	if ok {
+		seg.updatePKRange(rowIDs)
+		return
+	}
+
+	seg, ok = replica.normalSegments[segID]
+	if ok {
+		seg.updatePKRange(rowIDs)
+		return
+	}
+
+	log.Warn("No match segment to update PK range", zap.Int64("ID", segID))
+}
+
 func (replica *SegmentReplica) removeSegment(segID UniqueID) error {
 	return nil
 }
diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go
index af1805d1f..e193b49c7 100644
--- a/internal/datanode/segment_replica_test.go
+++ b/internal/datanode/segment_replica_test.go
@@ -12,8 +12,12 @@
 package datanode
 
 import (
+	"encoding/binary"
+	"math"
+	"math/rand"
 	"testing"
 
+	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -40,6 +44,13 @@ func TestSegmentReplica(t *testing.T) {
 	rc := &RootCoordFactory{}
 	collID := UniqueID(1)
 
+	t.Run("Test coll mot match", func(t *testing.T) {
+		replica := newSegmentReplica(rc, collID)
+
+		err := replica.addNewSegment(1, collID+1, 0, "", nil, nil)
+		assert.NotNil(t, err)
+	})
+
 	t.Run("Test segmentFlushed", func(t *testing.T) {
 		testReplica := &SegmentReplica{
 			newSegments: make(map[UniqueID]*Segment),
@@ -191,3 +202,69 @@ func TestSegmentReplica(t *testing.T) {
 		assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows)
 	})
 }
+
+func TestSegmentUpdatePKRange(t *testing.T) {
+	seg := &Segment{
+		pkFilter: bloom.NewWithEstimates(100000, 0.005),
+		maxPK:    math.MinInt64,
+		minPK:    math.MaxInt64,
+	}
+
+	cases := make([]int64, 0, 100)
+	for i := 0; i < 100; i++ {
+		cases = append(cases, rand.Int63())
+	}
+	buf := make([]byte, 8)
+	for _, c := range cases {
+		seg.updatePKRange([]int64{c})
+
+		assert.LessOrEqual(t, seg.minPK, c)
+		assert.GreaterOrEqual(t, seg.maxPK, c)
+
+		binary.BigEndian.PutUint64(buf, uint64(c))
+		assert.True(t, seg.pkFilter.Test(buf))
+	}
+}
+
+func TestReplicaUpdatePKRange(t *testing.T) {
+	rc := &RootCoordFactory{}
+	collID := UniqueID(1)
+	partID := UniqueID(2)
+	chanName := "insert-02"
+	startPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(100)}
+	endPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(200)}
+	cpPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(10)}
+	cp := &segmentCheckPoint{int64(10), *cpPos}
+
+	replica := newSegmentReplica(rc, collID)
+
+	err := replica.addNewSegment(1, collID, partID, chanName, startPos, endPos)
+	assert.Nil(t, err)
+	err = replica.addNormalSegment(2, collID, partID, chanName, 100, cp)
+	assert.Nil(t, err)
+
+	segNew := replica.newSegments[1]
+	segNormal := replica.normalSegments[2]
+
+	cases := make([]int64, 0, 100)
+	for i := 0; i < 100; i++ {
+		cases = append(cases, rand.Int63())
+	}
+	buf := make([]byte, 8)
+	for _, c := range cases {
+		replica.updateSegmentPKRange(1, []int64{c}) // new segment
+		replica.updateSegmentPKRange(2, []int64{c}) // normal segment
+		replica.updateSegmentPKRange(3, []int64{c}) // non-exist segment
+
+		assert.LessOrEqual(t, segNew.minPK, c)
+		assert.GreaterOrEqual(t, segNew.maxPK, c)
+		assert.LessOrEqual(t, segNormal.minPK, c)
+		assert.GreaterOrEqual(t, segNormal.maxPK, c)
+
+		binary.BigEndian.PutUint64(buf, uint64(c))
+		assert.True(t, segNew.pkFilter.Test(buf))
+		assert.True(t, segNormal.pkFilter.Test(buf))
+	}
+}
--
GitLab
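
Note (editor's addendum, not part of the patch above): this change only constructs and populates the per-segment bloom filter on the insert path; nothing in it reads the filter back yet. The sketch below shows how a consumer could combine the minPK/maxPK shortcut with the filter's Test method to ask whether a primary key might live in a segment. The standalone segment type and the mayContainPK helper are illustrative assumptions, not Milvus APIs; only NewWithEstimates, Add and Test are taken from the patch itself.

package main

import (
	"encoding/binary"
	"fmt"
	"math"

	"github.com/bits-and-blooms/bloom/v3"
)

// segment mirrors the pk-tracking fields this patch adds to datanode.Segment.
type segment struct {
	pkFilter *bloom.BloomFilter
	minPK    int64
	maxPK    int64
}

// updatePKRange repeats the write-path logic from the patch: every row ID is
// added to the bloom filter and the min/max bounds are widened as needed.
func (s *segment) updatePKRange(rowIDs []int64) {
	buf := make([]byte, 8)
	for _, rowID := range rowIDs {
		binary.BigEndian.PutUint64(buf, uint64(rowID))
		s.pkFilter.Add(buf)
		if rowID > s.maxPK {
			s.maxPK = rowID
		}
		if rowID < s.minPK {
			s.minPK = rowID
		}
	}
}

// mayContainPK is a hypothetical read-side helper: the range check rejects
// cheaply, then the bloom filter answers "definitely not" or "possibly yes"
// within the configured false-positive rate.
func (s *segment) mayContainPK(pk int64) bool {
	if pk < s.minPK || pk > s.maxPK {
		return false
	}
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(pk))
	return s.pkFilter.Test(buf)
}

func main() {
	seg := &segment{
		// same sizing constants as the patch: ~100k keys at 0.5% false positives
		pkFilter: bloom.NewWithEstimates(100000, 0.005),
		minPK:    math.MaxInt64, // max value means "no pk seen yet"
		maxPK:    math.MinInt64,
	}
	seg.updatePKRange([]int64{10, 20, 30})

	fmt.Println(seg.mayContainPK(20)) // true
	fmt.Println(seg.mayContainPK(99)) // false: outside [10, 30]
}

Because a bloom filter can return false positives but never false negatives, a negative answer from such a check is definitive, while a positive answer still has to be confirmed against the actual segment data.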
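
On sizing: assuming the library derives its parameters from the standard estimates m = -n*ln(p)/(ln 2)^2 and k = (m/n)*ln 2, the patch's constants (n = 100000 keys, p = 0.005 false-positive rate) work out to roughly 1.1 million bits, on the order of 135 KB, and 8 hash functions per growing segment. The snippet below assumes the exported bloom.EstimateParameters helper exists for this calculation; the patch itself does not use it.

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	// m = number of bits, k = number of hash functions for the patch's constants
	m, k := bloom.EstimateParameters(100000, 0.005)
	fmt.Printf("bits=%d (~%d KiB), hash functions=%d\n", m, m/8/1024, k)
}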
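
The addNormalSegment hunk carries a TODO noting that, for segments recovered from storage, the bloom filter and pk range should eventually be loaded from serialized files rather than rebuilt empty. Assuming the filter's WriteTo and ReadFrom methods are the serialization hooks (this patch does not exercise them, and the stats-file handling here is hypothetical), a round trip could look roughly like this:

package main

import (
	"bytes"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	// build and populate a filter the same way the insert path does
	original := bloom.NewWithEstimates(100000, 0.005)
	original.Add([]byte("pk-bytes-1"))

	// serialize; a real datanode would write this to a per-segment stats file
	var buf bytes.Buffer
	if _, err := original.WriteTo(&buf); err != nil {
		panic(err)
	}

	// restore into an empty filter, e.g. when re-opening a "normal" (flushed) segment
	restored := &bloom.BloomFilter{}
	if _, err := restored.ReadFrom(&buf); err != nil {
		panic(err)
	}

	fmt.Println(restored.Test([]byte("pk-bytes-1")))  // true
	fmt.Println(restored.Test([]byte("never-added"))) // false (with high probability)
}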