diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 7a86b92cfec29f71b79e35f9f73fd2633812878d..39a08b8dee8d488e3e952c3e982cd2fbd7e09aba 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -102,13 +102,20 @@ type shardSegmentInfo struct { inUse int32 } +// Closable interface for close. +type Closable interface { + Close() +} + // ShardNodeDetector provides method to detect node events type ShardNodeDetector interface { + Closable watchNodes(collectionID int64, replicaID int64, vchannelName string) ([]nodeEvent, <-chan nodeEvent) } // ShardSegmentDetector provides method to detect segment events type ShardSegmentDetector interface { + Closable watchSegments(collectionID int64, replicaID int64, vchannelName string) ([]segmentEvent, <-chan segmentEvent) } @@ -181,6 +188,13 @@ func (sc *ShardCluster) Close() { log.Info("Close shard cluster") sc.closeOnce.Do(func() { sc.updateShardClusterState(unavailable) + if sc.nodeDetector != nil { + sc.nodeDetector.Close() + } + if sc.segmentDetector != nil { + sc.segmentDetector.Close() + } + close(sc.closeCh) }) } diff --git a/internal/querynode/shard_cluster_test.go b/internal/querynode/shard_cluster_test.go index 7ed261317fe712b6d6beadebd74fd3cdc09c3256..31486a658b01158d12a072f077f283997fe560a6 100644 --- a/internal/querynode/shard_cluster_test.go +++ b/internal/querynode/shard_cluster_test.go @@ -38,6 +38,8 @@ func (m *mockNodeDetector) watchNodes(collectionID int64, replicaID int64, vchan return m.initNodes, m.evtCh } +func (m *mockNodeDetector) Close() {} + type mockSegmentDetector struct { initSegments []segmentEvent evtCh chan segmentEvent @@ -47,6 +49,8 @@ func (m *mockSegmentDetector) watchSegments(collectionID int64, replicaID int64, return m.initSegments, m.evtCh } +func (m *mockSegmentDetector) Close() {} + type mockShardQueryNode struct { statisticResponse *internalpb.GetStatisticsResponse statisticErr error