From 351d87055a0d74166a58d2fd741766048483f750 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Wed, 9 Jun 2021 16:01:48 +0800 Subject: [PATCH] fix ttmsgstream (#5689) * fix msgstream Signed-off-by: yefu.chen * fix asconsumer Signed-off-by: yefu.chen --- internal/msgstream/mq_msgstream.go | 27 +++++++++++-------------- internal/msgstream/mq_msgstream_test.go | 5 +++++ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index dc40317db..4dc1c91b5 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -352,6 +352,13 @@ func (ms *mqMsgStream) receiveMsg(consumer mqclient.Consumer) { log.Error("Failed to getTsMsgFromConsumerMsg", zap.Error(err)) continue } + pos := tsMsg.Position() + tsMsg.SetPosition(&MsgPosition{ + ChannelName: pos.ChannelName, + MsgID: pos.MsgID, + MsgGroup: consumer.Subscription(), + Timestamp: tsMsg.BeginTs(), + }) sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) if ok { @@ -697,21 +704,12 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { var mp *MsgPosition var err error fn := func() error { - if _, ok := ms.consumers[mp.ChannelName]; ok { - return fmt.Errorf("the channel should not been subscribed") - } - - receiveChannel := make(chan mqclient.ConsumerMessage, ms.bufSize) - consumer, err = ms.client.Subscribe(mqclient.ConsumerOptions{ - Topic: mp.ChannelName, - SubscriptionName: mp.MsgGroup, - SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest, - Type: mqclient.KeyShared, - MessageChannel: receiveChannel, - }) - if err != nil { - return err + var ok bool + consumer, ok = ms.consumers[mp.ChannelName] + if !ok { + return fmt.Errorf("please subcribe the channel, channel name =%s", mp.ChannelName) } + if consumer == nil { return fmt.Errorf("consumer is nil") } @@ -736,7 +734,6 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { if len(mp.MsgID) == 0 { return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface") } - if err = Retry(20, time.Millisecond*200, fn); err != nil { return fmt.Errorf("Failed to seek, error %s", err.Error()) } diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index 19c7daefb..a625cfb7b 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -254,6 +254,11 @@ func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPositi factory := ProtoUDFactory{} pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) + consumerName := []string{} + for _, c := range positions { + consumerName = append(consumerName, c.ChannelName) + } + outputStream.AsConsumer(consumerName, positions[0].MsgGroup) outputStream.Seek(positions) outputStream.Start() return outputStream -- GitLab