From 8542b2662980bc4203d802f57f63a09283dcdc83 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 24 Jul 2023 09:09:05 +0800 Subject: [PATCH] Use normal msgstream instead of mqttmsgstream for delta channel (#25849) Signed-off-by: Congqi Xia --- internal/datanode/flow_graph_dd_node.go | 50 ++++-- internal/mq/msgstream/msg.go | 6 + internal/mq/msgstream/msg_test.go | 34 +++++ .../flow_graph_filter_delete_node.go | 51 ++++++- .../flow_graph_filter_delete_node_test.go | 144 +++++++++++++++++- internal/querynode/flow_graph_query_node.go | 23 ++- .../querynode/flow_graph_query_node_test.go | 62 ++++++++ 7 files changed, 342 insertions(+), 28 deletions(-) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index edc9a753b..a9c09e86f 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "reflect" + "sort" "sync/atomic" "github.com/milvus-io/milvus/internal/util/timerecord" @@ -29,6 +30,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" @@ -220,13 +222,15 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { for i := int64(0); i < dmsg.NumRows; i++ { dmsg.HashValues = append(dmsg.HashValues, uint32(0)) } - forwardMsgs = append(forwardMsgs, dmsg) if dmsg.CollectionID != ddn.collectionID { log.Warn("filter invalid DeleteMsg, collection mis-match", zap.Int64("Get collID", dmsg.CollectionID), zap.Int64("Expected collID", ddn.collectionID)) continue } + dmsg.ShardName = ddn.vChannelName + forwardMsgs = append(forwardMsgs, dmsg) + rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest))) metrics.DataNodeConsumeBytesCount. @@ -300,26 +304,48 @@ func (ddn *ddNode) isDropped(segID UniqueID) bool { func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error { tr := timerecord.NewTimeRecorder("forwardDeleteMsg") - if len(msgs) != 0 { - var msgPack = msgstream.MsgPack{ - Msgs: msgs, - BeginTs: minTs, - EndTs: maxTs, - } - if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil { - return err - } + if len(msgs) == 0 { + msgs = []msgstream.TsMsg{ddn.composeEmptyDeleteMsg(maxTs)} + } + + // produce in timestamp order + sort.Slice(msgs, func(i, j int) bool { + return msgs[i].EndTs() < msgs[j].EndTs() + }) + + var msgPack = msgstream.MsgPack{ + Msgs: msgs, + BeginTs: minTs, + EndTs: maxTs, } - if err := ddn.sendDeltaTimeTick(maxTs); err != nil { + if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil { return err } - metrics.DataNodeForwardDeleteMsgTimeTaken. WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())). 
Observe(float64(tr.ElapseSpan().Milliseconds())) return nil } +func (ddn *ddNode) composeEmptyDeleteMsg(ts Timestamp) *msgstream.DeleteMsg { + return &msgstream.DeleteMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{1}, + }, + DeleteRequest: internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + MsgID: commonpbutil.MsgIDNeedFill, + Timestamp: ts, // use msg pack end ts + SourceID: 1, + }, + CollectionID: ddn.collectionID, + ShardName: ddn.vChannelName, + PrimaryKeys: &schemapb.IDs{}, + }, + } +} + func (ddn *ddNode) sendDeltaTimeTick(ts Timestamp) error { msgPack := msgstream.MsgPack{} baseMsg := msgstream.BaseMsg{ diff --git a/internal/mq/msgstream/msg.go b/internal/mq/msgstream/msg.go index 7eb755068..dc7a1c380 100644 --- a/internal/mq/msgstream/msg.go +++ b/internal/mq/msgstream/msg.go @@ -378,6 +378,12 @@ func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) { } } + // empty delete message, use msg base specified timestamp + if deleteMsg.GetNumRows() == 0 { + deleteMsg.BeginTimestamp = deleteMsg.GetBase().GetTimestamp() + deleteMsg.EndTimestamp = deleteMsg.GetBase().GetTimestamp() + } + return deleteMsg, nil } diff --git a/internal/mq/msgstream/msg_test.go b/internal/mq/msgstream/msg_test.go index 27c651dda..3c5cdd9f0 100644 --- a/internal/mq/msgstream/msg_test.go +++ b/internal/mq/msgstream/msg_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -312,6 +313,39 @@ func TestDeleteMsg(t *testing.T) { assert.Equal(t, int64(3), deleteMsg2.SourceID()) } +func TestDeleteMsg_Unmarshal_empty(t *testing.T) { + deleteMsg := &DeleteMsg{ + BaseMsg: generateBaseMsg(), + DeleteRequest: internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + MsgID: 1, + Timestamp: 2, + SourceID: 3, + }, + + CollectionName: "test_collection", + ShardName: "test-channel", + Timestamps: []uint64{}, + Int64PrimaryKeys: []int64{}, + NumRows: 0, + }, + } + bytes, err := deleteMsg.Marshal(deleteMsg) + require.NoError(t, err) + + tsMsg, err := deleteMsg.Unmarshal(bytes) + assert.Nil(t, err) + + deleteMsg2, ok := tsMsg.(*DeleteMsg) + assert.True(t, ok) + assert.Equal(t, int64(1), deleteMsg2.ID()) + assert.Equal(t, commonpb.MsgType_Delete, deleteMsg2.Type()) + assert.Equal(t, int64(3), deleteMsg2.SourceID()) + assert.Equal(t, uint64(2), deleteMsg2.BeginTimestamp) + assert.Equal(t, uint64(2), deleteMsg2.EndTimestamp) +} + func TestDeleteMsg_Unmarshal_IllegalParameter(t *testing.T) { deleteMsg := &DeleteMsg{} tsMsg, err := deleteMsg.Unmarshal(10) diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go index c856d2c74..68abd25c5 100644 --- a/internal/querynode/flow_graph_filter_delete_node.go +++ b/internal/querynode/flow_graph_filter_delete_node.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/util/flowgraph" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/trace" ) @@ -37,6 +38,7 @@ type filterDeleteNode struct { collectionID UniqueID metaReplica ReplicaInterface vchannel Channel + dmlChannel Channel } // Name returns the name of filterDeleteNode @@ -93,6 +95,12 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) 
[]flowgraph.Msg { panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel = %s", fddNode.Name(), fddNode.collectionID, fddNode.vchannel)) } + // before 2.2.12 Mqtt Msgstream + // MsgPack => (Ts1 Del1 Del2 Del3 Ts2) + // Empty MsgPack => (Ts1 Ts2) + // After 2.2.12 MqMsgStream + // MsgPack => DeleteMsg(del1) DeleteMsg(del2) DeleteMsg(del3) + // Empty MsgPack => DeleteMsg(empty) for _, msg := range msgStreamMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_Delete: @@ -105,7 +113,15 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } if resMsg != nil { dMsg.deleteMessages = append(dMsg.deleteMessages, resMsg) + // Use delete msg ts to update + dMsg.timeRange.timestampMin = resMsg.BeginTs() + dMsg.timeRange.timestampMax = resMsg.EndTs() } + case commonpb.MsgType_TimeTick: + // Legacy data, tt, delmsg(valid), delmsg(invalid) tt + // advance tsafe directly + log.Warn("legacy timetick message received, advance timetick") + return []Msg{&dMsg} default: log.Warn("invalid message type in filterDeleteNode", zap.String("message type", msg.Type().String()), @@ -114,6 +130,11 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } } + // all delete message do not belong this flowgraph, skip + if len(dMsg.deleteMessages) == 0 { + return []Msg{} + } + sort.Slice(dMsg.deleteMessages, func(i, j int) bool { return dMsg.deleteMessages[i].BeginTs() < dMsg.deleteMessages[j].BeginTs() }) @@ -134,14 +155,24 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet return nil, nil } - if len(msg.Timestamps) <= 0 { - log.Debug("filter invalid delete message, no message", - zap.String("vchannel", fddNode.vchannel), - zap.Int64("collectionID", msg.CollectionID), - zap.Int64("partitionID", msg.PartitionID)) + if msg.GetShardName() != "" && msg.GetShardName() != fddNode.dmlChannel { + log.Debug("filter delete message belong to other channel", zap.String("msgChannel", msg.GetShardName()), zap.String("fgChannel", fddNode.vchannel)) return nil, nil } + // allow empty delete and by pass partition check + if len(msg.Timestamps) <= 0 { + return msg, nil + } + /* + if len(msg.Timestamps) <= 0 { + log.Debug("filter invalid delete message, no message", + zap.String("vchannel", fddNode.vchannel), + zap.Int64("collectionID", msg.CollectionID), + zap.Int64("partitionID", msg.PartitionID)) + return nil, nil + }*/ + if loadType == loadTypePartition { if !fddNode.metaReplica.hasPartition(msg.PartitionID) { // filter out msg which not belongs to the loaded partitions @@ -152,7 +183,7 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet } // newFilteredDeleteNode returns a new filterDeleteNode -func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *filterDeleteNode { +func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) (*filterDeleteNode, error) { maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism @@ -160,10 +191,16 @@ func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) + dmlChannelName, err := funcutil.ConvertChannelName(vchannel, Params.CommonCfg.RootCoordDelta, Params.CommonCfg.RootCoordDml) + if err != nil { + return nil, err + } + return &filterDeleteNode{ baseNode: baseNode, collectionID: collectionID, metaReplica: 
metaReplica, vchannel: vchannel, - } + dmlChannel: dmlChannelName, + }, nil } diff --git a/internal/querynode/flow_graph_filter_delete_node_test.go b/internal/querynode/flow_graph_filter_delete_node_test.go index 0cd42a508..8b44c8668 100644 --- a/internal/querynode/flow_graph_filter_delete_node_test.go +++ b/internal/querynode/flow_graph_filter_delete_node_test.go @@ -20,10 +20,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mq/msgstream" - "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/flowgraph" ) @@ -34,7 +36,7 @@ func getFilterDeleteNode() (*filterDeleteNode, error) { } historical.addExcludedSegments(defaultCollectionID, nil) - return newFilteredDeleteNode(historical, defaultCollectionID, defaultChannelName), nil + return newFilteredDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) } func TestFlowGraphFilterDeleteNode_filterDeleteNode(t *testing.T) { @@ -73,11 +75,7 @@ func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) { msg.NumRows = 0 res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) - assert.Nil(t, res) - msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{}) - res, err = fg.filterInvalidDeleteMessage(msg, loadTypeCollection) - assert.NoError(t, err) - assert.Nil(t, res) + assert.NotNil(t, res) }) t.Run("test not target partition", func(t *testing.T) { @@ -141,3 +139,135 @@ func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) { assert.NotNil(t, res) }) } + +type FgFilterDeleteNodeSuite struct { + suite.Suite + + replica ReplicaInterface + node *filterDeleteNode +} + +func (s *FgFilterDeleteNodeSuite) SetupTest() { + historical, err := genSimpleReplica() + + s.Require().NoError(err) + + historical.addExcludedSegments(defaultCollectionID, nil) + + node, err := newFilteredDeleteNode(historical, defaultCollectionID, defaultDeltaChannel) + s.Require().NoError(err) + + s.node = node +} + +func (s *FgFilterDeleteNodeSuite) generateFilterDeleteMsg() []flowgraph.Msg { + dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil) + return []flowgraph.Msg{msg} +} + +func (s *FgFilterDeleteNodeSuite) TestNewNodeFail() { + _, err := newFilteredDeleteNode(s.replica, defaultCollectionID, "bad_channel") + s.Error(err) +} + +func (s *FgFilterDeleteNodeSuite) TestOperate() { + s.Run("valid_msg", func() { + msgs := s.generateFilterDeleteMsg() //genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + out := s.node.Operate(msgs) + s.Equal(1, len(out)) + }) + + s.Run("legacy_timetick", func() { + ttMsg := &msgstream.TimeTickMsg{ + TimeTickMsg: internalpb.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + }, + }, + } + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{ttMsg}, 0, 1000, nil, nil) + out := s.node.Operate([]flowgraph.Msg{msg}) + s.Equal(1, len(out)) + }) + + s.Run("invalid_input_length", func() { + msgs := []flowgraph.Msg{ + flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{}, 0, 1000, nil, nil), + flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{}, 0, 1000, nil, nil), + } + out := s.node.Operate(msgs) + s.Equal(0, len(out)) + }) + + 
s.Run("filterInvalidDeleteMessage_failed", func() { + dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + dMsg.NumRows = 0 + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil) + m := []flowgraph.Msg{msg} + s.Panics(func() { + s.node.Operate(m) + }) + }) + + s.Run("invalid_msgType", func() { + iMsg, err := genSimpleInsertMsg(genTestCollectionSchema(), defaultDelLength) + s.Require().NoError(err) + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{iMsg}, 0, 1000, nil, nil) + + res := s.node.Operate([]flowgraph.Msg{msg}) + s.Equal(0, len(res)) + }) +} + +func (s *FgFilterDeleteNodeSuite) TestFilterInvalidDeleteMessage() { + s.Run("valid_case", func() { + msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + res, err := s.node.filterInvalidDeleteMessage(msg, loadTypeCollection) + s.NoError(err) + s.NotNil(res) + }) + + s.Run("msg_collection_not_match", func() { + msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + s.node.collectionID = UniqueID(1000) + defer func() { s.node.collectionID = defaultCollectionID }() + res, err := s.node.filterInvalidDeleteMessage(msg, loadTypeCollection) + s.NoError(err) + s.Nil(res) + }) + + s.Run("msg_shard_not_match", func() { + msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + msg.ShardName = "non_match_shard" + defer func() { s.node.collectionID = defaultCollectionID }() + res, err := s.node.filterInvalidDeleteMessage(msg, loadTypeCollection) + s.NoError(err) + s.Nil(res) + }) + + s.Run("delete_no_data", func() { + msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + msg.Timestamps = make([]Timestamp, 0) + msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0) + msg.PrimaryKeys = &schemapb.IDs{} + msg.NumRows = 0 + res, err := s.node.filterInvalidDeleteMessage(msg, loadTypeCollection) + s.NoError(err) + s.NotNil(res) + }) + + s.Run("target_partition_not_match", func() { + msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) + err := s.node.metaReplica.removePartition(defaultPartitionID) + s.Require().NoError(err) + + res, err := s.node.filterInvalidDeleteMessage(msg, loadTypePartition) + s.NoError(err) + s.Nil(res) + }) +} + +func TestFlowGraphFilterDeleteNode(t *testing.T) { + suite.Run(t, new(FgFilterDeleteNodeSuite)) +} diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index 1d10de84b..488087230 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -131,11 +131,14 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context, flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx), } - dmStreamNode, err := q.newDmInputNode(ctx, factory, collectionID, vchannel, metrics.DeleteLabel) + dmStreamNode, err := q.newDeltaInputNode(ctx, factory, collectionID, vchannel, metrics.DeleteLabel) + if err != nil { + return nil, err + } + filterDeleteNode, err := newFilteredDeleteNode(metaReplica, collectionID, vchannel) if err != nil { return nil, err } - var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID, vchannel) deleteNode, err := newDeleteNode(metaReplica, collectionID, vchannel) if err != nil { return nil, err @@ -200,6 +203,22 @@ func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstre return node, nil } +func (q *queryNodeFlowGraph) newDeltaInputNode(ctx context.Context, factory 
msgstream.Factory, collectionID UniqueID, deltaChannel Channel, dataType string) (*flowgraph.InputNode, error) { + deltaStream, err := factory.NewMsgStream(ctx) + if err != nil { + return nil, err + } + + q.dmlStream = deltaStream + + maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength + maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism + name := fmt.Sprintf("dmInputNode-query-%d-%s", collectionID, deltaChannel) + node := flowgraph.NewInputNode(deltaStream, name, maxQueueLength, maxParallelism, typeutil.QueryNodeRole, + Params.QueryNodeCfg.GetNodeID(), collectionID, dataType) + return node, nil +} + // consumeFlowGraph would consume by channel and subName func (q *queryNodeFlowGraph) consumeFlowGraph(channel Channel, subName ConsumeSubName) error { if q.dmlStream == nil { diff --git a/internal/querynode/flow_graph_query_node_test.go b/internal/querynode/flow_graph_query_node_test.go index 6516d37a8..078a54550 100644 --- a/internal/querynode/flow_graph_query_node_test.go +++ b/internal/querynode/flow_graph_query_node_test.go @@ -20,8 +20,11 @@ import ( "context" "testing" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" ) @@ -80,3 +83,62 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) { fg.close() } + +type DeltaFlowGraphSuite struct { + suite.Suite + + replica ReplicaInterface + tsafe TSafeReplicaInterface +} + +func (s *DeltaFlowGraphSuite) SetupTest() { + var err error + s.replica, err = genSimpleReplica() + s.Require().NoError(err) + + s.tsafe = newTSafeReplica() +} + +func (s *DeltaFlowGraphSuite) TestNewFgFailed() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockFactory := msgstream.NewMockMqFactory() + mockFactory.NewMsgStreamFunc = func(ctx context.Context) (msgstream.MsgStream, error) { + return nil, errors.New("mock error") + } + + _, err := newQueryNodeDeltaFlowGraph(ctx, + defaultCollectionID, + s.replica, + s.tsafe, + defaultDeltaChannel, + mockFactory, + ) + + s.Error(err) +} + +func (s *DeltaFlowGraphSuite) TestConsumeFlowGraph() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fac := genFactory() + + fg, err := newQueryNodeDeltaFlowGraph(ctx, + defaultCollectionID, + s.replica, + s.tsafe, + defaultDeltaChannel, + fac, + ) + s.Require().NoError(err) + + defer fg.close() + + err = fg.consumeFlowGraph(defaultDeltaChannel, defaultSubName) + s.NoError(err) +} + +func TestDeltaFlowGraph(t *testing.T) { + suite.Run(t, new(DeltaFlowGraphSuite)) +} -- GitLab
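
Note for reviewers: the sketch below is only an illustration of the delta-channel behaviour this patch establishes (every pack entry is a DeleteMsg, an entry with NumRows == 0 is a pure timestamp carrier, and messages are filtered by ShardName on the consumer side). It is written against simplified stand-in types, not the real internal/mq/msgstream API; the type names, field names, and channel names are invented for the example.

// delta_pack_sketch.go - illustrative only; simplified local types, not the
// actual Milvus msgstream API.
package main

import (
	"fmt"
	"sort"
)

// deltaMsg models the post-change delta-channel payload: every entry is a
// delete message; an entry with numRows == 0 only carries a timestamp so the
// consumer can still advance its watermark (tsafe) when nothing was deleted.
type deltaMsg struct {
	shardName string
	endTs     uint64
	numRows   int64
}

// applyPack mimics, in simplified form, what filterDeleteNode and deleteNode
// do together: drop messages that belong to another shard, keep real deletes,
// and return the new watermark taken from the newest surviving message.
func applyPack(msgs []deltaMsg, myShard string, watermark uint64) (kept []deltaMsg, newWatermark uint64) {
	newWatermark = watermark
	for _, m := range msgs {
		if m.shardName != "" && m.shardName != myShard {
			continue // belongs to another DML channel, skip
		}
		if m.endTs > newWatermark {
			newWatermark = m.endTs
		}
		if m.numRows > 0 {
			kept = append(kept, m) // real delete, forward it
		}
	}
	// the producer side sorts by end timestamp before Produce; keep the same order here
	sort.Slice(kept, func(i, j int) bool { return kept[i].endTs < kept[j].endTs })
	return kept, newWatermark
}

func main() {
	// channel names are made up for the example
	pack := []deltaMsg{
		{shardName: "by-dev-dml_0_v0", endTs: 120, numRows: 3}, // real delete for this shard
		{shardName: "by-dev-dml_1_v0", endTs: 125, numRows: 2}, // other shard, filtered out
		{shardName: "by-dev-dml_0_v0", endTs: 130, numRows: 0}, // empty delete, timestamp carrier
	}
	kept, ts := applyPack(pack, "by-dev-dml_0_v0", 100)
	fmt.Printf("kept=%d watermark=%d\n", len(kept), ts) // kept=1 watermark=130
}

The empty message in the example plays the role of ddNode.composeEmptyDeleteMsg: it replaces the separate time-tick message the old MqTt-based delta stream used to emit, which is why the consumer can still move its watermark forward on an otherwise empty pack.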