From 48f5c60070a733c40e0eb2179198475e8a337f0c Mon Sep 17 00:00:00 2001 From: jaime Date: Mon, 31 Oct 2022 15:13:33 +0800 Subject: [PATCH] Refine msgstream close (#20194) Signed-off-by: yun.zhang Signed-off-by: yun.zhang --- internal/datanode/data_sync_service.go | 3 ++- internal/mq/msgstream/mq_msgstream.go | 5 ++++- internal/querynode/segment_loader.go | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index d951c694f..a07aa38eb 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -371,10 +371,11 @@ func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb func (dsService *dataSyncService) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) { pChannelName := funcutil.ToPhysicalChannel(channelName) dmlStream, err := dsService.msFactory.NewMsgStream(ctx) - defer dmlStream.Close() if err != nil { return nil, err } + defer dmlStream.Close() + subName := fmt.Sprintf("datanode-%d-%s-%d", Params.DataNodeCfg.GetNodeID(), channelName, segmentID) log.Debug("dataSyncService register consumer for getChannelLatestMsgID", zap.String("pChannelName", pChannelName), diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 280f8e1a4..b7a2ddb06 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -179,6 +179,9 @@ func (ms *mqMsgStream) Start() { } func (ms *mqMsgStream) Close() { + log.Info("start to close mq msg stream", + zap.Int("producer num", len(ms.producers)), + zap.Int("consumer num", len(ms.consumers))) ms.streamCancel() ms.closeRWMutex.Lock() defer ms.closeRWMutex.Unlock() @@ -909,7 +912,7 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { for runLoop { select { case <-ms.ctx.Done(): - return nil + return ms.ctx.Err() case msg, ok := <-consumer.Chan(): if !ok { return fmt.Errorf("consumer closed") diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 81a0f172c..18e514192 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -761,6 +761,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection for hasMore { select { case <-ctx.Done(): + log.Debug("read delta msg from seek position done", zap.Error(ctx.Err())) return ctx.Err() case msgPack, ok := <-stream.Chan(): if !ok { -- GitLab