未验证 提交 48f5c600 编写于 作者: J jaime 提交者: GitHub

Refine msgstream close (#20194)

Signed-off-by: Nyun.zhang <yun.zhang@zilliz.com>
Signed-off-by: Nyun.zhang <yun.zhang@zilliz.com>
上级 4136009a
......@@ -371,10 +371,11 @@ func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb
func (dsService *dataSyncService) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) {
pChannelName := funcutil.ToPhysicalChannel(channelName)
dmlStream, err := dsService.msFactory.NewMsgStream(ctx)
defer dmlStream.Close()
if err != nil {
return nil, err
}
defer dmlStream.Close()
subName := fmt.Sprintf("datanode-%d-%s-%d", Params.DataNodeCfg.GetNodeID(), channelName, segmentID)
log.Debug("dataSyncService register consumer for getChannelLatestMsgID",
zap.String("pChannelName", pChannelName),
......
......@@ -179,6 +179,9 @@ func (ms *mqMsgStream) Start() {
}
func (ms *mqMsgStream) Close() {
log.Info("start to close mq msg stream",
zap.Int("producer num", len(ms.producers)),
zap.Int("consumer num", len(ms.consumers)))
ms.streamCancel()
ms.closeRWMutex.Lock()
defer ms.closeRWMutex.Unlock()
......@@ -909,7 +912,7 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
for runLoop {
select {
case <-ms.ctx.Done():
return nil
return ms.ctx.Err()
case msg, ok := <-consumer.Chan():
if !ok {
return fmt.Errorf("consumer closed")
......
......@@ -761,6 +761,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
for hasMore {
select {
case <-ctx.Done():
log.Debug("read delta msg from seek position done", zap.Error(ctx.Err()))
return ctx.Err()
case msgPack, ok := <-stream.Chan():
if !ok {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册