From f0fe8dae0be0acde1b4ae0c8f587053840653c9e Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 1 Aug 2022 13:50:33 +0800 Subject: [PATCH] Close Node/Segment detector when close ShardCluster (#18476) Signed-off-by: Congqi Xia --- internal/querynode/shard_cluster.go | 14 ++++++++++++++ internal/querynode/shard_cluster_test.go | 4 ++++ 2 files changed, 18 insertions(+) diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 7a86b92cf..39a08b8de 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -102,13 +102,20 @@ type shardSegmentInfo struct { inUse int32 } +// Closable interface for close. +type Closable interface { + Close() +} + // ShardNodeDetector provides method to detect node events type ShardNodeDetector interface { + Closable watchNodes(collectionID int64, replicaID int64, vchannelName string) ([]nodeEvent, <-chan nodeEvent) } // ShardSegmentDetector provides method to detect segment events type ShardSegmentDetector interface { + Closable watchSegments(collectionID int64, replicaID int64, vchannelName string) ([]segmentEvent, <-chan segmentEvent) } @@ -181,6 +188,13 @@ func (sc *ShardCluster) Close() { log.Info("Close shard cluster") sc.closeOnce.Do(func() { sc.updateShardClusterState(unavailable) + if sc.nodeDetector != nil { + sc.nodeDetector.Close() + } + if sc.segmentDetector != nil { + sc.segmentDetector.Close() + } + close(sc.closeCh) }) } diff --git a/internal/querynode/shard_cluster_test.go b/internal/querynode/shard_cluster_test.go index 7ed261317..31486a658 100644 --- a/internal/querynode/shard_cluster_test.go +++ b/internal/querynode/shard_cluster_test.go @@ -38,6 +38,8 @@ func (m *mockNodeDetector) watchNodes(collectionID int64, replicaID int64, vchan return m.initNodes, m.evtCh } +func (m *mockNodeDetector) Close() {} + type mockSegmentDetector struct { initSegments []segmentEvent evtCh chan segmentEvent @@ -47,6 +49,8 @@ func (m *mockSegmentDetector) watchSegments(collectionID int64, replicaID int64, return m.initSegments, m.evtCh } +func (m *mockSegmentDetector) Close() {} + type mockShardQueryNode struct { statisticResponse *internalpb.GetStatisticsResponse statisticErr error -- GitLab