diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index b39e0d37191f8284eede391f5df239057e011809..a437634680e31cd16303a0637c5309e2e188df30 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -35,6 +35,12 @@ type insertData struct { insertOffset map[UniqueID]int64 } +type deleteData struct { + deleteIDs map[UniqueID][]int64 + deleteTimestamps map[UniqueID][]Timestamp + deleteOffset map[UniqueID]int64 +} + func (iNode *insertNode) Name() string { return "iNode" } @@ -165,6 +171,32 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync. wg.Done() } +func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) { + defer wg.Done() + log.Debug("QueryNode::iNode::delete", zap.Any("SegmentID", segmentID)) + var targetSegment, err = iNode.replica.getSegmentByID(segmentID) + if err != nil { + log.Warn("Cannot find segment:", zap.Int64("segmentID", segmentID)) + return + } + + if targetSegment.segmentType != segmentTypeGrowing { + return + } + + ids := deleteData.deleteIDs[segmentID] + timestamps := deleteData.deleteTimestamps[segmentID] + offset := deleteData.deleteOffset[segmentID] + + err = targetSegment.segmentDelete(offset, &ids, ×tamps) + if err != nil { + log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err)) + return + } + + log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID)) +} + func newInsertNode(replica ReplicaInterface) *insertNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go index e25b8114c5acf2379e4cdffbf750126e51fd00ce..1dffa27ae341ea7f5759d816790f25516a1e3f10 100644 --- a/internal/querynode/flow_graph_insert_node_test.go +++ b/internal/querynode/flow_graph_insert_node_test.go @@ -45,6 +45,25 @@ func genFlowGraphInsertData() (*insertData, error) { return iData, nil } +func genFlowGraphDeleteData() (*deleteData, error) { + deleteMsg, err := genSimpleDeleteMsg() + if err != nil { + return nil, err + } + dData := &deleteData{ + deleteIDs: map[UniqueID][]UniqueID{ + defaultSegmentID: deleteMsg.PrimaryKeys, + }, + deleteTimestamps: map[UniqueID][]Timestamp{ + defaultSegmentID: deleteMsg.Timestamps, + }, + deleteOffset: map[UniqueID]int64{ + defaultSegmentID: 0, + }, + } + return dData, nil +} + func TestFlowGraphInsertNode_insert(t *testing.T) { t.Run("test insert", func(t *testing.T) { replica, err := genSimpleReplica() @@ -117,6 +136,84 @@ func TestFlowGraphInsertNode_insert(t *testing.T) { }) } +func TestFlowGraphInsertNode_delete(t *testing.T) { + t.Run("test insert and delete", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + insertNode := newInsertNode(replica) + + err = replica.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeGrowing, + true) + assert.NoError(t, err) + + insertData, err := genFlowGraphInsertData() + assert.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + insertNode.insert(insertData, defaultSegmentID, wg) + + deleteData, err := genFlowGraphDeleteData() + assert.NoError(t, err) + wg.Add(1) + insertNode.delete(deleteData, defaultSegmentID, wg) + }) + + t.Run("test only delete", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + insertNode := newInsertNode(replica) + + err = replica.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeGrowing, + true) + assert.NoError(t, err) + + deleteData, err := genFlowGraphDeleteData() + assert.NoError(t, err) + wg := &sync.WaitGroup{} + wg.Add(1) + insertNode.delete(deleteData, defaultSegmentID, wg) + }) + + t.Run("test segment delete error", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + insertNode := newInsertNode(replica) + + err = replica.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeGrowing, + true) + assert.NoError(t, err) + + deleteData, err := genFlowGraphDeleteData() + assert.NoError(t, err) + wg := &sync.WaitGroup{} + wg.Add(1) + deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps)/2] + insertNode.delete(deleteData, defaultSegmentID, wg) + }) + + t.Run("test no target segment", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + insertNode := newInsertNode(replica) + wg := &sync.WaitGroup{} + wg.Add(1) + insertNode.delete(nil, defaultSegmentID, wg) + }) +} + func TestFlowGraphInsertNode_operate(t *testing.T) { t.Run("test operate", func(t *testing.T) { replica, err := genSimpleReplica() diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 4c30c5dc28819a246e8d37a18f248237b0ea50cd..c60f10ef1fb3d44966576382efb30e0469cb0ead 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -71,7 +71,10 @@ const ( defaultPartitionName = "query-node-unittest-default-partition" ) -const defaultMsgLength = 100 +const ( + defaultMsgLength = 100 + defaultDelLength = 10 +) const ( buildID = UniqueID(0) @@ -681,6 +684,15 @@ func genSimpleTimestampFieldData() []Timestamp { return times } +func genSimpleTimestampDeletedPK() []Timestamp { + times := make([]Timestamp, defaultDelLength) + for i := 0; i < defaultDelLength; i++ { + times[i] = Timestamp(i) + } + times[0] = 1 + return times +} + func genSimpleRowIDField() []IntPrimaryKey { ids := make([]IntPrimaryKey, defaultMsgLength) for i := 0; i < defaultMsgLength; i++ { @@ -689,6 +701,14 @@ func genSimpleRowIDField() []IntPrimaryKey { return ids } +func genSimpleDeleteID() []IntPrimaryKey { + ids := make([]IntPrimaryKey, defaultDelLength) + for i := 0; i < defaultDelLength; i++ { + ids[0] = IntPrimaryKey(i) + } + return ids +} + func genMsgStreamBaseMsg() msgstream.BaseMsg { return msgstream.BaseMsg{ HashValues: []uint32{0}, @@ -725,6 +745,21 @@ func genSimpleInsertMsg() (*msgstream.InsertMsg, error) { }, nil } +func genSimpleDeleteMsg() (*msgstream.DeleteMsg, error) { + return &msgstream.DeleteMsg{ + BaseMsg: genMsgStreamBaseMsg(), + DeleteRequest: internalpb.DeleteRequest{ + Base: genCommonMsgBase(commonpb.MsgType_Delete), + CollectionName: defaultCollectionName, + PartitionName: defaultPartitionName, + CollectionID: defaultCollectionID, + PartitionID: defaultPartitionID, + PrimaryKeys: genSimpleDeleteID(), + Timestamps: genSimpleTimestampDeletedPK(), + }, + }, nil +} + // ---------- unittest util functions ---------- // functions of replica func genSealedSegment(schemaForCreate *schemapb.CollectionSchema, diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index f3c60f3473884c24123e2d5b37c04aab3c3e6968..578b8427cdd317dbc9d5d184b01b97c9e52faf6a 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -598,6 +598,9 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps var sizeofPerRow = len((*records)[0].Value) assert.Equal(nil, numOfRow, len(*records)) + if numOfRow != len(*records) { + return errors.New("EntityIDs row num not equal to length of records") + } var rawData = make([]byte, numOfRow*sizeofPerRow) var copyOffset = 0 @@ -651,6 +654,11 @@ func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps if s.segmentPtr == nil { return errors.New("null seg core pointer") } + + if len(*entityIDs) != len(*timestamps) { + return errors.New("Length of entityIDs not equal to length of timestamps") + } + var cOffset = C.long(offset) var cSize = C.long(len(*entityIDs)) var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])