diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index dc40317dbaeed4b7b05a2ae06cc722f77df6eb64..4dc1c91b5dace271e3447ffc941f283fc93ee27d 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -352,6 +352,13 @@ func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) { log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err)) continue } + pos := tsMsg.Position() + tsMsg.SetPosition(&MsgPosition{ + ChannelName: pos.ChannelName, + MsgID: pos.MsgID, + MsgGroup: consumer.Subscription(), + Timestamp: tsMsg.BeginTs(), + }) sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) if ok { @@ -697,21 +704,12 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { var mp *MsgPosition var err error fn := func() error { - if _, ok := ms.consumers[mp.ChannelName]; ok { - return fmt.Errorf("the channel should not been subscribed") - } - - receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize) - consumer, err = ms.client.Subscribe(mqclient.ConsumerOptions{ - Topic: mp.ChannelName, - SubscriptionName: mp.MsgGroup, - SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest, - Type: mqclient.KeyShared, - MessageChannel: receiveChannel, - }) - if err != nil { - return err + var ok bool + consumer, ok = ms.consumers[mp.ChannelName] + if !ok { + return fmt.Errorf("please subcribe the channel, channel name =%s", mp.ChannelName) } + if consumer == nil { return fmt.Errorf("consumer is nil") } @@ -736,7 +734,6 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { if len(mp.MsgID) == 0 { return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface") } - if err = Retry(20, time.Millisecond*200, fn); err != nil { return fmt.Errorf("Failed to seek, error %s", err.Error()) } diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index 19c7daefb7e4097e83ad9f539b217f633bd58598..a625cfb7b0db63296c9b34d0e44fe2cbe9c03e89 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -254,6 +254,11 @@ func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPositi factory := ProtoUDFactory{} pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) + consumerName := []string{} + for _, c := range positions { + consumerName = append(consumerName, c.ChannelName) + } + outputStream.AsConsumer(consumerName, positions[0].MsgGroup) outputStream.Seek(positions) outputStream.Start() return outputStream