未验证 提交 8542b266 编写于 作者: C congqixia 提交者: GitHub

Use normal msgstream instead of mqttmsgstream for delta channel (#25849)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 d32bf392
......@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"sort"
"sync/atomic"
"github.com/milvus-io/milvus/internal/util/timerecord"
......@@ -29,6 +30,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
......@@ -220,13 +222,15 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
for i := int64(0); i < dmsg.NumRows; i++ {
dmsg.HashValues = append(dmsg.HashValues, uint32(0))
}
forwardMsgs = append(forwardMsgs, dmsg)
if dmsg.CollectionID != ddn.collectionID {
log.Warn("filter invalid DeleteMsg, collection mis-match",
zap.Int64("Get collID", dmsg.CollectionID),
zap.Int64("Expected collID", ddn.collectionID))
continue
}
dmsg.ShardName = ddn.vChannelName
forwardMsgs = append(forwardMsgs, dmsg)
rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest)))
metrics.DataNodeConsumeBytesCount.
......@@ -300,26 +304,48 @@ func (ddn *ddNode) isDropped(segID UniqueID) bool {
func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error {
tr := timerecord.NewTimeRecorder("forwardDeleteMsg")
if len(msgs) != 0 {
var msgPack = msgstream.MsgPack{
Msgs: msgs,
BeginTs: minTs,
EndTs: maxTs,
}
if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil {
return err
}
if len(msgs) == 0 {
msgs = []msgstream.TsMsg{ddn.composeEmptyDeleteMsg(maxTs)}
}
// produce in timestamp order
sort.Slice(msgs, func(i, j int) bool {
return msgs[i].EndTs() < msgs[j].EndTs()
})
var msgPack = msgstream.MsgPack{
Msgs: msgs,
BeginTs: minTs,
EndTs: maxTs,
}
if err := ddn.sendDeltaTimeTick(maxTs); err != nil {
if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil {
return err
}
metrics.DataNodeForwardDeleteMsgTimeTaken.
WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).
Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil
}
func (ddn *ddNode) composeEmptyDeleteMsg(ts Timestamp) *msgstream.DeleteMsg {
return &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{1},
},
DeleteRequest: internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: commonpbutil.MsgIDNeedFill,
Timestamp: ts, // use msg pack end ts
SourceID: 1,
},
CollectionID: ddn.collectionID,
ShardName: ddn.vChannelName,
PrimaryKeys: &schemapb.IDs{},
},
}
}
func (ddn *ddNode) sendDeltaTimeTick(ts Timestamp) error {
msgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
......
......@@ -378,6 +378,12 @@ func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
}
// empty delete message, use msg base specified timestamp
if deleteMsg.GetNumRows() == 0 {
deleteMsg.BeginTimestamp = deleteMsg.GetBase().GetTimestamp()
deleteMsg.EndTimestamp = deleteMsg.GetBase().GetTimestamp()
}
return deleteMsg, nil
}
......
......@@ -21,6 +21,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
......@@ -312,6 +313,39 @@ func TestDeleteMsg(t *testing.T) {
assert.Equal(t, int64(3), deleteMsg2.SourceID())
}
func TestDeleteMsg_Unmarshal_empty(t *testing.T) {
deleteMsg := &DeleteMsg{
BaseMsg: generateBaseMsg(),
DeleteRequest: internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: 1,
Timestamp: 2,
SourceID: 3,
},
CollectionName: "test_collection",
ShardName: "test-channel",
Timestamps: []uint64{},
Int64PrimaryKeys: []int64{},
NumRows: 0,
},
}
bytes, err := deleteMsg.Marshal(deleteMsg)
require.NoError(t, err)
tsMsg, err := deleteMsg.Unmarshal(bytes)
assert.Nil(t, err)
deleteMsg2, ok := tsMsg.(*DeleteMsg)
assert.True(t, ok)
assert.Equal(t, int64(1), deleteMsg2.ID())
assert.Equal(t, commonpb.MsgType_Delete, deleteMsg2.Type())
assert.Equal(t, int64(3), deleteMsg2.SourceID())
assert.Equal(t, uint64(2), deleteMsg2.BeginTimestamp)
assert.Equal(t, uint64(2), deleteMsg2.EndTimestamp)
}
func TestDeleteMsg_Unmarshal_IllegalParameter(t *testing.T) {
deleteMsg := &DeleteMsg{}
tsMsg, err := deleteMsg.Unmarshal(10)
......
......@@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
)
......@@ -37,6 +38,7 @@ type filterDeleteNode struct {
collectionID UniqueID
metaReplica ReplicaInterface
vchannel Channel
dmlChannel Channel
}
// Name returns the name of filterDeleteNode
......@@ -93,6 +95,12 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel = %s", fddNode.Name(), fddNode.collectionID, fddNode.vchannel))
}
// before 2.2.12 Mqtt Msgstream
// MsgPack => (Ts1 Del1 Del2 Del3 Ts2)
// Empty MsgPack => (Ts1 Ts2)
// After 2.2.12 MqMsgStream
// MsgPack => DeleteMsg(del1) DeleteMsg(del2) DeleteMsg(del3)
// Empty MsgPack => DeleteMsg(empty)
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_Delete:
......@@ -105,7 +113,15 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
if resMsg != nil {
dMsg.deleteMessages = append(dMsg.deleteMessages, resMsg)
// Use delete msg ts to update
dMsg.timeRange.timestampMin = resMsg.BeginTs()
dMsg.timeRange.timestampMax = resMsg.EndTs()
}
case commonpb.MsgType_TimeTick:
// Legacy data, tt, delmsg(valid), delmsg(invalid) tt
// advance tsafe directly
log.Warn("legacy timetick message received, advance timetick")
return []Msg{&dMsg}
default:
log.Warn("invalid message type in filterDeleteNode",
zap.String("message type", msg.Type().String()),
......@@ -114,6 +130,11 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
}
// all delete message do not belong this flowgraph, skip
if len(dMsg.deleteMessages) == 0 {
return []Msg{}
}
sort.Slice(dMsg.deleteMessages, func(i, j int) bool {
return dMsg.deleteMessages[i].BeginTs() < dMsg.deleteMessages[j].BeginTs()
})
......@@ -134,14 +155,24 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
return nil, nil
}
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.String("vchannel", fddNode.vchannel),
zap.Int64("collectionID", msg.CollectionID),
zap.Int64("partitionID", msg.PartitionID))
if msg.GetShardName() != "" && msg.GetShardName() != fddNode.dmlChannel {
log.Debug("filter delete message belong to other channel", zap.String("msgChannel", msg.GetShardName()), zap.String("fgChannel", fddNode.vchannel))
return nil, nil
}
// allow empty delete and by pass partition check
if len(msg.Timestamps) <= 0 {
return msg, nil
}
/*
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.String("vchannel", fddNode.vchannel),
zap.Int64("collectionID", msg.CollectionID),
zap.Int64("partitionID", msg.PartitionID))
return nil, nil
}*/
if loadType == loadTypePartition {
if !fddNode.metaReplica.hasPartition(msg.PartitionID) {
// filter out msg which not belongs to the loaded partitions
......@@ -152,7 +183,7 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
}
// newFilteredDeleteNode returns a new filterDeleteNode
func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *filterDeleteNode {
func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) (*filterDeleteNode, error) {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
......@@ -160,10 +191,16 @@ func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID,
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
dmlChannelName, err := funcutil.ConvertChannelName(vchannel, Params.CommonCfg.RootCoordDelta, Params.CommonCfg.RootCoordDml)
if err != nil {
return nil, err
}
return &filterDeleteNode{
baseNode: baseNode,
collectionID: collectionID,
metaReplica: metaReplica,
vchannel: vchannel,
}
dmlChannel: dmlChannelName,
}, nil
}
......@@ -20,10 +20,12 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
......@@ -34,7 +36,7 @@ func getFilterDeleteNode() (*filterDeleteNode, error) {
}
historical.addExcludedSegments(defaultCollectionID, nil)
return newFilteredDeleteNode(historical, defaultCollectionID, defaultChannelName), nil
return newFilteredDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
}
func TestFlowGraphFilterDeleteNode_filterDeleteNode(t *testing.T) {
......@@ -73,11 +75,7 @@ func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) {
msg.NumRows = 0
res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
assert.NoError(t, err)
assert.Nil(t, res)
msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{})
res, err = fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
assert.NoError(t, err)
assert.Nil(t, res)
assert.NotNil(t, res)
})
t.Run("test not target partition", func(t *testing.T) {
......@@ -141,3 +139,135 @@ func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) {
assert.NotNil(t, res)
})
}
type FgFilterDeleteNodeSuite struct {
suite.Suite
replica ReplicaInterface
node *filterDeleteNode
}
func (s *FgFilterDeleteNodeSuite) SetupTest() {
historical, err := genSimpleReplica()
s.Require().NoError(err)
historical.addExcludedSegments(defaultCollectionID, nil)
node, err := newFilteredDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
s.Require().NoError(err)
s.node = node
}
func (s *FgFilterDeleteNodeSuite) generateFilterDeleteMsg() []flowgraph.Msg {
dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil)
return []flowgraph.Msg{msg}
}
func (s *FgFilterDeleteNodeSuite) TestNewNodeFail() {
_, err := newFilteredDeleteNode(s.replica, defaultCollectionID, "bad_channel")
s.Error(err)
}
func (s *FgFilterDeleteNodeSuite) TestOperate() {
s.Run("valid_msg", func() {
msgs := s.generateFilterDeleteMsg() //genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
out := s.node.Operate(msgs)
s.Equal(1, len(out))
})
s.Run("legacy_timetick", func() {
ttMsg := &msgstream.TimeTickMsg{
TimeTickMsg: internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
},
},
}
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{ttMsg}, 0, 1000, nil, nil)
out := s.node.Operate([]flowgraph.Msg{msg})
s.Equal(1, len(out))
})
s.Run("invalid_input_length", func() {
msgs := []flowgraph.Msg{
flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{}, 0, 1000, nil, nil),
flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{}, 0, 1000, nil, nil),
}
out := s.node.Operate(msgs)
s.Equal(0, len(out))
})
s.Run("filterInvalidDeleteMessage_failed", func() {
dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
dMsg.NumRows = 0
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil)
m := []flowgraph.Msg{msg}
s.Panics(func() {
s.node.Operate(m)
})
})
s.Run("invalid_msgType", func() {
iMsg, err := genSimpleInsertMsg(genTestCollectionSchema(), defaultDelLength)
s.Require().NoError(err)
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{iMsg}, 0, 1000, nil, nil)
res := s.node.Operate([]flowgraph.Msg{msg})
s.Equal(0, len(res))
})
}
func (s *FgFilterDeleteNodeSuite) TestFilterInvalidDeleteMessage() {
s.Run("valid_case", func() {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
res, err := s.node.filterInvalidDeleteMessage(msg, loadTypeCollection)
s.NoError(err)
s.NotNil(res)
})
s.Run("msg_collection_not_match", func() {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
s.node.collectionID = UniqueID(1000)
defer func() { s.node.collectionID = defaultCollectionID }()
res, err := s.node.filterInvalidDeleteMessage(msg, loadTypeCollection)
s.NoError(err)
s.Nil(res)
})
s.Run("msg_shard_not_match", func() {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msg.ShardName = "non_match_shard"
defer func() { s.node.collectionID = defaultCollectionID }()
res, err := s.node.filterInvalidDeleteMessage(msg, loadTypeCollection)
s.NoError(err)
s.Nil(res)
})
s.Run("delete_no_data", func() {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msg.Timestamps = make([]Timestamp, 0)
msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0)
msg.PrimaryKeys = &schemapb.IDs{}
msg.NumRows = 0
res, err := s.node.filterInvalidDeleteMessage(msg, loadTypeCollection)
s.NoError(err)
s.NotNil(res)
})
s.Run("target_partition_not_match", func() {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
err := s.node.metaReplica.removePartition(defaultPartitionID)
s.Require().NoError(err)
res, err := s.node.filterInvalidDeleteMessage(msg, loadTypePartition)
s.NoError(err)
s.Nil(res)
})
}
func TestFlowGraphFilterDeleteNode(t *testing.T) {
suite.Run(t, new(FgFilterDeleteNodeSuite))
}
......@@ -131,11 +131,14 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx),
}
dmStreamNode, err := q.newDmInputNode(ctx, factory, collectionID, vchannel, metrics.DeleteLabel)
dmStreamNode, err := q.newDeltaInputNode(ctx, factory, collectionID, vchannel, metrics.DeleteLabel)
if err != nil {
return nil, err
}
filterDeleteNode, err := newFilteredDeleteNode(metaReplica, collectionID, vchannel)
if err != nil {
return nil, err
}
var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID, vchannel)
deleteNode, err := newDeleteNode(metaReplica, collectionID, vchannel)
if err != nil {
return nil, err
......@@ -200,6 +203,22 @@ func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstre
return node, nil
}
func (q *queryNodeFlowGraph) newDeltaInputNode(ctx context.Context, factory msgstream.Factory, collectionID UniqueID, deltaChannel Channel, dataType string) (*flowgraph.InputNode, error) {
deltaStream, err := factory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
q.dmlStream = deltaStream
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
name := fmt.Sprintf("dmInputNode-query-%d-%s", collectionID, deltaChannel)
node := flowgraph.NewInputNode(deltaStream, name, maxQueueLength, maxParallelism, typeutil.QueryNodeRole,
Params.QueryNodeCfg.GetNodeID(), collectionID, dataType)
return node, nil
}
// consumeFlowGraph would consume by channel and subName
func (q *queryNodeFlowGraph) consumeFlowGraph(channel Channel, subName ConsumeSubName) error {
if q.dmlStream == nil {
......
......@@ -20,8 +20,11 @@ import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
......@@ -80,3 +83,62 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
fg.close()
}
type DeltaFlowGraphSuite struct {
suite.Suite
replica ReplicaInterface
tsafe TSafeReplicaInterface
}
func (s *DeltaFlowGraphSuite) SetupTest() {
var err error
s.replica, err = genSimpleReplica()
s.Require().NoError(err)
s.tsafe = newTSafeReplica()
}
func (s *DeltaFlowGraphSuite) TestNewFgFailed() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockFactory := msgstream.NewMockMqFactory()
mockFactory.NewMsgStreamFunc = func(ctx context.Context) (msgstream.MsgStream, error) {
return nil, errors.New("mock error")
}
_, err := newQueryNodeDeltaFlowGraph(ctx,
defaultCollectionID,
s.replica,
s.tsafe,
defaultDeltaChannel,
mockFactory,
)
s.Error(err)
}
func (s *DeltaFlowGraphSuite) TestConsumeFlowGraph() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fac := genFactory()
fg, err := newQueryNodeDeltaFlowGraph(ctx,
defaultCollectionID,
s.replica,
s.tsafe,
defaultDeltaChannel,
fac,
)
s.Require().NoError(err)
defer fg.close()
err = fg.consumeFlowGraph(defaultDeltaChannel, defaultSubName)
s.NoError(err)
}
func TestDeltaFlowGraph(t *testing.T) {
suite.Run(t, new(DeltaFlowGraphSuite))
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册