From dca6c6afde928e4c1903c5af1757f281b21163c4 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 15 Jul 2021 10:05:55 +0800 Subject: [PATCH] Add unittest for BackGroundGC in DataNode (#6522) Signed-off-by: yangxuan --- internal/datanode/data_node_test.go | 82 +++++++++++++------------ internal/datanode/flow_graph_dd_node.go | 1 + 2 files changed, 45 insertions(+), 38 deletions(-) diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 3621da145..30ec46bcd 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -203,8 +203,48 @@ func TestDataNode(t *testing.T) { assert.NoError(t, err) }) + t.Run("Test BackGroundGC", func(te *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + node := newIDLEDataNodeMock(ctx) + + collIDCh := make(chan UniqueID) + go node.BackGroundGC(collIDCh) + node.clearSignal = collIDCh + + testDataSyncs := []struct { + collID UniqueID + dmChannelName string + }{ + {1, "fake-dm-backgroundgc-1"}, + {2, "fake-dm-backgroundgc-2"}, + {3, "fake-dm-backgroundgc-3"}, + {4, ""}, + {1, ""}, + } + + for i, t := range testDataSyncs { + if i <= 2 { + node.NewDataSyncService(&datapb.VchannelInfo{CollectionID: t.collID, ChannelName: t.dmChannelName}) + + msFactory := msgstream.NewPmsFactory() + insertStream, _ := msFactory.NewMsgStream(ctx) + var insertMsgStream msgstream.MsgStream = insertStream + insertMsgStream.Start() + } + + collIDCh <- t.collID + } + + assert.Eventually(t, func() bool { + node.chanMut.Lock() + defer node.chanMut.Unlock() + return len(node.vchan2FlushCh) == 0 + }, time.Second, time.Millisecond) + + cancel() + }) + t.Run("Test ReleaseDataSyncService", func(t *testing.T) { - t.Skip() dmChannelName := "fake-dm-channel-test-NewDataSyncService" vchan := &datapb.VchannelInfo{ @@ -214,9 +254,9 @@ func TestDataNode(t *testing.T) { } err := node.NewDataSyncService(vchan) - assert.NoError(t, err) - assert.Equal(t, 1, len(node.vchan2FlushCh)) - assert.Equal(t, 1, len(node.vchan2SyncService)) + require.NoError(t, err) + require.Equal(t, 1, len(node.vchan2FlushCh)) + require.Equal(t, 1, len(node.vchan2SyncService)) time.Sleep(time.Second) node.ReleaseDataSyncService(dmChannelName) @@ -275,40 +315,6 @@ func TestDataNode(t *testing.T) { cancel() }) - t.Run("Test BackGroundGC", func(t *testing.T) { - t.Skipf("Skip for data race") - collIDCh := make(chan UniqueID) - go node.BackGroundGC(collIDCh) - - dmChannelName := "fake-dm-channel-test-BackGroundGC" - - vchan := &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: dmChannelName, - UnflushedSegments: []*datapb.SegmentInfo{}, - } - require.Equal(t, 0, len(node.vchan2FlushCh)) - require.Equal(t, 0, len(node.vchan2SyncService)) - - err := node.NewDataSyncService(vchan) - require.NoError(t, err) - time.Sleep(time.Second) - - require.Equal(t, 1, len(node.vchan2FlushCh)) - require.Equal(t, 1, len(node.vchan2SyncService)) - - collIDCh <- 1 - assert.Eventually(t, func() bool { - return len(node.vchan2FlushCh) == 0 - }, time.Second*4, time.Millisecond) - - assert.Equal(t, 0, len(node.vchan2SyncService)) - - s, ok := node.vchan2SyncService[dmChannelName] - assert.False(t, ok) - assert.Nil(t, s) - }) - cancel() <-node.ctx.Done() node.Stop() diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 3310be3b9..e9d6eeb88 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -88,6 +88,7 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID { log.Info("Destroying current flowgraph", zap.Any("collectionID", ddn.collectionID)) ddn.clearSignal <- ddn.collectionID + return []Msg{} } case commonpb.MsgType_Insert: log.Debug("DDNode with insert messages") -- GitLab