From 8daeb0d519675be2e1446b3b5a2a350aba4fce68 Mon Sep 17 00:00:00 2001 From: godchen Date: Tue, 19 Oct 2021 20:18:47 +0800 Subject: [PATCH] Change bloom filter use pk (#10193) Signed-off-by: godchen --- .../datanode/flow_graph_insert_buffer_node.go | 6 +- internal/datanode/mock_test.go | 13 +- internal/datanode/segment_replica.go | 104 +++++----- internal/datanode/segment_replica_test.go | 185 +++++++++--------- internal/kv/minio/minio_kv.go | 3 +- internal/querynode/flow_graph_insert_node.go | 85 ++++++++ .../querynode/flow_graph_insert_node_test.go | 8 + internal/querynode/mock_test.go | 24 ++- internal/querynode/segment.go | 14 +- internal/querynode/segment_loader.go | 16 +- internal/storage/data_codec.go | 2 +- internal/storage/stats.go | 7 +- internal/storage/stats_test.go | 4 +- 13 files changed, 299 insertions(+), 172 deletions(-) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 47de6ba2d..9962532a2 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -550,6 +550,10 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) } + if field.IsPrimaryKey { + // update segment pk filter + ibNode.replica.updateSegmentPKRange(currentSegID, fieldData.Data) + } case schemapb.DataType_Float: if _, ok := idata.Data[field.FieldID]; !ok { @@ -598,8 +602,6 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos // store current endPositions as Segment->EndPostion ibNode.replica.updateSegmentEndPosition(currentSegID, endPos) - // update segment pk filter - ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs()) return nil } diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 0540cd91b..d5ad3f784 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -263,12 +263,13 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa IndexParams: []*commonpb.KeyValuePair{}, }, { - FieldID: 106, - Name: "int64_field", - Description: "field 106", - DataType: schemapb.DataType_Int64, - TypeParams: []*commonpb.KeyValuePair{}, - IndexParams: []*commonpb.KeyValuePair{}, + FieldID: 106, + Name: "int64_field", + Description: "field 106", + DataType: schemapb.DataType_Int64, + TypeParams: []*commonpb.KeyValuePair{}, + IndexParams: []*commonpb.KeyValuePair{}, + IsPrimaryKey: true, }, { FieldID: 107, diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index 0a4a9c339..5d20c0e8f 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -28,7 +28,6 @@ import ( "go.uber.org/zap" "github.com/bits-and-blooms/bloom/v3" - "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" miniokv "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" @@ -103,16 +102,16 @@ type SegmentReplica struct { minIOKV kv.BaseKV } -func (s *Segment) updatePKRange(rowIDs []int64) { +func (s *Segment) updatePKRange(pks []int64) { buf := make([]byte, 8) - for _, rowID := range rowIDs { - binary.BigEndian.PutUint64(buf, uint64(rowID)) + for _, pk := range pks { + binary.BigEndian.PutUint64(buf, uint64(pk)) s.pkFilter.Add(buf) - if rowID > s.maxPK { - s.maxPK = rowID + if pk > s.maxPK { + s.maxPK = pk } - if rowID < s.minPK { - s.minPK = rowID + if pk < 
s.minPK { + s.minPK = pk } } } @@ -288,9 +287,6 @@ func (replica *SegmentReplica) filterSegments(channelName string, partitionID Un // addNormalSegment adds a *NotNew* and *NotFlushed* segment. Before add, please make sure there's no // such segment by `hasSegment` func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error { - replica.segMu.Lock() - defer replica.segMu.Unlock() - if collID != replica.collectionID { log.Warn("Mismatch collection", zap.Int64("input ID", collID), @@ -319,47 +315,24 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu minPK: math.MaxInt64, // use max value, represents no value maxPK: math.MinInt64, // use min value represents no value } - - p := path.Join(Params.StatsBinlogRootPath, JoinIDPath(collID, partitionID, segID, common.RowIDField)) - keys, values, err := replica.minIOKV.LoadWithPrefix(p) + err := replica.initPKBloomFilter(seg) if err != nil { return err } - blobs := make([]*Blob, 0) - for i := 0; i < len(keys); i++ { - blobs = append(blobs, &Blob{Key: keys[i], Value: []byte(values[i])}) - } - - stats, err := storage.DeserializeStats(blobs) - if err != nil { - return err - } - for _, stat := range stats { - err = seg.pkFilter.Merge(stat.BF) - if err != nil { - return err - } - if seg.minPK > stat.Min { - seg.minPK = stat.Min - } - - if seg.maxPK < stat.Max { - seg.maxPK = stat.Max - } - } seg.isNew.Store(false) seg.isFlushed.Store(false) + replica.segMu.Lock() replica.normalSegments[segID] = seg + replica.segMu.Unlock() + return nil } // addFlushedSegment adds a *Flushed* segment. Before add, please make sure there's no // such segment by `hasSegment` func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64) error { - replica.segMu.Lock() - defer replica.segMu.Unlock() if collID != replica.collectionID { log.Warn("Mismatch collection", @@ -388,7 +361,36 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq maxPK: math.MinInt64, // use min value represents no value } - p := path.Join(Params.StatsBinlogRootPath, JoinIDPath(collID, partitionID, segID, common.RowIDField)) + err := replica.initPKBloomFilter(seg) + if err != nil { + return err + } + + seg.isNew.Store(false) + seg.isFlushed.Store(true) + + replica.segMu.Lock() + replica.flushedSegments[segID] = seg + replica.segMu.Unlock() + + return nil +} + +func (replica *SegmentReplica) initPKBloomFilter(s *Segment) error { + schema, err := replica.getCollectionSchema(s.collectionID, 0) + if err != nil { + return err + } + + pkField := int64(-1) + for _, field := range schema.Fields { + if field.IsPrimaryKey { + pkField = field.FieldID + break + } + } + + p := path.Join(Params.StatsBinlogRootPath, JoinIDPath(s.collectionID, s.partitionID, s.segmentID, pkField)) keys, values, err := replica.minIOKV.LoadWithPrefix(p) if err != nil { return err @@ -403,23 +405,18 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq return err } for _, stat := range stats { - err = seg.pkFilter.Merge(stat.BF) + err = s.pkFilter.Merge(stat.BF) if err != nil { return err } - if seg.minPK > stat.Min { - seg.minPK = stat.Min + if s.minPK > stat.Min { + s.minPK = stat.Min } - if seg.maxPK < stat.Max { - seg.maxPK = stat.Max + if s.maxPK < stat.Max { + s.maxPK = stat.Max } } - - seg.isNew.Store(false) - seg.isFlushed.Store(true) - - replica.flushedSegments[segID] = seg return nil } @@ 
-481,19 +478,19 @@ func (replica *SegmentReplica) updateSegmentEndPosition(segID UniqueID, endPos * log.Warn("No match segment", zap.Int64("ID", segID)) } -func (replica *SegmentReplica) updateSegmentPKRange(segID UniqueID, rowIDs []int64) { +func (replica *SegmentReplica) updateSegmentPKRange(segID UniqueID, pks []int64) { replica.segMu.Lock() defer replica.segMu.Unlock() seg, ok := replica.newSegments[segID] if ok { - seg.updatePKRange(rowIDs) + seg.updatePKRange(pks) return } seg, ok = replica.normalSegments[segID] if ok { - seg.updatePKRange(rowIDs) + seg.updatePKRange(pks) return } @@ -575,9 +572,6 @@ func (replica *SegmentReplica) getCollectionID() UniqueID { // getCollectionSchema gets collection schema from rootcoord for a certain timestamp. // If you want the latest collection schema, ts should be 0. func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) { - replica.segMu.Lock() - defer replica.segMu.Unlock() - if !replica.validCollection(collID) { log.Warn("Mismatch collection for the replica", zap.Int64("Want", replica.collectionID), diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go index 5785748a6..a4a4b983a 100644 --- a/internal/datanode/segment_replica_test.go +++ b/internal/datanode/segment_replica_test.go @@ -561,99 +561,100 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) { assert.NotNil(to, err) }) - te.Run("Test inner function segment", func(t *testing.T) { - collID := UniqueID(1) - replica, err := newReplica(context.Background(), rc, collID) - assert.Nil(t, err) - replica.minIOKV = &mockMinioKV{} - assert.False(t, replica.hasSegment(0, true)) - assert.False(t, replica.hasSegment(0, false)) - - startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(100)} - endPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(200)} - err = replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos) - assert.NoError(t, err) - assert.True(t, replica.hasSegment(0, true)) - assert.Equal(t, 1, len(replica.newSegments)) - - seg, ok := replica.newSegments[UniqueID(0)] - assert.True(t, ok) - require.NotNil(t, seg) - assert.Equal(t, UniqueID(0), seg.segmentID) - assert.Equal(t, UniqueID(1), seg.collectionID) - assert.Equal(t, UniqueID(2), seg.partitionID) - assert.Equal(t, "insert-01", seg.channelName) - assert.Equal(t, Timestamp(100), seg.startPos.Timestamp) - assert.Equal(t, Timestamp(200), seg.endPos.Timestamp) - assert.Equal(t, startPos.ChannelName, seg.checkPoint.pos.ChannelName) - assert.Equal(t, startPos.Timestamp, seg.checkPoint.pos.Timestamp) - assert.Equal(t, int64(0), seg.numRows) - assert.True(t, seg.isNew.Load().(bool)) - assert.False(t, seg.isFlushed.Load().(bool)) - - replica.updateStatistics(0, 10) - assert.Equal(t, int64(10), seg.numRows) - - cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)} - cp := &segmentCheckPoint{int64(10), *cpPos} - err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp) - assert.NoError(t, err) - assert.True(t, replica.hasSegment(1, true)) - assert.Equal(t, 1, len(replica.normalSegments)) - seg, ok = replica.normalSegments[UniqueID(1)] - assert.True(t, ok) - require.NotNil(t, seg) - assert.Equal(t, UniqueID(1), seg.segmentID) - assert.Equal(t, UniqueID(1), seg.collectionID) - assert.Equal(t, UniqueID(2), seg.partitionID) - assert.Equal(t, "insert-01", seg.channelName) - assert.Equal(t, cpPos.ChannelName, seg.checkPoint.pos.ChannelName) 
- assert.Equal(t, cpPos.Timestamp, seg.checkPoint.pos.Timestamp) - assert.Equal(t, int64(10), seg.numRows) - assert.False(t, seg.isNew.Load().(bool)) - assert.False(t, seg.isFlushed.Load().(bool)) - - err = replica.addNormalSegment(1, 100000, 2, "invalid", int64(0), &segmentCheckPoint{}) - assert.Error(t, err) - - replica.updateStatistics(1, 10) - assert.Equal(t, int64(20), seg.numRows) - - segPos := replica.listNewSegmentsStartPositions() - assert.Equal(t, 1, len(segPos)) - assert.Equal(t, UniqueID(0), segPos[0].SegmentID) - assert.Equal(t, "insert-01", segPos[0].StartPosition.ChannelName) - assert.Equal(t, Timestamp(100), segPos[0].StartPosition.Timestamp) - - assert.Equal(t, 0, len(replica.newSegments)) - assert.Equal(t, 2, len(replica.normalSegments)) - - cps := replica.listSegmentsCheckPoints() - assert.Equal(t, 2, len(cps)) - assert.Equal(t, startPos.Timestamp, cps[UniqueID(0)].pos.Timestamp) - assert.Equal(t, int64(0), cps[UniqueID(0)].numRows) - assert.Equal(t, cp.pos.Timestamp, cps[UniqueID(1)].pos.Timestamp) - assert.Equal(t, int64(10), cps[UniqueID(1)].numRows) - - updates, err := replica.getSegmentStatisticsUpdates(0) - assert.NoError(t, err) - assert.Equal(t, int64(10), updates.NumRows) - - updates, err = replica.getSegmentStatisticsUpdates(1) - assert.NoError(t, err) - assert.Equal(t, int64(20), updates.NumRows) - - replica.updateSegmentCheckPoint(0) - assert.Equal(t, int64(10), replica.normalSegments[UniqueID(0)].checkPoint.numRows) - replica.updateSegmentCheckPoint(1) - assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows) - - err = replica.addFlushedSegment(1, 1, 2, "insert-01", int64(0)) - assert.Nil(t, err) +} +func TestInnerFunctionSegment(t *testing.T) { + rc := &RootCoordFactory{} + collID := UniqueID(1) + replica, err := newReplica(context.Background(), rc, collID) + assert.Nil(t, err) + replica.minIOKV = &mockMinioKV{} + assert.False(t, replica.hasSegment(0, true)) + assert.False(t, replica.hasSegment(0, false)) + + startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(100)} + endPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(200)} + err = replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos) + assert.NoError(t, err) + assert.True(t, replica.hasSegment(0, true)) + assert.Equal(t, 1, len(replica.newSegments)) + + seg, ok := replica.newSegments[UniqueID(0)] + assert.True(t, ok) + require.NotNil(t, seg) + assert.Equal(t, UniqueID(0), seg.segmentID) + assert.Equal(t, UniqueID(1), seg.collectionID) + assert.Equal(t, UniqueID(2), seg.partitionID) + assert.Equal(t, "insert-01", seg.channelName) + assert.Equal(t, Timestamp(100), seg.startPos.Timestamp) + assert.Equal(t, Timestamp(200), seg.endPos.Timestamp) + assert.Equal(t, startPos.ChannelName, seg.checkPoint.pos.ChannelName) + assert.Equal(t, startPos.Timestamp, seg.checkPoint.pos.Timestamp) + assert.Equal(t, int64(0), seg.numRows) + assert.True(t, seg.isNew.Load().(bool)) + assert.False(t, seg.isFlushed.Load().(bool)) + + replica.updateStatistics(0, 10) + assert.Equal(t, int64(10), seg.numRows) + + cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)} + cp := &segmentCheckPoint{int64(10), *cpPos} + err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp) + assert.NoError(t, err) + assert.True(t, replica.hasSegment(1, true)) + assert.Equal(t, 1, len(replica.normalSegments)) + seg, ok = replica.normalSegments[UniqueID(1)] + assert.True(t, ok) + require.NotNil(t, seg) + assert.Equal(t, 
UniqueID(1), seg.segmentID) + assert.Equal(t, UniqueID(1), seg.collectionID) + assert.Equal(t, UniqueID(2), seg.partitionID) + assert.Equal(t, "insert-01", seg.channelName) + assert.Equal(t, cpPos.ChannelName, seg.checkPoint.pos.ChannelName) + assert.Equal(t, cpPos.Timestamp, seg.checkPoint.pos.Timestamp) + assert.Equal(t, int64(10), seg.numRows) + assert.False(t, seg.isNew.Load().(bool)) + assert.False(t, seg.isFlushed.Load().(bool)) + + err = replica.addNormalSegment(1, 100000, 2, "invalid", int64(0), &segmentCheckPoint{}) + assert.Error(t, err) + + replica.updateStatistics(1, 10) + assert.Equal(t, int64(20), seg.numRows) + + segPos := replica.listNewSegmentsStartPositions() + assert.Equal(t, 1, len(segPos)) + assert.Equal(t, UniqueID(0), segPos[0].SegmentID) + assert.Equal(t, "insert-01", segPos[0].StartPosition.ChannelName) + assert.Equal(t, Timestamp(100), segPos[0].StartPosition.Timestamp) + + assert.Equal(t, 0, len(replica.newSegments)) + assert.Equal(t, 2, len(replica.normalSegments)) + + cps := replica.listSegmentsCheckPoints() + assert.Equal(t, 2, len(cps)) + assert.Equal(t, startPos.Timestamp, cps[UniqueID(0)].pos.Timestamp) + assert.Equal(t, int64(0), cps[UniqueID(0)].numRows) + assert.Equal(t, cp.pos.Timestamp, cps[UniqueID(1)].pos.Timestamp) + assert.Equal(t, int64(10), cps[UniqueID(1)].numRows) + + updates, err := replica.getSegmentStatisticsUpdates(0) + assert.NoError(t, err) + assert.Equal(t, int64(10), updates.NumRows) + + updates, err = replica.getSegmentStatisticsUpdates(1) + assert.NoError(t, err) + assert.Equal(t, int64(20), updates.NumRows) + + replica.updateSegmentCheckPoint(0) + assert.Equal(t, int64(10), replica.normalSegments[UniqueID(0)].checkPoint.numRows) + replica.updateSegmentCheckPoint(1) + assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows) + + err = replica.addFlushedSegment(1, 1, 2, "insert-01", int64(0)) + assert.Nil(t, err) - totalSegments := replica.filterSegments("insert-01", 0) - assert.Equal(t, len(totalSegments), 3) - }) + totalSegments := replica.filterSegments("insert-01", 0) + assert.Equal(t, len(totalSegments), 3) } func TestSegmentReplica_UpdatePKRange(t *testing.T) { diff --git a/internal/kv/minio/minio_kv.go b/internal/kv/minio/minio_kv.go index a8aef4e6d..ea92b5423 100644 --- a/internal/kv/minio/minio_kv.go +++ b/internal/kv/minio/minio_kv.go @@ -105,7 +105,8 @@ func (kv *MinIOKV) LoadWithPrefix(key string) ([]string, []string, error) { } objectsValues, err := kv.MultiLoad(objectsKeys) if err != nil { - log.Debug("MinIO", zap.String("cannot load value with prefix:%s", key)) + log.Error(fmt.Sprintf("MinIO load with prefix error. 
path = %s", key), zap.Error(err)) + return nil, nil, err } return objectsKeys, objectsValues, nil diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 800032188..b6f22c5ac 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -12,15 +12,20 @@ package querynode import ( + "bytes" "encoding/binary" "fmt" + "io" + "strconv" "sync" "github.com/opentracing/opentracing-go" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/trace" ) @@ -35,6 +40,7 @@ type insertData struct { insertTimestamps map[UniqueID][]Timestamp insertRecords map[UniqueID][]*commonpb.Blob insertOffset map[UniqueID]int64 + insertPKs map[UniqueID][]int64 } type deleteData struct { @@ -66,6 +72,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { insertTimestamps: make(map[UniqueID][]Timestamp), insertRecords: make(map[UniqueID][]*commonpb.Blob), insertOffset: make(map[UniqueID]int64), + insertPKs: make(map[UniqueID][]int64), } if iMsg == nil { @@ -102,6 +109,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { iData.insertIDs[task.SegmentID] = append(iData.insertIDs[task.SegmentID], task.RowIDs...) iData.insertTimestamps[task.SegmentID] = append(iData.insertTimestamps[task.SegmentID], task.Timestamps...) iData.insertRecords[task.SegmentID] = append(iData.insertRecords[task.SegmentID], task.RowData...) + iData.insertPKs[task.SegmentID] = iNode.getPrimaryKeys(task) } // 2. 
do preInsert @@ -119,6 +127,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } iData.insertOffset[segmentID] = offset log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID)) + targetSegment.updateBloomFilter(iData.insertPKs[segmentID]) } } @@ -291,6 +300,82 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg * log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID)) } +func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 { + if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { + log.Warn("misaligned messages detected") + return nil + } + collectionID := msg.GetCollectionID() + + collection, err := iNode.replica.getCollectionByID(collectionID) + if err != nil { + log.Warn("collection cannot be found") + return nil + } + + offset := 0 + for _, field := range collection.schema.Fields { + if field.IsPrimaryKey { + break + } + switch field.DataType { + case schemapb.DataType_Bool: + offset++ + case schemapb.DataType_Int8: + offset++ + case schemapb.DataType_Int16: + offset += 2 + case schemapb.DataType_Int32: + offset += 4 + case schemapb.DataType_Int64: + offset += 8 + case schemapb.DataType_Float: + offset += 4 + case schemapb.DataType_Double: + offset += 8 + case schemapb.DataType_FloatVector: + for _, t := range field.TypeParams { + if t.Key == "dim" { + dim, err := strconv.Atoi(t.Value) + if err != nil { + log.Error("strconv wrong on get dim", zap.Error(err)) + break + } + offset += dim * 4 + break + } + } + case schemapb.DataType_BinaryVector: + var dim int + for _, t := range field.TypeParams { + if t.Key == "dim" { + dim, err = strconv.Atoi(t.Value) + if err != nil { + log.Error("strconv wrong on get dim", zap.Error(err)) + return nil + } + offset += dim / 8 + break + } + } + } + } + + blobReaders := make([]io.Reader, len(msg.RowData)) + for i, blob := range msg.RowData { + blobReaders[i] = bytes.NewReader(blob.GetValue()[offset : offset+8]) + } + pks := make([]int64, len(blobReaders)) + + for i, reader := range blobReaders { + err = binary.Read(reader, binary.LittleEndian, &pks[i]) + if err != nil { + log.Warn("binary read blob value failed", zap.Error(err)) + } + } + + return pks +} func newInsertNode(replica ReplicaInterface) *insertNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go index 5440468b7..c7c98556a 100644 --- a/internal/querynode/flow_graph_insert_node_test.go +++ b/internal/querynode/flow_graph_insert_node_test.go @@ -245,6 +245,14 @@ func TestFlowGraphInsertNode_operate(t *testing.T) { } msg := []flowgraph.Msg{&iMsg} insertNode.Operate(msg) + s, err := replica.getSegmentByID(defaultSegmentID) + assert.Nil(t, err) + buf := make([]byte, 8) + for i := 0; i < defaultMsgLength; i++ { + binary.BigEndian.PutUint64(buf, uint64(i)) + assert.True(t, s.pkFilter.Test(buf)) + } + }) t.Run("test invalid partitionID", func(t *testing.T) { diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index c60f10ef1..ea9a4d6e3 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -49,6 +49,7 @@ const ( defaultVecFieldName = "vec" defaultConstFieldName = "const" + defaultPKFieldName = "pk" defaultTopK = int64(10) 
defaultRoundDecimal = int64(6)
 	defaultDim          = 128
@@ -108,6 +109,11 @@ var simpleConstField = constFieldParam{
 	dataType: schemapb.DataType_Int32,
 }
 
+var simplePKField = constFieldParam{
+	id:       102,
+	dataType: schemapb.DataType_Int64,
+}
+
 var uidField = constFieldParam{
 	id:       rowIDFieldID,
 	dataType: schemapb.DataType_Int64,
 }
@@ -128,6 +134,16 @@ func genConstantField(param constFieldParam) *schemapb.FieldSchema {
 	return field
 }
 
+func genPKField(param constFieldParam) *schemapb.FieldSchema {
+	field := &schemapb.FieldSchema{
+		FieldID:      param.id,
+		Name:         defaultPKFieldName,
+		IsPrimaryKey: true,
+		DataType:     param.dataType,
+	}
+	return field
+}
+
 func genFloatVectorField(param vecFieldParam) *schemapb.FieldSchema {
 	fieldVec := &schemapb.FieldSchema{
 		FieldID: param.id,
@@ -284,13 +300,15 @@ func generateIndex(segmentID UniqueID) ([]string, error) {
 func genSimpleSegCoreSchema() *schemapb.CollectionSchema {
 	fieldVec := genFloatVectorField(simpleVecField)
 	fieldInt := genConstantField(simpleConstField)
+	fieldPK := genPKField(simplePKField)
 	schema := schemapb.CollectionSchema{ // schema for segCore
 		Name:   defaultCollectionName,
-		AutoID: true,
+		AutoID: false,
 		Fields: []*schemapb.FieldSchema{
 			fieldVec,
 			fieldInt,
+			fieldPK,
 		},
 	}
 	return &schema
@@ -580,6 +598,10 @@ func genCommonBlob(msgLength int, schema *schemapb.CollectionSchema) ([]*commonp
 			bs := make([]byte, 4)
 			binary.LittleEndian.PutUint32(bs, uint32(i))
 			rawData = append(rawData, bs...)
+		case schemapb.DataType_Int64:
+			bs := make([]byte, 8)
+			binary.LittleEndian.PutUint64(bs, uint64(i))
+			rawData = append(rawData, bs...)
 		case schemapb.DataType_FloatVector:
 			dim := simpleVecField.dim // if no dim specified, use simpleVecField's dim
 			for _, p := range f.TypeParams {
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index 91d5625e4..47bd0d5d1 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -541,6 +541,14 @@ func (s *Segment) checkIndexReady(fieldID int64) bool {
 	return s.indexInfos[fieldID].getReadyLoad()
 }
 
+func (s *Segment) updateBloomFilter(pks []int64) {
+	buf := make([]byte, 8)
+	for _, pk := range pks {
+		binary.BigEndian.PutUint64(buf, uint64(pk))
+		s.pkFilter.Add(buf)
+	}
+}
+
 //-------------------------------------------------------------------------------------- interfaces for growing segment
 func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) {
 	/*
@@ -601,12 +609,6 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
 		return errors.New("null seg core pointer")
 	}
 
-	for _, id := range *entityIDs {
-		b := make([]byte, 8)
-		binary.BigEndian.PutUint64(b, uint64(id))
-		s.pkFilter.Add(b)
-	}
-
 	// Blobs to one big blob
 	var numOfRow = len(*entityIDs)
 	var sizeofPerRow = len((*records)[0].Value)
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 03970d61c..234284e56 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -322,8 +322,20 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
 }
 
 func (loader *segmentLoader) loadSegmentBloomFilter(segment *Segment) error {
 	// Todo: get path from etcd
-	p := path.Join("files/stats_log", JoinIDPath(segment.collectionID, segment.partitionID, segment.segmentID, common.RowIDField))
-	keys, values, err := loader.minioKV.LoadWithPrefix(p)
+	collection, err := loader.historicalReplica.getCollectionByID(segment.collectionID)
+	if err != nil {
+		return err
+	}
+	pkField := int64(-1)
+	for _, field := 
range collection.schema.Fields { + if field.IsPrimaryKey { + pkField = field.FieldID + break + } + } + + p := path.Join("files/stats_log", JoinIDPath(segment.collectionID, segment.partitionID, segment.segmentID, pkField)) + keys, values, err := loader.minioKV.LoadWithPrefix(p + "/") if err != nil { return err } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 425278a2a..c908ef569 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -306,7 +306,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique statsWriter := &StatsWriter{} switch field.DataType { case schemapb.DataType_Int64: - err = statsWriter.StatsInt64(field.FieldID, singleData.(*Int64FieldData).Data) + err = statsWriter.StatsInt64(field.FieldID, field.IsPrimaryKey, singleData.(*Int64FieldData).Data) } if err != nil { return nil, nil, err diff --git a/internal/storage/stats.go b/internal/storage/stats.go index 3f008c7de..b9caf5354 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -16,7 +16,6 @@ import ( "encoding/json" "github.com/bits-and-blooms/bloom/v3" - "github.com/milvus-io/milvus/internal/common" ) const ( @@ -43,7 +42,7 @@ func (sw *StatsWriter) GetBuffer() []byte { return sw.buffer } -func (sw *StatsWriter) StatsInt64(fieldID int64, msgs []int64) error { +func (sw *StatsWriter) StatsInt64(fieldID int64, isPrimaryKey bool, msgs []int64) error { if len(msgs) < 1 { // return error: msgs must has one element at least return nil @@ -53,9 +52,9 @@ func (sw *StatsWriter) StatsInt64(fieldID int64, msgs []int64) error { FieldID: fieldID, Max: msgs[len(msgs)-1], Min: msgs[0], - BF: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive), } - if fieldID == common.RowIDField { + if isPrimaryKey { + stats.BF = bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive) b := make([]byte, 8) for _, msg := range msgs { binary.LittleEndian.PutUint64(b, uint64(msg)) diff --git a/internal/storage/stats_test.go b/internal/storage/stats_test.go index 78c2869e1..7a68ce7c5 100644 --- a/internal/storage/stats_test.go +++ b/internal/storage/stats_test.go @@ -23,7 +23,7 @@ import ( func TestStatsWriter_StatsInt64(t *testing.T) { data := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9} sw := &StatsWriter{} - err := sw.StatsInt64(common.RowIDField, data) + err := sw.StatsInt64(common.RowIDField, true, data) assert.NoError(t, err) b := sw.GetBuffer() @@ -40,6 +40,6 @@ func TestStatsWriter_StatsInt64(t *testing.T) { } msgs := []int64{} - err = sw.StatsInt64(rootcoord.RowIDField, msgs) + err = sw.StatsInt64(rootcoord.RowIDField, true, msgs) assert.Nil(t, err) } -- GitLab
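
Note: the patch keys every per-segment bloom filter on primary-key values instead of auto-generated row IDs, and pairs the filter with a [minPK, maxPK] range so point lookups can skip segments cheaply. The standalone Go sketch below (not part of the patch) mirrors the patched updatePKRange/updateBloomFilter logic; the mightContain helper and the filter sizing (100000 keys, 0.5% false-positive rate) are illustrative assumptions rather than values taken from the Milvus source.

package main

import (
	"encoding/binary"
	"fmt"
	"math"

	"github.com/bits-and-blooms/bloom/v3"
)

// segment mirrors the pk-tracking state the patch attaches to each segment.
type segment struct {
	pkFilter *bloom.BloomFilter
	minPK    int64 // math.MaxInt64 until the first pk arrives
	maxPK    int64 // math.MinInt64 until the first pk arrives
}

func newSegment() *segment {
	return &segment{
		// Assumed sizing for illustration; the patch takes bloomFilterSize
		// and maxBloomFalsePositive from internal/storage instead.
		pkFilter: bloom.NewWithEstimates(100000, 0.005),
		minPK:    math.MaxInt64,
		maxPK:    math.MinInt64,
	}
}

// updatePKRange follows the patched Segment.updatePKRange: each pk is added
// to the filter (big-endian encoded) and widens the observed pk range.
func (s *segment) updatePKRange(pks []int64) {
	buf := make([]byte, 8)
	for _, pk := range pks {
		binary.BigEndian.PutUint64(buf, uint64(pk))
		s.pkFilter.Add(buf)
		if pk > s.maxPK {
			s.maxPK = pk
		}
		if pk < s.minPK {
			s.minPK = pk
		}
	}
}

// mightContain is a hypothetical lookup helper (not in the patch) showing how
// the two structures combine: the range check is exact, while the bloom test
// can return false positives but never false negatives.
func (s *segment) mightContain(pk int64) bool {
	if pk < s.minPK || pk > s.maxPK {
		return false
	}
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(pk))
	return s.pkFilter.Test(buf)
}

func main() {
	seg := newSegment()
	seg.updatePKRange([]int64{10, 42, 7})
	fmt.Println(seg.mightContain(42)) // true: pk was inserted
	fmt.Println(seg.mightContain(99)) // false: outside [7, 42]
}

A false result lets a caller skip the segment outright; a true result only means the segment must still be consulted. That is the trade-off the datanode and querynode accept when routing operations by primary key.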