未验证 提交 3c3ba55c 编写于 作者: C congqixia 提交者: GitHub

Fix bloom filter size not match (#19173)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 4078f454
......@@ -25,7 +25,6 @@ import (
"sync"
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
......@@ -129,28 +128,6 @@ func (t *compactionTask) getChannelName() string {
return t.plan.GetChannel()
}
func (t *compactionTask) getPlanTargetEntryNumber() int64 {
if t.plan == nil {
// if plan empty return default size
return int64(bloomFilterSize)
}
var result int64
for _, info := range t.plan.GetSegmentBinlogs() {
for _, fieldLog := range info.GetFieldBinlogs() {
for _, binlog := range fieldLog.GetBinlogs() {
result += binlog.GetEntriesNum()
}
}
}
// prevent bloom filter too small
if result == 0 {
log.Warn("compaction target entry number zero", zap.Int64("planID", t.getPlanID()))
return int64(bloomFilterSize)
}
return result
}
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[interface{}]Timestamp, *DelDataBuf, error) {
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()
......@@ -210,7 +187,7 @@ func nano2Milli(nano time.Duration) float64 {
return float64(nano) / float64(time.Millisecond)
}
func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, []byte, int64, error) {
func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, *Segment, int64, error) {
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()
......@@ -224,7 +201,6 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
// statslog generation
segment *Segment // empty segment used for bf generation
pkID UniqueID
pkType schemapb.DataType
iDatas = make([]*InsertData, 0)
fID2Type = make(map[UniqueID]schemapb.DataType)
......@@ -240,19 +216,14 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
return false
}
//
targetRowCount := t.getPlanTargetEntryNumber()
log.Debug("merge estimate target row count", zap.Int64("row count", targetRowCount))
segment = &Segment{
pkFilter: bloom.NewWithEstimates(uint(targetRowCount), maxBloomFalsePositive),
}
segment = &Segment{}
t.Replica.initSegmentBloomFilter(segment)
// get dim
for _, fs := range schema.GetFields() {
fID2Type[fs.GetFieldID()] = fs.GetDataType()
if fs.GetIsPrimaryKey() {
pkID = fs.GetFieldID()
pkType = fs.GetDataType()
}
if fs.GetDataType() == schemapb.DataType_FloatVector ||
fs.GetDataType() == schemapb.DataType_BinaryVector {
......@@ -349,17 +320,10 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestam
}
}
// marshal segment statslog
segStats, err := segment.getSegmentStatslog(pkID, pkType)
if err != nil {
log.Warn("failed to generate segment statslog", zap.Int64("pkID", pkID), zap.Error(err))
return nil, nil, 0, err
}
log.Debug("merge end", zap.Int64("remaining insert numRows", numRows),
zap.Int64("expired entities", expired),
zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
return iDatas, segStats, numRows, nil
return iDatas, segment, numRows, nil
}
func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
......@@ -526,12 +490,19 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
return nil, err
}
iDatas, segStats, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
iDatas, segment, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return nil, err
}
// marshal segment statslog
segStats, err := segment.getSegmentStatslog(PKfieldID, PkType)
if err != nil {
log.Warn("failed to generate segment statslog", zap.Int64("pkID", PKfieldID), zap.Error(err))
return nil, err
}
uploadStart := time.Now()
segPaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, segStats, deltaBuf.delData, meta)
if err != nil {
......@@ -572,7 +543,13 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
}
// no need to shorten the PK range of a segment, deleting dup PKs is valid
} else {
err = t.mergeFlushedSegments(targetSegID, collID, partID, t.plan.GetPlanID(), segIDs, t.plan.GetChannel(), numRows)
segment.collectionID = collID
segment.partitionID = partID
segment.segmentID = targetSegID
segment.channelName = t.plan.GetChannel()
segment.numRows = numRows
err = t.mergeFlushedSegments(segment, t.plan.GetPlanID(), segIDs)
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return nil, err
......
......@@ -24,13 +24,16 @@ import (
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
......@@ -247,10 +250,19 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("Test merge", func(t *testing.T) {
collectionID := int64(1)
meta := NewMetaFactory().GetCollectionMeta(collectionID, "test", schemapb.DataType_Int64)
rc := &mocks.RootCoord{}
rc.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Schema: meta.GetSchema(),
}, nil)
replica, err := newReplica(context.Background(), rc, nil, collectionID)
require.NoError(t, err)
t.Run("Merge without expiration", func(t *testing.T) {
Params.CommonCfg.EntityExpirationTTL = 0
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
......@@ -264,13 +276,15 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{}
idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
ct := &compactionTask{
Replica: replica,
}
idata, segment, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 1, len(idata))
assert.NotEmpty(t, idata[0].Data)
assert.NotEmpty(t, segStats)
assert.NotNil(t, segment)
})
t.Run("Merge without expiration2", func(t *testing.T) {
Params.CommonCfg.EntityExpirationTTL = 0
......@@ -292,13 +306,15 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
dm := map[interface{}]Timestamp{}
ct := &compactionTask{}
idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
ct := &compactionTask{
Replica: replica,
}
idata, segment, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 2, len(idata))
assert.NotEmpty(t, idata[0].Data)
assert.NotEmpty(t, segStats)
assert.NotEmpty(t, segment)
})
t.Run("Merge with expiration", func(t *testing.T) {
......@@ -318,12 +334,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{}
idata, segStats, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
ct := &compactionTask{
Replica: replica,
}
idata, segment, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
assert.NoError(t, err)
assert.Equal(t, int64(1), numOfRow)
assert.Equal(t, 1, len(idata))
assert.NotEmpty(t, segStats)
assert.NotEmpty(t, segment)
})
t.Run("Merge with meta error", func(t *testing.T) {
......@@ -343,7 +361,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{}
ct := &compactionTask{
Replica: replica,
}
_, _, _, err = ct.merge(mitr, dm, &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "64"},
......@@ -369,7 +389,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{}
ct := &compactionTask{
Replica: replica,
}
_, _, _, err = ct.merge(mitr, dm, &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "dim"},
......@@ -575,7 +597,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
PlanID: 10080,
SegmentBinlogs: segBinlogs,
StartTime: 0,
TimeoutInSeconds: 1,
TimeoutInSeconds: 10,
Type: datapb.CompactionType_InnerCompaction,
Timetravel: 30000,
Channel: "channelname",
......@@ -760,7 +782,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
},
},
StartTime: 0,
TimeoutInSeconds: 1,
TimeoutInSeconds: 10,
Type: datapb.CompactionType_MergeCompaction,
Timetravel: 40000,
Channel: "channelname",
......@@ -892,7 +914,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
},
},
StartTime: 0,
TimeoutInSeconds: 1,
TimeoutInSeconds: 10,
Type: datapb.CompactionType_MergeCompaction,
Timetravel: 40000,
Channel: "channelname",
......
......@@ -187,7 +187,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
}
err := retry.Do(ddn.ctx, func() error {
return ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax())
}, flowGraphRetryOpt)
}, getFlowGraphRetryOpt())
if err != nil {
err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vChannelName, err)
log.Error(err.Error())
......
......@@ -320,7 +320,7 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
// Test
flowGraphRetryOpt = retry.Attempts(1)
setFlowGraphRetryOpt(retry.Attempts(1))
assert.Panics(t, func() {
ddn.Operate([]Msg{msgStreamMsg})
})
......
......@@ -183,7 +183,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
} else {
err := retry.Do(dn.ctx, func() error {
return dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0])
}, flowGraphRetryOpt)
}, getFlowGraphRetryOpt())
if err != nil {
err = fmt.Errorf("failed to flush delete data, err = %s", err)
log.Error(err.Error())
......
......@@ -418,7 +418,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
var fgMsg flowgraph.Msg = &msg
flowGraphRetryOpt = retry.Attempts(1)
setFlowGraphRetryOpt(retry.Attempts(1))
assert.Panics(te, func() {
delNode.Operate([]flowgraph.Msg{fgMsg})
})
......@@ -462,7 +462,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
delNode.flushManager = NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
var fgMsg flowgraph.Msg = &msg
flowGraphRetryOpt = retry.Attempts(1)
setFlowGraphRetryOpt(retry.Attempts(1))
assert.NotPanics(t, func() {
delNode.Operate([]flowgraph.Msg{fgMsg})
})
......
......@@ -416,7 +416,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
task.flushed,
task.dropped,
endPositions[0])
}, flowGraphRetryOpt)
}, getFlowGraphRetryOpt())
if err != nil {
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
......
......@@ -235,7 +235,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
// test flushBufferData failed
flowGraphRetryOpt = retry.Attempts(1)
setFlowGraphRetryOpt(retry.Attempts(1))
inMsg = genFlowGraphInsertMsg(insertChannelName)
iBNode.flushManager = &mockFlushManager{returnError: true}
iBNode.insertBuffer.Store(inMsg.insertMessages[0].SegmentID, &BufferData{})
......
......@@ -17,6 +17,8 @@
package datanode
import (
"sync/atomic"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/retry"
)
......@@ -33,3 +35,19 @@ type (
)
var flowGraphRetryOpt = retry.Attempts(5)
var fgRetryOptVal atomic.Value
func init() {
setFlowGraphRetryOpt(retry.Attempts(5))
}
// setFlowGraphRetryOpt set retry option for flowgraph
// used for tests only
func setFlowGraphRetryOpt(opt retry.Option) {
fgRetryOptVal.Store(opt)
}
func getFlowGraphRetryOpt() retry.Option {
return fgRetryOptVal.Load().(retry.Option)
}
......@@ -34,11 +34,10 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
// TODO silverxia maybe need set from config
bloomFilterSize uint = 100000
maxBloomFalsePositive float64 = 0.005
)
......@@ -71,7 +70,7 @@ type Replica interface {
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
updateSegmentCheckPoint(segID UniqueID)
updateSegmentPKRange(segID UniqueID, ids storage.FieldData)
mergeFlushedSegments(segID, collID, partID, planID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) error
mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
hasSegment(segID UniqueID, countFlushed bool) bool
removeSegments(segID ...UniqueID)
listCompactedSegmentIDs() map[UniqueID][]UniqueID
......@@ -81,6 +80,7 @@ type Replica interface {
getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error)
segmentFlushed(segID UniqueID)
getSegmentStatslog(segID UniqueID) ([]byte, error)
initSegmentBloomFilter(seg *Segment) error
}
// Segment is the data structure of segments in data node replica.
......@@ -198,6 +198,12 @@ func newReplica(ctx context.Context, rc types.RootCoord, cm storage.ChunkManager
metaService: metaService,
chunkManager: cm,
}
// try to cache latest schema
_, err := replica.getCollectionSchema(collID, 0)
if err != nil {
log.Warn("failed to get schema when create replica", zap.Int64("collID", collID), zap.Error(err))
return nil, err
}
return replica, nil
}
......@@ -267,24 +273,58 @@ func (replica *SegmentReplica) getCollectionAndPartitionID(segID UniqueID) (coll
return 0, 0, fmt.Errorf("cannot find segment, id = %v", segID)
}
// maxRowCountPerSegment returns max row count for a segment based on estimation of row size.
func (replica *SegmentReplica) maxRowCountPerSegment(ts Timestamp) (int64, error) {
log := log.With(zap.Int64("collectionID", replica.collectionID), zap.Uint64("timpstamp", ts))
schema, err := replica.getCollectionSchema(replica.collectionID, ts)
if err != nil {
log.Warn("failed to get collection schema", zap.Error(err))
return 0, err
}
sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
if err != nil {
log.Warn("failed to estimate size per record", zap.Error(err))
return 0, err
}
threshold := Params.DataCoordCfg.SegmentMaxSize * 1024 * 1024
return int64(threshold / float64(sizePerRecord)), nil
}
// initSegmentBloomFilter initialize segment pkFilter with a new bloom filter.
// this new BF will be initialized with estimated max rows and default false positive rate.
func (replica *SegmentReplica) initSegmentBloomFilter(s *Segment) error {
var ts Timestamp
if s.startPos != nil {
ts = s.startPos.Timestamp
}
maxRowCount, err := replica.maxRowCountPerSegment(ts)
if err != nil {
log.Warn("initSegmentBloomFilter failed, cannot estimate max row count", zap.Error(err))
return err
}
s.pkFilter = bloom.NewWithEstimates(uint(maxRowCount), maxBloomFalsePositive)
return nil
}
// addNewSegment adds a *New* and *NotFlushed* new segment. Before add, please make sure there's no
// such segment by `hasSegment`
func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID, channelName string,
startPos, endPos *internalpb.MsgPosition) error {
log := log.With(
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partitionID),
zap.String("channel name", channelName))
if collID != replica.collectionID {
log.Warn("Mismatch collection",
zap.Int64("input ID", collID),
zap.Int64("expected ID", replica.collectionID))
zap.Int64("expected collectionID", replica.collectionID))
return fmt.Errorf("mismatch collection, ID=%d", collID)
}
log.Info("Add new segment",
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partitionID),
zap.String("channel name", channelName),
)
log.Info("Add new segment")
seg := &Segment{
collectionID: collID,
......@@ -295,8 +335,12 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
checkPoint: segmentCheckPoint{0, *startPos},
startPos: startPos,
endPos: endPos,
}
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
err := replica.initSegmentBloomFilter(seg)
if err != nil {
log.Warn("failed to addNewSegment, init segment bf returns error", zap.Error(err))
return err
}
seg.isNew.Store(true)
......@@ -360,19 +404,19 @@ func (replica *SegmentReplica) filterSegments(channelName string, partitionID Un
// addNormalSegment adds a *NotNew* and *NotFlushed* segment. Before add, please make sure there's no
// such segment by `hasSegment`
func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, cp *segmentCheckPoint, recoverTs Timestamp) error {
log := log.With(
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partitionID),
zap.String("channel name", channelName))
if collID != replica.collectionID {
log.Warn("Mismatch collection",
zap.Int64("input ID", collID),
zap.Int64("expected ID", replica.collectionID))
zap.Int64("expected collectionID", replica.collectionID))
return fmt.Errorf("mismatch collection, ID=%d", collID)
}
log.Info("Add Normal segment",
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partitionID),
zap.String("channel name", channelName),
)
log.Info("Add Normal segment")
seg := &Segment{
collectionID: collID,
......@@ -380,8 +424,6 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
segmentID: segID,
channelName: channelName,
numRows: numOfRows,
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
}
if cp != nil {
......@@ -408,19 +450,19 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
// such segment by `hasSegment`
func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, recoverTs Timestamp) error {
log := log.With(
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partitionID),
zap.String("channel name", channelName))
if collID != replica.collectionID {
log.Warn("Mismatch collection",
zap.Int64("input ID", collID),
zap.Int64("expected ID", replica.collectionID))
zap.Int64("expected collectionID", replica.collectionID))
return fmt.Errorf("mismatch collection, ID=%d", collID)
}
log.Info("Add Flushed segment",
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partitionID),
zap.String("channel name", channelName),
)
log.Info("Add Flushed segment")
seg := &Segment{
collectionID: collID,
......@@ -428,9 +470,6 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
segmentID: segID,
channelName: channelName,
numRows: numOfRows,
//TODO silverxia, normal segments bloom filter and pk range should be loaded from serialized files
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
}
err := replica.initPKBloomFilter(seg, statsBinlogs, recoverTs)
......@@ -449,9 +488,11 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
}
func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
log := log.With(zap.Int64("segmentID", s.segmentID))
log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs)))
schema, err := replica.getCollectionSchema(s.collectionID, ts)
if err != nil {
log.Warn("failed to initPKBloomFilter, get schema return error", zap.Error(err))
return err
}
......@@ -475,6 +516,12 @@ func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*dat
}
}
// no stats log to parse, initialize a new BF
if len(bloomFilterFiles) == 0 {
log.Warn("no stats files to load, initializa a new one")
return replica.initSegmentBloomFilter(s)
}
values, err := replica.chunkManager.MultiRead(bloomFilterFiles)
if err != nil {
log.Warn("failed to load bloom filter files", zap.Error(err))
......@@ -491,13 +538,22 @@ func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*dat
return err
}
for _, stat := range stats {
err = s.pkFilter.Merge(stat.BF)
if err != nil {
return err
// use first BF to merge
if s.pkFilter == nil {
s.pkFilter = stat.BF
} else {
// for compatibility, statslog before 2.1.2 uses separated stats log which needs to be merged
// assuming all legacy BF has same attributes.
err = s.pkFilter.Merge(stat.BF)
if err != nil {
return err
}
}
s.updatePk(stat.MinPk)
s.updatePk(stat.MaxPk)
}
return nil
}
......@@ -730,32 +786,23 @@ func (replica *SegmentReplica) updateSegmentCheckPoint(segID UniqueID) {
log.Warn("There's no segment", zap.Int64("ID", segID))
}
func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID, planID UniqueID, compactedFrom []UniqueID, channelName string, numOfRows int64) error {
if collID != replica.collectionID {
log.Warn("Mismatch collection",
zap.Int64("input ID", collID),
zap.Int64("expected ID", replica.collectionID))
return fmt.Errorf("mismatch collection, ID=%d", collID)
}
func (replica *SegmentReplica) mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
log.Info("merge flushed segments",
log := log.With(
zap.Int64("segment ID", seg.segmentID),
zap.Int64("collection ID", seg.collectionID),
zap.Int64("partition ID", seg.partitionID),
zap.Int64s("compacted from", compactedFrom),
zap.Int64("planID", planID),
zap.Int64("compacted To segmentID", segID),
zap.Int64s("compacted From segmentIDs", compactedFrom),
zap.Int64("partition ID", partID),
zap.String("channel name", channelName),
)
zap.String("channel name", seg.channelName))
seg := &Segment{
collectionID: collID,
partitionID: partID,
segmentID: segID,
channelName: channelName,
numRows: numOfRows,
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
if seg.collectionID != replica.collectionID {
log.Warn("Mismatch collection",
zap.Int64("expected collectionID", replica.collectionID))
return fmt.Errorf("mismatch collection, ID=%d", seg.collectionID)
}
log.Info("merge flushed segments")
replica.segMu.Lock()
for _, ID := range compactedFrom {
s, ok := replica.flushedSegments[ID]
......@@ -765,11 +812,9 @@ func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID, planI
continue
}
s.compactedTo = segID
s.compactedTo = seg.segmentID
replica.compactedSegments[ID] = s
delete(replica.flushedSegments, ID)
seg.pkFilter.Merge(s.pkFilter)
}
replica.segMu.Unlock()
......@@ -777,7 +822,7 @@ func (replica *SegmentReplica) mergeFlushedSegments(segID, collID, partID, planI
seg.isFlushed.Store(true)
replica.segMu.Lock()
replica.flushedSegments[segID] = seg
replica.flushedSegments[seg.segmentID] = seg
replica.segMu.Unlock()
return nil
......@@ -805,8 +850,11 @@ func (replica *SegmentReplica) addFlushedSegmentWithPKs(segID, collID, partID Un
segmentID: segID,
channelName: channelName,
numRows: numOfRows,
}
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
err := replica.initSegmentBloomFilter(seg)
if err != nil {
return err
}
seg.updatePKRange(ids)
......
......@@ -19,6 +19,7 @@ package datanode
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"testing"
......@@ -56,7 +57,7 @@ func (kv *mockDataCM) MultiRead(keys []string) ([][]byte, error) {
FieldID: common.RowIDField,
Min: 0,
Max: 10,
BF: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
BF: bloom.NewWithEstimates(100000, maxBloomFalsePositive),
}
buffer, _ := json.Marshal(stats)
return [][]byte{buffer}, nil
......@@ -67,14 +68,16 @@ type mockPkfilterMergeError struct {
}
func (kv *mockPkfilterMergeError) MultiRead(keys []string) ([][]byte, error) {
stats := &storage.PrimaryKeyStats{
FieldID: common.RowIDField,
Min: 0,
Max: 10,
BF: bloom.NewWithEstimates(1, 0.0001),
}
buffer, _ := json.Marshal(stats)
return [][]byte{buffer}, nil
/*
stats := &storage.PrimaryKeyStats{
FieldID: common.RowIDField,
Min: 0,
Max: 10,
BF: bloom.NewWithEstimates(1, 0.0001),
}
buffer, _ := json.Marshal(stats)
return [][]byte{buffer}, nil*/
return nil, errors.New("mocked multi read error")
}
type mockDataCMError struct {
......@@ -566,6 +569,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
assert.Nil(t, err)
if test.metaServiceErr {
sr.collSchema = nil
rc.setCollectionID(-1)
} else {
rc.setCollectionID(1)
......@@ -657,7 +661,15 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
require.True(t, sr.hasSegment(1, true))
require.True(t, sr.hasSegment(2, true))
sr.mergeFlushedSegments(3, 1, 0, 100, []UniqueID{1, 2}, "channel", 15)
s := &Segment{
segmentID: 3,
collectionID: 1,
partitionID: 0,
channelName: "channel",
numRows: 15,
}
sr.mergeFlushedSegments(s, 100, []UniqueID{1, 2})
assert.True(t, sr.hasSegment(3, true))
assert.False(t, sr.hasSegment(1, true))
assert.False(t, sr.hasSegment(2, true))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册