From ebb9b24b477dfa7041ffe8e4d34fbb369185cc8a Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Thu, 30 Dec 2021 10:33:46 +0800 Subject: [PATCH] Improve name of flowgraph node (#14538) Signed-off-by: dragondriver --- internal/datanode/flow_graph_dd_node.go | 3 ++- internal/datanode/flow_graph_dd_node_test.go | 3 ++- internal/datanode/flow_graph_delete_node.go | 2 +- internal/datanode/flow_graph_delete_node_test.go | 2 +- internal/datanode/flow_graph_dmstream_input_node.go | 3 ++- internal/datanode/flow_graph_insert_buffer_node.go | 2 +- internal/querynode/flow_graph_filter_delete_node.go | 4 +++- internal/querynode/flow_graph_filter_dm_node.go | 4 +++- internal/querynode/flow_graph_service_time_node.go | 3 ++- 9 files changed, 17 insertions(+), 9 deletions(-) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 2dcc22a2e..401e36d55 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -18,6 +18,7 @@ package datanode import ( "context" + "fmt" "sync" "sync/atomic" @@ -68,7 +69,7 @@ type ddNode struct { // Name returns node name, implementing flowgraph.Node func (ddn *ddNode) Name() string { - return "ddNode" + return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vchannelName) } // Operate handles input messages, implementing flowgrpah.Node diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 2c9b2429f..636fc5d05 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -18,6 +18,7 @@ package datanode import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -90,7 +91,7 @@ func TestFlowGraph_DDNode_newDDNode(te *testing.T) { for _, seg := range ddNode.flushedSegments { flushedSegIDs = append(flushedSegIDs, seg.ID) } - assert.Equal(t, "ddNode", ddNode.Name()) + assert.Equal(t, fmt.Sprintf("ddNode-%d-%s", ddNode.collectionID, ddNode.vchannelName), ddNode.Name()) assert.Equal(t, test.inCollID, ddNode.collectionID) assert.Equal(t, len(test.inFlushedSegs), len(ddNode.flushedSegments)) assert.ElementsMatch(t, test.inFlushedSegs, flushedSegIDs) diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index afe7a885a..e2be1702b 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -91,7 +91,7 @@ func newDelDataBuf() *DelDataBuf { } func (dn *deleteNode) Name() string { - return "deleteNode" + return "deleteNode-" + dn.channelName } func (dn *deleteNode) Close() { diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index e021a472d..1467b8cf8 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -121,7 +121,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) { assert.Nil(t, err) assert.NotNil(t, dn) - assert.Equal(t, "deleteNode", dn.Name()) + assert.Equal(t, "deleteNode-"+dn.channelName, dn.Name()) dn.Close() }) } diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 729fa8048..1237c6630 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -57,6 +57,7 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode log.Debug("datanode Seek successfully", zap.String("Channel Name", seekPos.GetChannelName()), zap.Duration("elapse", time.Since(start))) } - node := flowgraph.NewInputNode(insertStream, "dmInputNode", dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism) + name := fmt.Sprintf("dmInputNode-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName) + node := flowgraph.NewInputNode(insertStream, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism) return node, nil } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index f36e696b6..37635b703 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -142,7 +142,7 @@ func (bd *BufferData) updateSize(no int64) { } func (ibNode *insertBufferNode) Name() string { - return "ibNode" + return "ibNode-" + ibNode.channelName } func (ibNode *insertBufferNode) Close() { diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go index d3e999e48..8dcdff221 100644 --- a/internal/querynode/flow_graph_filter_delete_node.go +++ b/internal/querynode/flow_graph_filter_delete_node.go @@ -17,6 +17,8 @@ package querynode import ( + "fmt" + "github.com/opentracing/opentracing-go" "go.uber.org/zap" @@ -36,7 +38,7 @@ type filterDeleteNode struct { // Name returns the name of filterDeleteNode func (fddNode *filterDeleteNode) Name() string { - return "fdNode" + return fmt.Sprintf("fdNode-%d", fddNode.collectionID) } // Operate handles input messages, to filter invalid delete messages diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 49d2383fb..9e4ea0eb2 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -17,6 +17,8 @@ package querynode import ( + "fmt" + "github.com/opentracing/opentracing-go" "go.uber.org/zap" @@ -36,7 +38,7 @@ type filterDmNode struct { // Name returns the name of filterDmNode func (fdmNode *filterDmNode) Name() string { - return "fdmNode" + return fmt.Sprintf("fdmNode-%d", fdmNode.collectionID) } // Operate handles input messages, to filter invalid insert messages diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 21682b156..295cccdae 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -18,6 +18,7 @@ package querynode import ( "context" + "fmt" "go.uber.org/zap" @@ -37,7 +38,7 @@ type serviceTimeNode struct { // Name returns the name of serviceTimeNode func (stNode *serviceTimeNode) Name() string { - return "stNode" + return fmt.Sprintf("stNode-%d-%s", stNode.collectionID, stNode.vChannel) } // Close would close serviceTimeNode -- GitLab