diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 4e8122287591342478ee0ee8d963c48196d71746..2854eae37ee30f7279a70489033216dd68d142f2 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -557,7 +557,12 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor }, nil } // Get the current dml channel position ID, that will be used in segments start positions and end positions. - posID, err := ds.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId()) + var posID []byte + err = retry.Do(ctx, func() error { + id, innerError := ds.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId()) + posID = id + return innerError + }, retry.Attempts(30)) if err != nil { return &datapb.AddImportSegmentResponse{ Status: merr.Status(merr.WrapErrChannelNotFound(