Unverified · Commit 8daeb0d5 · authored by godchen, committed by GitHub

Change bloom filter use pk (#10193)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
Parent a58b0161
@@ -550,6 +550,10 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
 				fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
 			}
+			if field.IsPrimaryKey {
+				// update segment pk filter
+				ibNode.replica.updateSegmentPKRange(currentSegID, fieldData.Data)
+			}
 		case schemapb.DataType_Float:
 			if _, ok := idata.Data[field.FieldID]; !ok {
@@ -598,8 +602,6 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
 	// store current endPositions as Segment->EndPosition
 	ibNode.replica.updateSegmentEndPosition(currentSegID, endPos)

-	// update segment pk filter
-	ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs())

 	return nil
 }
......
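Taken together, these two hunks move the DataNode's segment PK filter off the auto-generated row IDs (`msg.GetRowIDs()`) and onto the actual primary-key column: `updateSegmentPKRange` now receives `fieldData.Data` for the field flagged `IsPrimaryKey`. Since user-supplied primary keys need not equal row IDs, the old filter could not answer point lookups by PK. A minimal round-trip sketch of the filter usage these paths share; the size and false-positive parameters are illustrative stand-ins for the constants kept in the storage package:

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	// Illustrative parameters; Milvus keeps its own bloomFilterSize /
	// maxBloomFalsePositive constants in the storage package.
	pkFilter := bloom.NewWithEstimates(100000, 0.005)

	// PKs are serialized as 8-byte big-endian before entering the filter,
	// matching Segment.updatePKRange in this commit.
	buf := make([]byte, 8)
	for _, pk := range []int64{10, 20, 30} {
		binary.BigEndian.PutUint64(buf, uint64(pk))
		pkFilter.Add(buf)
	}

	binary.BigEndian.PutUint64(buf, uint64(20))
	fmt.Println(pkFilter.Test(buf)) // true; absent keys may still false-positive
}
```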
@@ -263,12 +263,13 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa
 			IndexParams: []*commonpb.KeyValuePair{},
 		},
 		{
 			FieldID:     106,
 			Name:        "int64_field",
 			Description: "field 106",
 			DataType:    schemapb.DataType_Int64,
 			TypeParams:  []*commonpb.KeyValuePair{},
 			IndexParams: []*commonpb.KeyValuePair{},
+			IsPrimaryKey: true,
 		},
 		{
 			FieldID:     107,
......
@@ -28,7 +28,6 @@ import (
 	"go.uber.org/zap"

 	"github.com/bits-and-blooms/bloom/v3"
-	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/kv"
 	miniokv "github.com/milvus-io/milvus/internal/kv/minio"
 	"github.com/milvus-io/milvus/internal/log"
@@ -103,16 +102,16 @@ type SegmentReplica struct {
 	minIOKV kv.BaseKV
 }

-func (s *Segment) updatePKRange(rowIDs []int64) {
+func (s *Segment) updatePKRange(pks []int64) {
 	buf := make([]byte, 8)
-	for _, rowID := range rowIDs {
-		binary.BigEndian.PutUint64(buf, uint64(rowID))
+	for _, pk := range pks {
+		binary.BigEndian.PutUint64(buf, uint64(pk))
 		s.pkFilter.Add(buf)
-		if rowID > s.maxPK {
-			s.maxPK = rowID
+		if pk > s.maxPK {
+			s.maxPK = pk
 		}
-		if rowID < s.minPK {
-			s.minPK = rowID
+		if pk < s.minPK {
+			s.minPK = pk
 		}
 	}
 }
@@ -288,9 +287,6 @@ func (replica *SegmentReplica) filterSegments(channelName string, partitionID Un
 // addNormalSegment adds a *NotNew* and *NotFlushed* segment. Before add, please make sure there's no
 // such segment by `hasSegment`
 func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error {
-	replica.segMu.Lock()
-	defer replica.segMu.Unlock()
-
 	if collID != replica.collectionID {
 		log.Warn("Mismatch collection",
 			zap.Int64("input ID", collID),
@@ -319,47 +315,24 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
 		minPK: math.MaxInt64, // use max value, represents no value
 		maxPK: math.MinInt64, // use min value represents no value
 	}

-	p := path.Join(Params.StatsBinlogRootPath, JoinIDPath(collID, partitionID, segID, common.RowIDField))
-	keys, values, err := replica.minIOKV.LoadWithPrefix(p)
+	err := replica.initPKBloomFilter(seg)
 	if err != nil {
 		return err
 	}
-	blobs := make([]*Blob, 0)
-	for i := 0; i < len(keys); i++ {
-		blobs = append(blobs, &Blob{Key: keys[i], Value: []byte(values[i])})
-	}
-
-	stats, err := storage.DeserializeStats(blobs)
-	if err != nil {
-		return err
-	}
-	for _, stat := range stats {
-		err = seg.pkFilter.Merge(stat.BF)
-		if err != nil {
-			return err
-		}
-		if seg.minPK > stat.Min {
-			seg.minPK = stat.Min
-		}
-		if seg.maxPK < stat.Max {
-			seg.maxPK = stat.Max
-		}
-	}

 	seg.isNew.Store(false)
 	seg.isFlushed.Store(false)

+	replica.segMu.Lock()
 	replica.normalSegments[segID] = seg
+	replica.segMu.Unlock()

 	return nil
 }

 // addFlushedSegment adds a *Flushed* segment. Before add, please make sure there's no
 // such segment by `hasSegment`
 func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64) error {
-	replica.segMu.Lock()
-	defer replica.segMu.Unlock()
-
 	if collID != replica.collectionID {
 		log.Warn("Mismatch collection",
@@ -388,7 +361,36 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
 		maxPK: math.MinInt64, // use min value represents no value
 	}

-	p := path.Join(Params.StatsBinlogRootPath, JoinIDPath(collID, partitionID, segID, common.RowIDField))
+	err := replica.initPKBloomFilter(seg)
+	if err != nil {
+		return err
+	}
+
+	seg.isNew.Store(false)
+	seg.isFlushed.Store(true)
+
+	replica.segMu.Lock()
+	replica.flushedSegments[segID] = seg
+	replica.segMu.Unlock()
+
+	return nil
+}
+
+func (replica *SegmentReplica) initPKBloomFilter(s *Segment) error {
+	schema, err := replica.getCollectionSchema(s.collectionID, 0)
+	if err != nil {
+		return err
+	}
+
+	pkField := int64(-1)
+	for _, field := range schema.Fields {
+		if field.IsPrimaryKey {
+			pkField = field.FieldID
+			break
+		}
+	}
+
+	p := path.Join(Params.StatsBinlogRootPath, JoinIDPath(s.collectionID, s.partitionID, s.segmentID, pkField))
 	keys, values, err := replica.minIOKV.LoadWithPrefix(p)
 	if err != nil {
 		return err
@@ -403,23 +405,18 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
 		return err
 	}
 	for _, stat := range stats {
-		err = seg.pkFilter.Merge(stat.BF)
+		err = s.pkFilter.Merge(stat.BF)
 		if err != nil {
 			return err
 		}
-		if seg.minPK > stat.Min {
-			seg.minPK = stat.Min
+		if s.minPK > stat.Min {
+			s.minPK = stat.Min
 		}
-		if seg.maxPK < stat.Max {
-			seg.maxPK = stat.Max
+		if s.maxPK < stat.Max {
+			s.maxPK = stat.Max
 		}
 	}

-	seg.isNew.Store(false)
-	seg.isFlushed.Store(true)
-	replica.flushedSegments[segID] = seg
-
 	return nil
 }
@@ -481,19 +478,19 @@ func (replica *SegmentReplica) updateSegmentEndPosition(segID UniqueID, endPos *
 	log.Warn("No match segment", zap.Int64("ID", segID))
 }

-func (replica *SegmentReplica) updateSegmentPKRange(segID UniqueID, rowIDs []int64) {
+func (replica *SegmentReplica) updateSegmentPKRange(segID UniqueID, pks []int64) {
 	replica.segMu.Lock()
 	defer replica.segMu.Unlock()

 	seg, ok := replica.newSegments[segID]
 	if ok {
-		seg.updatePKRange(rowIDs)
+		seg.updatePKRange(pks)
 		return
 	}

 	seg, ok = replica.normalSegments[segID]
 	if ok {
-		seg.updatePKRange(rowIDs)
+		seg.updatePKRange(pks)
 		return
 	}
@@ -575,9 +572,6 @@ func (replica *SegmentReplica) getCollectionID() UniqueID {
 // getCollectionSchema gets collection schema from rootcoord for a certain timestamp.
 // If you want the latest collection schema, ts should be 0.
 func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
-	replica.segMu.Lock()
-	defer replica.segMu.Unlock()
-
 	if !replica.validCollection(collID) {
 		log.Warn("Mismatch collection for the replica",
 			zap.Int64("Want", replica.collectionID),
......
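Two things happen in this file beyond the rename. First, the duplicated stats-loading blocks in `addNormalSegment` and `addFlushedSegment` collapse into `initPKBloomFilter`, which resolves the primary-key field from the schema and merges every stats binlog under that field's path. Second, `segMu` is no longer held for the whole of `add*Segment` but only around the map writes; one plausible motivation is that `initPKBloomFilter` calls `getCollectionSchema`, which itself used to take `segMu`, so holding the lock across the call would self-deadlock on Go's non-reentrant `sync.Mutex`. A sketch of the fold over the deserialized stats, with stand-in types for the storage package's:

```go
package main

import (
	"math"

	"github.com/bits-and-blooms/bloom/v3"
)

// Int64Stats is a stand-in for the storage package's deserialized stats.
type Int64Stats struct {
	Min, Max int64
	BF       *bloom.BloomFilter
}

// segment is a stand-in holding just the PK-filter state.
type segment struct {
	pkFilter     *bloom.BloomFilter
	minPK, maxPK int64
}

// mergeStats mirrors the fold in initPKBloomFilter: union the per-binlog
// filters and widen the [minPK, maxPK] range.
func mergeStats(seg *segment, stats []*Int64Stats) error {
	for _, stat := range stats {
		// Merge only succeeds if both filters share the same m and k.
		if err := seg.pkFilter.Merge(stat.BF); err != nil {
			return err
		}
		if seg.minPK > stat.Min {
			seg.minPK = stat.Min
		}
		if seg.maxPK < stat.Max {
			seg.maxPK = stat.Max
		}
	}
	return nil
}

func main() {
	seg := &segment{
		pkFilter: bloom.NewWithEstimates(100000, 0.005),
		minPK:    math.MaxInt64, // "no value" sentinels, as in the diff
		maxPK:    math.MinInt64,
	}
	_ = mergeStats(seg, nil)
}
```

Because `minPK`/`maxPK` start at the `math.MaxInt64`/`math.MinInt64` sentinels, the first merged stat always tightens the range; the `Merge` precondition holds here because all writers build filters from the same estimate constants.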
@@ -561,99 +561,100 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
 		assert.NotNil(to, err)
 	})
+}

-	te.Run("Test inner function segment", func(t *testing.T) {
+func TestInnerFunctionSegment(t *testing.T) {
+	rc := &RootCoordFactory{}
 	collID := UniqueID(1)
 	replica, err := newReplica(context.Background(), rc, collID)
 	assert.Nil(t, err)
 	replica.minIOKV = &mockMinioKV{}
 	assert.False(t, replica.hasSegment(0, true))
 	assert.False(t, replica.hasSegment(0, false))

 	startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(100)}
 	endPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(200)}
 	err = replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos)
 	assert.NoError(t, err)
 	assert.True(t, replica.hasSegment(0, true))
 	assert.Equal(t, 1, len(replica.newSegments))

 	seg, ok := replica.newSegments[UniqueID(0)]
 	assert.True(t, ok)
 	require.NotNil(t, seg)
 	assert.Equal(t, UniqueID(0), seg.segmentID)
 	assert.Equal(t, UniqueID(1), seg.collectionID)
 	assert.Equal(t, UniqueID(2), seg.partitionID)
 	assert.Equal(t, "insert-01", seg.channelName)
 	assert.Equal(t, Timestamp(100), seg.startPos.Timestamp)
 	assert.Equal(t, Timestamp(200), seg.endPos.Timestamp)
 	assert.Equal(t, startPos.ChannelName, seg.checkPoint.pos.ChannelName)
 	assert.Equal(t, startPos.Timestamp, seg.checkPoint.pos.Timestamp)
 	assert.Equal(t, int64(0), seg.numRows)
 	assert.True(t, seg.isNew.Load().(bool))
 	assert.False(t, seg.isFlushed.Load().(bool))

 	replica.updateStatistics(0, 10)
 	assert.Equal(t, int64(10), seg.numRows)

 	cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
 	cp := &segmentCheckPoint{int64(10), *cpPos}
 	err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp)
 	assert.NoError(t, err)
 	assert.True(t, replica.hasSegment(1, true))
 	assert.Equal(t, 1, len(replica.normalSegments))
 	seg, ok = replica.normalSegments[UniqueID(1)]
 	assert.True(t, ok)
 	require.NotNil(t, seg)
 	assert.Equal(t, UniqueID(1), seg.segmentID)
 	assert.Equal(t, UniqueID(1), seg.collectionID)
 	assert.Equal(t, UniqueID(2), seg.partitionID)
 	assert.Equal(t, "insert-01", seg.channelName)
 	assert.Equal(t, cpPos.ChannelName, seg.checkPoint.pos.ChannelName)
 	assert.Equal(t, cpPos.Timestamp, seg.checkPoint.pos.Timestamp)
 	assert.Equal(t, int64(10), seg.numRows)
 	assert.False(t, seg.isNew.Load().(bool))
 	assert.False(t, seg.isFlushed.Load().(bool))

 	err = replica.addNormalSegment(1, 100000, 2, "invalid", int64(0), &segmentCheckPoint{})
 	assert.Error(t, err)

 	replica.updateStatistics(1, 10)
 	assert.Equal(t, int64(20), seg.numRows)

 	segPos := replica.listNewSegmentsStartPositions()
 	assert.Equal(t, 1, len(segPos))
 	assert.Equal(t, UniqueID(0), segPos[0].SegmentID)
 	assert.Equal(t, "insert-01", segPos[0].StartPosition.ChannelName)
 	assert.Equal(t, Timestamp(100), segPos[0].StartPosition.Timestamp)

 	assert.Equal(t, 0, len(replica.newSegments))
 	assert.Equal(t, 2, len(replica.normalSegments))

 	cps := replica.listSegmentsCheckPoints()
 	assert.Equal(t, 2, len(cps))
 	assert.Equal(t, startPos.Timestamp, cps[UniqueID(0)].pos.Timestamp)
 	assert.Equal(t, int64(0), cps[UniqueID(0)].numRows)
 	assert.Equal(t, cp.pos.Timestamp, cps[UniqueID(1)].pos.Timestamp)
 	assert.Equal(t, int64(10), cps[UniqueID(1)].numRows)

 	updates, err := replica.getSegmentStatisticsUpdates(0)
 	assert.NoError(t, err)
 	assert.Equal(t, int64(10), updates.NumRows)

 	updates, err = replica.getSegmentStatisticsUpdates(1)
 	assert.NoError(t, err)
 	assert.Equal(t, int64(20), updates.NumRows)

 	replica.updateSegmentCheckPoint(0)
 	assert.Equal(t, int64(10), replica.normalSegments[UniqueID(0)].checkPoint.numRows)
 	replica.updateSegmentCheckPoint(1)
 	assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows)

 	err = replica.addFlushedSegment(1, 1, 2, "insert-01", int64(0))
 	assert.Nil(t, err)

 	totalSegments := replica.filterSegments("insert-01", 0)
 	assert.Equal(t, len(totalSegments), 3)
-	})
 }

 func TestSegmentReplica_UpdatePKRange(t *testing.T) {
......
@@ -105,7 +105,8 @@ func (kv *MinIOKV) LoadWithPrefix(key string) ([]string, []string, error) {
 	}
 	objectsValues, err := kv.MultiLoad(objectsKeys)
 	if err != nil {
-		log.Debug("MinIO", zap.String("cannot load value with prefix:%s", key))
+		log.Error(fmt.Sprintf("MinIO load with prefix error. path = %s", key), zap.Error(err))
+		return nil, nil, err
 	}

 	return objectsKeys, objectsValues, nil
......
@@ -12,15 +12,20 @@
 package querynode

 import (
+	"bytes"
 	"encoding/binary"
 	"fmt"
+	"io"
+	"strconv"
 	"sync"

 	"github.com/opentracing/opentracing-go"
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/schemapb"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
 	"github.com/milvus-io/milvus/internal/util/trace"
 )
@@ -35,6 +40,7 @@ type insertData struct {
 	insertTimestamps map[UniqueID][]Timestamp
 	insertRecords    map[UniqueID][]*commonpb.Blob
 	insertOffset     map[UniqueID]int64
+	insertPKs        map[UniqueID][]int64
 }

 type deleteData struct {
@@ -66,6 +72,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		insertTimestamps: make(map[UniqueID][]Timestamp),
 		insertRecords:    make(map[UniqueID][]*commonpb.Blob),
 		insertOffset:     make(map[UniqueID]int64),
+		insertPKs:        make(map[UniqueID][]int64),
 	}

 	if iMsg == nil {
@@ -102,6 +109,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		iData.insertIDs[task.SegmentID] = append(iData.insertIDs[task.SegmentID], task.RowIDs...)
 		iData.insertTimestamps[task.SegmentID] = append(iData.insertTimestamps[task.SegmentID], task.Timestamps...)
 		iData.insertRecords[task.SegmentID] = append(iData.insertRecords[task.SegmentID], task.RowData...)
+		iData.insertPKs[task.SegmentID] = iNode.getPrimaryKeys(task)
 	}

 	// 2. do preInsert
@@ -119,6 +127,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		}
 		iData.insertOffset[segmentID] = offset
 		log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID))
+		targetSegment.updateBloomFilter(iData.insertPKs[segmentID])
 	}
 }
@@ -291,6 +300,82 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
 	log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
 }

+func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
+	if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
+		log.Warn("misaligned messages detected")
+		return nil
+	}
+	collectionID := msg.GetCollectionID()
+
+	collection, err := iNode.replica.getCollectionByID(collectionID)
+	if err != nil {
+		log.Warn("collection cannot be found")
+		return nil
+	}
+	offset := 0
+	for _, field := range collection.schema.Fields {
+		if field.IsPrimaryKey {
+			break
+		}
+		switch field.DataType {
+		case schemapb.DataType_Bool:
+			offset++
+		case schemapb.DataType_Int8:
+			offset++
+		case schemapb.DataType_Int16:
+			offset += 2
+		case schemapb.DataType_Int32:
+			offset += 4
+		case schemapb.DataType_Int64:
+			offset += 8
+		case schemapb.DataType_Float:
+			offset += 4
+		case schemapb.DataType_Double:
+			offset += 8
+		case schemapb.DataType_FloatVector:
+			for _, t := range field.TypeParams {
+				if t.Key == "dim" {
+					dim, err := strconv.Atoi(t.Value)
+					if err != nil {
+						log.Error("strconv wrong on get dim", zap.Error(err))
+						break
+					}
+					offset += dim * 4
+					break
+				}
+			}
+		case schemapb.DataType_BinaryVector:
+			var dim int
+			for _, t := range field.TypeParams {
+				if t.Key == "dim" {
+					dim, err = strconv.Atoi(t.Value)
+					if err != nil {
+						log.Error("strconv wrong on get dim", zap.Error(err))
+						return nil
+					}
+					offset += dim / 8
+					break
+				}
+			}
+		}
+	}
+
+	blobReaders := make([]io.Reader, len(msg.RowData))
+	for i, blob := range msg.RowData {
+		blobReaders[i] = bytes.NewReader(blob.GetValue()[offset : offset+8])
+	}
+	pks := make([]int64, len(blobReaders))
+
+	for i, reader := range blobReaders {
+		err = binary.Read(reader, binary.LittleEndian, &pks[i])
+		if err != nil {
+			log.Warn("binary read blob value failed", zap.Error(err))
+		}
+	}
+	return pks
+}
+
 func newInsertNode(replica ReplicaInterface) *insertNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
......
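`getPrimaryKeys` recovers PKs from the row-oriented blob format: each row is a concatenation of fixed-width field encodings, so the PK's byte offset is the sum of the widths of every field that precedes it in the schema (the loop breaks at the PK field itself, and BinaryVector contributes dim/8 bytes). A worked example of that arithmetic under a hypothetical {Bool, Int32, FloatVector(dim=4), Int64 PK} layout, where the offset is 1 + 4 + 4·4 = 21 bytes:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// Hypothetical row layout: Bool(1) + Int32(4) + FloatVector dim=4 (16),
	// then the Int64 primary key, giving a PK offset of 21 bytes.
	offset := 1 + 4 + 4*4

	row := make([]byte, offset+8)
	binary.LittleEndian.PutUint64(row[offset:], uint64(42)) // pk = 42

	// Same read path as getPrimaryKeys: an 8-byte little-endian slice.
	var pk int64
	if err := binary.Read(bytes.NewReader(row[offset:offset+8]), binary.LittleEndian, &pk); err != nil {
		panic(err)
	}
	fmt.Println(pk) // 42
}
```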
@@ -245,6 +245,14 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
 		}
 		msg := []flowgraph.Msg{&iMsg}
 		insertNode.Operate(msg)
+
+		s, err := replica.getSegmentByID(defaultSegmentID)
+		assert.Nil(t, err)
+		buf := make([]byte, 8)
+		for i := 0; i < defaultMsgLength; i++ {
+			binary.BigEndian.PutUint64(buf, uint64(i))
+			assert.True(t, s.pkFilter.Test(buf))
+		}
 	})

 	t.Run("test invalid partitionID", func(t *testing.T) {
......
@@ -49,6 +49,7 @@ const (
 	defaultVecFieldName   = "vec"
 	defaultConstFieldName = "const"
+	defaultPKFieldName    = "pk"
 	defaultTopK           = int64(10)
 	defaultRoundDecimal   = int64(6)
 	defaultDim            = 128
@@ -108,6 +109,11 @@ var simpleConstField = constFieldParam{
 	dataType: schemapb.DataType_Int32,
 }

+var simplePKField = constFieldParam{
+	id:       102,
+	dataType: schemapb.DataType_Int64,
+}
+
 var uidField = constFieldParam{
 	id:       rowIDFieldID,
 	dataType: schemapb.DataType_Int64,
@@ -128,6 +134,16 @@ func genConstantField(param constFieldParam) *schemapb.FieldSchema {
 	return field
 }

+func genPKField(param constFieldParam) *schemapb.FieldSchema {
+	field := &schemapb.FieldSchema{
+		FieldID:      param.id,
+		Name:         defaultPKFieldName,
+		IsPrimaryKey: true,
+		DataType:     param.dataType,
+	}
+	return field
+}
+
 func genFloatVectorField(param vecFieldParam) *schemapb.FieldSchema {
 	fieldVec := &schemapb.FieldSchema{
 		FieldID: param.id,
@@ -284,13 +300,15 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
 func genSimpleSegCoreSchema() *schemapb.CollectionSchema {
 	fieldVec := genFloatVectorField(simpleVecField)
 	fieldInt := genConstantField(simpleConstField)
+	fieldPK := genPKField(simplePKField)

 	schema := schemapb.CollectionSchema{ // schema for segCore
 		Name:   defaultCollectionName,
-		AutoID: true,
+		AutoID: false,
 		Fields: []*schemapb.FieldSchema{
 			fieldVec,
 			fieldInt,
+			fieldPK,
 		},
 	}
 	return &schema
@@ -580,6 +598,10 @@ func genCommonBlob(msgLength int, schema *schemapb.CollectionSchema) ([]*commonp
 			bs := make([]byte, 4)
 			binary.LittleEndian.PutUint32(bs, uint32(i))
 			rawData = append(rawData, bs...)
+		case schemapb.DataType_Int64:
+			bs := make([]byte, 8)
+			binary.LittleEndian.PutUint64(bs, uint64(i))
+			rawData = append(rawData, bs...)
 		case schemapb.DataType_FloatVector:
 			dim := simpleVecField.dim // if no dim specified, use simpleVecField's dim
 			for _, p := range f.TypeParams {
......
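Putting this test-utility section together: under `genSimpleSegCoreSchema` a row is FloatVector(dim=128) + Int32 const + Int64 pk, and `genCommonBlob` writes the row index `i` as the Int64 value, so the PK of row `i` is simply `i`. That is why the flow-graph test above can probe the filter with keys `0..defaultMsgLength-1`. A sketch of the layout arithmetic; the constant names are illustrative, the sizes come from the schema shown:

```go
package main

import "fmt"

// Layout of one row under genSimpleSegCoreSchema, as getPrimaryKeys sees it.
const (
	vecBytes   = 128 * 4               // FloatVector, dim = 128
	constBytes = 4                     // Int32 constant field
	pkOffset   = vecBytes + constBytes // 516: byte offset of the Int64 pk
	rowSize    = pkOffset + 8          // 524 bytes per row
)

func main() {
	fmt.Println(pkOffset, rowSize) // 516 524
}
```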
@@ -541,6 +541,14 @@ func (s *Segment) checkIndexReady(fieldID int64) bool {
 	return s.indexInfos[fieldID].getReadyLoad()
 }

+func (s *Segment) updateBloomFilter(pks []int64) {
+	buf := make([]byte, 8)
+	for _, pk := range pks {
+		binary.BigEndian.PutUint64(buf, uint64(pk))
+		s.pkFilter.Add(buf)
+	}
+}
+
 //-------------------------------------------------------------------------------------- interfaces for growing segment
 func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) {
 	/*
@@ -601,12 +609,6 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 		return errors.New("null seg core pointer")
 	}

-	for _, id := range *entityIDs {
-		b := make([]byte, 8)
-		binary.BigEndian.PutUint64(b, uint64(id))
-		s.pkFilter.Add(b)
-	}
-
 	// Blobs to one big blob
 	var numOfRow = len(*entityIDs)
 	var sizeofPerRow = len((*records)[0].Value)
......
@@ -322,8 +322,20 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
 }

 func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment) error {
 	// Todo: get path from etcd
-	p := path.Join("files/stats_log", JoinIDPath(segment.collectionID, segment.partitionID, segment.segmentID, common.RowIDField))
-	keys, values, err := loader.minioKV.LoadWithPrefix(p)
+	collection, err := loader.historicalReplica.getCollectionByID(segment.collectionID)
+	if err != nil {
+		return err
+	}
+	pkField := int64(-1)
+	for _, field := range collection.schema.Fields {
+		if field.IsPrimaryKey {
+			pkField = field.FieldID
+			break
+		}
+	}
+
+	p := path.Join("files/stats_log", JoinIDPath(segment.collectionID, segment.partitionID, segment.segmentID, pkField))
+	keys, values, err := loader.minioKV.LoadWithPrefix(p + "/")
 	if err != nil {
 		return err
 	}
......
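Two details in the loader change are worth spelling out: the stats-log path now ends in the primary-key field ID rather than `common.RowIDField`, and the prefix scan appends a trailing slash so that one field ID cannot match another that merely shares its decimal prefix (field `10` versus `100`). A sketch of the resulting key shape, assuming `JoinIDPath` joins decimal IDs with `/` (the local helper below stands in for it):

```go
package main

import (
	"fmt"
	"path"
)

// joinIDPath assumes the behavior of the internal JoinIDPath helper:
// decimal IDs joined with "/".
func joinIDPath(ids ...int64) string {
	p := ""
	for _, id := range ids {
		p = path.Join(p, fmt.Sprintf("%d", id))
	}
	return p
}

func main() {
	p := path.Join("files/stats_log", joinIDPath(1, 2, 3, 106))
	fmt.Println(p + "/") // files/stats_log/1/2/3/106/ — cannot match field 1060
}
```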
@@ -306,7 +306,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 		statsWriter := &StatsWriter{}
 		switch field.DataType {
 		case schemapb.DataType_Int64:
-			err = statsWriter.StatsInt64(field.FieldID, singleData.(*Int64FieldData).Data)
+			err = statsWriter.StatsInt64(field.FieldID, field.IsPrimaryKey, singleData.(*Int64FieldData).Data)
 		}
 		if err != nil {
 			return nil, nil, err
......
@@ -16,7 +16,6 @@ import (
 	"encoding/json"

 	"github.com/bits-and-blooms/bloom/v3"
-	"github.com/milvus-io/milvus/internal/common"
 )

 const (
@@ -43,7 +42,7 @@ func (sw *StatsWriter) GetBuffer() []byte {
 	return sw.buffer
 }

-func (sw *StatsWriter) StatsInt64(fieldID int64, msgs []int64) error {
+func (sw *StatsWriter) StatsInt64(fieldID int64, isPrimaryKey bool, msgs []int64) error {
 	if len(msgs) < 1 {
 		// return error: msgs must have at least one element
 		return nil
@@ -53,9 +52,9 @@ func (sw *StatsWriter) StatsInt64(fieldID int64, msgs []int64) error {
 		FieldID: fieldID,
 		Max:     msgs[len(msgs)-1],
 		Min:     msgs[0],
-		BF:      bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
 	}
-	if fieldID == common.RowIDField {
+	if isPrimaryKey {
+		stats.BF = bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive)
 		b := make([]byte, 8)
 		for _, msg := range msgs {
 			binary.LittleEndian.PutUint64(b, uint64(msg))
......
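`StatsInt64` still takes `Min` from `msgs[0]` and `Max` from the last element, an ordering assumption that held for monotonically increasing row IDs but is not guaranteed for user-supplied primary keys, which this commit now routes through the same writer. A hedged sketch of an order-independent scan, in case that assumption needs revisiting:

```go
// Order-independent min/max; the shipped StatsInt64 instead reads msgs[0]
// and msgs[len(msgs)-1], which is only correct for ascending input.
func int64Range(msgs []int64) (lo, hi int64) {
	lo, hi = msgs[0], msgs[0]
	for _, v := range msgs[1:] {
		if v < lo {
			lo = v
		}
		if v > hi {
			hi = v
		}
	}
	return lo, hi
}
```

Also worth noting: the stats writer serializes keys little-endian, while `updatePKRange`/`updateBloomFilter` use big-endian. A filter that is both merged from stats blobs and extended in memory therefore holds keys in two encodings, and a probe only matches keys that were inserted with the probe's byte order.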
@@ -23,7 +23,7 @@ import (
 func TestStatsWriter_StatsInt64(t *testing.T) {
 	data := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}
 	sw := &StatsWriter{}
-	err := sw.StatsInt64(common.RowIDField, data)
+	err := sw.StatsInt64(common.RowIDField, true, data)
 	assert.NoError(t, err)

 	b := sw.GetBuffer()
@@ -40,6 +40,6 @@ func TestStatsWriter_StatsInt64(t *testing.T) {
 	}

 	msgs := []int64{}
-	err = sw.StatsInt64(rootcoord.RowIDField, msgs)
+	err = sw.StatsInt64(rootcoord.RowIDField, true, msgs)
 	assert.Nil(t, err)
 }