未验证 提交 997e88d8 编写于 作者: Y yukun 提交者: GitHub

Add deleteMessage in flow_graph_insert_node (#9505)

Signed-off-by: Nfishpenguin <kun.yu@zilliz.com>
上级 0ea1cdb5
......@@ -35,6 +35,12 @@ type insertData struct {
insertOffset map[UniqueID]int64
}
type deleteData struct {
deleteIDs map[UniqueID][]int64
deleteTimestamps map[UniqueID][]Timestamp
deleteOffset map[UniqueID]int64
}
func (iNode *insertNode) Name() string {
return "iNode"
}
......@@ -165,6 +171,32 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.
wg.Done()
}
func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
defer wg.Done()
log.Debug("QueryNode::iNode::delete", zap.Any("SegmentID", segmentID))
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Warn("Cannot find segment:", zap.Int64("segmentID", segmentID))
return
}
if targetSegment.segmentType != segmentTypeGrowing {
return
}
ids := deleteData.deleteIDs[segmentID]
timestamps := deleteData.deleteTimestamps[segmentID]
offset := deleteData.deleteOffset[segmentID]
err = targetSegment.segmentDelete(offset, &ids, &timestamps)
if err != nil {
log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err))
return
}
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
}
func newInsertNode(replica ReplicaInterface) *insertNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -45,6 +45,25 @@ func genFlowGraphInsertData() (*insertData, error) {
return iData, nil
}
func genFlowGraphDeleteData() (*deleteData, error) {
deleteMsg, err := genSimpleDeleteMsg()
if err != nil {
return nil, err
}
dData := &deleteData{
deleteIDs: map[UniqueID][]UniqueID{
defaultSegmentID: deleteMsg.PrimaryKeys,
},
deleteTimestamps: map[UniqueID][]Timestamp{
defaultSegmentID: deleteMsg.Timestamps,
},
deleteOffset: map[UniqueID]int64{
defaultSegmentID: 0,
},
}
return dData, nil
}
func TestFlowGraphInsertNode_insert(t *testing.T) {
t.Run("test insert", func(t *testing.T) {
replica, err := genSimpleReplica()
......@@ -117,6 +136,84 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
})
}
func TestFlowGraphInsertNode_delete(t *testing.T) {
t.Run("test insert and delete", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
err = replica.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeGrowing,
true)
assert.NoError(t, err)
insertData, err := genFlowGraphInsertData()
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.insert(insertData, defaultSegmentID, wg)
deleteData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
wg.Add(1)
insertNode.delete(deleteData, defaultSegmentID, wg)
})
t.Run("test only delete", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
err = replica.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeGrowing,
true)
assert.NoError(t, err)
deleteData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.delete(deleteData, defaultSegmentID, wg)
})
t.Run("test segment delete error", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
err = replica.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeGrowing,
true)
assert.NoError(t, err)
deleteData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps)/2]
insertNode.delete(deleteData, defaultSegmentID, wg)
})
t.Run("test no target segment", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.delete(nil, defaultSegmentID, wg)
})
}
func TestFlowGraphInsertNode_operate(t *testing.T) {
t.Run("test operate", func(t *testing.T) {
replica, err := genSimpleReplica()
......
......@@ -71,7 +71,10 @@ const (
defaultPartitionName = "query-node-unittest-default-partition"
)
const defaultMsgLength = 100
const (
defaultMsgLength = 100
defaultDelLength = 10
)
const (
buildID = UniqueID(0)
......@@ -681,6 +684,15 @@ func genSimpleTimestampFieldData() []Timestamp {
return times
}
func genSimpleTimestampDeletedPK() []Timestamp {
times := make([]Timestamp, defaultDelLength)
for i := 0; i < defaultDelLength; i++ {
times[i] = Timestamp(i)
}
times[0] = 1
return times
}
func genSimpleRowIDField() []IntPrimaryKey {
ids := make([]IntPrimaryKey, defaultMsgLength)
for i := 0; i < defaultMsgLength; i++ {
......@@ -689,6 +701,14 @@ func genSimpleRowIDField() []IntPrimaryKey {
return ids
}
func genSimpleDeleteID() []IntPrimaryKey {
ids := make([]IntPrimaryKey, defaultDelLength)
for i := 0; i < defaultDelLength; i++ {
ids[0] = IntPrimaryKey(i)
}
return ids
}
func genMsgStreamBaseMsg() msgstream.BaseMsg {
return msgstream.BaseMsg{
HashValues: []uint32{0},
......@@ -725,6 +745,21 @@ func genSimpleInsertMsg() (*msgstream.InsertMsg, error) {
}, nil
}
func genSimpleDeleteMsg() (*msgstream.DeleteMsg, error) {
return &msgstream.DeleteMsg{
BaseMsg: genMsgStreamBaseMsg(),
DeleteRequest: internalpb.DeleteRequest{
Base: genCommonMsgBase(commonpb.MsgType_Delete),
CollectionName: defaultCollectionName,
PartitionName: defaultPartitionName,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
PrimaryKeys: genSimpleDeleteID(),
Timestamps: genSimpleTimestampDeletedPK(),
},
}, nil
}
// ---------- unittest util functions ----------
// functions of replica
func genSealedSegment(schemaForCreate *schemapb.CollectionSchema,
......
......@@ -598,6 +598,9 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
var sizeofPerRow = len((*records)[0].Value)
assert.Equal(nil, numOfRow, len(*records))
if numOfRow != len(*records) {
return errors.New("EntityIDs row num not equal to length of records")
}
var rawData = make([]byte, numOfRow*sizeofPerRow)
var copyOffset = 0
......@@ -651,6 +654,11 @@ func (s *Segment) segmentDelete(offset int64, entityIDs *[]UniqueID, timestamps
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
if len(*entityIDs) != len(*timestamps) {
return errors.New("Length of entityIDs not equal to length of timestamps")
}
var cOffset = C.long(offset)
var cSize = C.long(len(*entityIDs))
var cEntityIdsPtr = (*C.long)(&(*entityIDs)[0])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册