未验证 提交 daae0564 编写于 作者: C congqixia 提交者: GitHub

Construct bloom filter when inserting (#7268)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 2c3779cf
......@@ -483,6 +483,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// store current endPositions as Segment->EndPostion
ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0])
// update segment pk filter
ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs())
}
if len(iMsg.insertMessages) > 0 {
......
......@@ -13,12 +13,15 @@ package datanode
import (
"context"
"encoding/binary"
"fmt"
"math"
"sync"
"sync/atomic"
"go.uber.org/zap"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
......@@ -26,6 +29,12 @@ import (
"github.com/milvus-io/milvus/internal/types"
)
const (
// TODO silverxia maybe need set from config
bloomFilterSize uint = 100000
maxBloomFalsePositive float64 = 0.005
)
type Replica interface {
getCollectionID() UniqueID
getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error)
......@@ -37,6 +46,7 @@ type Replica interface {
listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
updateSegmentCheckPoint(segID UniqueID)
updateSegmentPKRange(segID UniqueID, rowIDs []int64)
hasSegment(segID UniqueID, countFlushed bool) bool
updateStatistics(segID UniqueID, numRows int64) error
......@@ -58,6 +68,11 @@ type Segment struct {
checkPoint segmentCheckPoint
startPos *internalpb.MsgPosition // TODO readonly
endPos *internalpb.MsgPosition
pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
// TODO silverxia, needs to change to interface to support `string` type PK
minPK int64 // minimal pk value, shortcut for checking whether a pk is inside this segment
maxPK int64 // maximal pk value, same above
}
// SegmentReplica is the data replication of persistent data in datanode.
......@@ -74,6 +89,20 @@ type SegmentReplica struct {
metaService *metaService
}
func (s *Segment) updatePKRange(rowIDs []int64) {
buf := make([]byte, 8)
for _, rowID := range rowIDs {
binary.BigEndian.PutUint64(buf, uint64(rowID))
s.pkFilter.Add(buf)
if rowID > s.maxPK {
s.maxPK = rowID
}
if rowID < s.minPK {
s.minPK = rowID
}
}
}
var _ Replica = &SegmentReplica{}
func newReplica(rc types.RootCoord, collID UniqueID) Replica {
......@@ -176,6 +205,10 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
checkPoint: segmentCheckPoint{0, *startPos},
startPos: startPos,
endPos: endPos,
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
minPK: math.MaxInt64, // use max value, represents no value
maxPK: math.MinInt64, // use min value represents no value
}
seg.isNew.Store(true)
......@@ -211,6 +244,11 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
checkPoint: *cp,
endPos: &cp.pos,
//TODO silverxia, normal segments bloom filter and pk range should be loaded from serialized files
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
minPK: math.MaxInt64, // use max value, represents no value
maxPK: math.MinInt64, // use min value represents no value
}
seg.isNew.Store(false)
......@@ -278,6 +316,25 @@ func (replica *SegmentReplica) updateSegmentEndPosition(segID UniqueID, endPos *
log.Warn("No match segment", zap.Int64("ID", segID))
}
func (replica *SegmentReplica) updateSegmentPKRange(segID UniqueID, rowIDs []int64) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
seg, ok := replica.newSegments[segID]
if ok {
seg.updatePKRange(rowIDs)
return
}
seg, ok = replica.normalSegments[segID]
if ok {
seg.updatePKRange(rowIDs)
return
}
log.Warn("No match segment to update PK range", zap.Int64("ID", segID))
}
func (replica *SegmentReplica) removeSegment(segID UniqueID) error {
return nil
}
......
......@@ -12,8 +12,12 @@
package datanode
import (
"encoding/binary"
"math"
"math/rand"
"testing"
"github.com/bits-and-blooms/bloom/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
......@@ -40,6 +44,13 @@ func TestSegmentReplica(t *testing.T) {
rc := &RootCoordFactory{}
collID := UniqueID(1)
t.Run("Test coll mot match", func(t *testing.T) {
replica := newSegmentReplica(rc, collID)
err := replica.addNewSegment(1, collID+1, 0, "", nil, nil)
assert.NotNil(t, err)
})
t.Run("Test segmentFlushed", func(t *testing.T) {
testReplica := &SegmentReplica{
newSegments: make(map[UniqueID]*Segment),
......@@ -191,3 +202,69 @@ func TestSegmentReplica(t *testing.T) {
assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows)
})
}
func TestSegmentUpdatePKRange(t *testing.T) {
seg := &Segment{
pkFilter: bloom.NewWithEstimates(100000, 0.005),
maxPK: math.MinInt64,
minPK: math.MaxInt64,
}
cases := make([]int64, 0, 100)
for i := 0; i < 100; i++ {
cases = append(cases, rand.Int63())
}
buf := make([]byte, 8)
for _, c := range cases {
seg.updatePKRange([]int64{c})
assert.LessOrEqual(t, seg.minPK, c)
assert.GreaterOrEqual(t, seg.maxPK, c)
binary.BigEndian.PutUint64(buf, uint64(c))
assert.True(t, seg.pkFilter.Test(buf))
}
}
func TestReplicaUpdatePKRange(t *testing.T) {
rc := &RootCoordFactory{}
collID := UniqueID(1)
partID := UniqueID(2)
chanName := "insert-02"
startPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(100)}
endPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(200)}
cpPos := &internalpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(10)}
cp := &segmentCheckPoint{int64(10), *cpPos}
replica := newSegmentReplica(rc, collID)
err := replica.addNewSegment(1, collID, partID, chanName, startPos, endPos)
assert.Nil(t, err)
err = replica.addNormalSegment(2, collID, partID, chanName, 100, cp)
assert.Nil(t, err)
segNew := replica.newSegments[1]
segNormal := replica.normalSegments[2]
cases := make([]int64, 0, 100)
for i := 0; i < 100; i++ {
cases = append(cases, rand.Int63())
}
buf := make([]byte, 8)
for _, c := range cases {
replica.updateSegmentPKRange(1, []int64{c}) // new segment
replica.updateSegmentPKRange(2, []int64{c}) // normal segment
replica.updateSegmentPKRange(3, []int64{c}) // non-exist segment
assert.LessOrEqual(t, segNew.minPK, c)
assert.GreaterOrEqual(t, segNew.maxPK, c)
assert.LessOrEqual(t, segNormal.minPK, c)
assert.GreaterOrEqual(t, segNormal.maxPK, c)
binary.BigEndian.PutUint64(buf, uint64(c))
assert.True(t, segNew.pkFilter.Test(buf))
assert.True(t, segNormal.pkFilter.Test(buf))
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册