diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index d951c694f17f0b8938e623cace831f8c2e276f61..a07aa38eb0ca3a18dc6872e95d8dbf28e68637bc 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -371,10 +371,11 @@ func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb func (dsService *dataSyncService) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) { pChannelName := funcutil.ToPhysicalChannel(channelName) dmlStream, err := dsService.msFactory.NewMsgStream(ctx) - defer dmlStream.Close() if err != nil { return nil, err } + defer dmlStream.Close() + subName := fmt.Sprintf("datanode-%d-%s-%d", Params.DataNodeCfg.GetNodeID(), channelName, segmentID) log.Debug("dataSyncService register consumer for getChannelLatestMsgID", zap.String("pChannelName", pChannelName), diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 280f8e1a4914dfb211e9e2572fddd76e4a202099..b7a2ddb06e46d3f29591e077cac5852b02512012 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -179,6 +179,9 @@ func (ms *mqMsgStream) Start() { } func (ms *mqMsgStream) Close() { + log.Info("start to close mq msg stream", + zap.Int("producer num", len(ms.producers)), + zap.Int("consumer num", len(ms.consumers))) ms.streamCancel() ms.closeRWMutex.Lock() defer ms.closeRWMutex.Unlock() @@ -909,7 +912,7 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { for runLoop { select { case <-ms.ctx.Done(): - return nil + return ms.ctx.Err() case msg, ok := <-consumer.Chan(): if !ok { return fmt.Errorf("consumer closed") diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 81a0f172ca3572dd8c5e7c68eb234b2314c6b62e..18e514192ec6ac5a47f15f698136157ea7d10639 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -761,6 +761,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection for hasMore { select { case <-ctx.Done(): + log.Debug("read delta msg from seek position done", zap.Error(ctx.Err())) return ctx.Err() case msgPack, ok := <-stream.Chan(): if !ok {