From 795cb74c853943915b233f746302d22a294ad0ec Mon Sep 17 00:00:00 2001 From: xige-16 Date: Tue, 2 Mar 2021 13:15:49 +0800 Subject: [PATCH] Fix message stream miss ack Signed-off-by: xige-16 --- internal/msgstream/pulsarms/pulsar_msgstream.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index c0b2a0177..d20193480 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -335,6 +335,7 @@ func (ms *PulsarMsgStream) receiveMsg(consumer Consumer) { if !ok { return } + consumer.Ack(pulsarMsg) headerMsg := commonpb.MsgHeader{} err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) if err != nil { @@ -430,6 +431,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { msgLen := len(consumer.Chan()) for i := 0; i < msgLen; i++ { msg := <-consumer.Chan() + consumer.Ack(msg) pulsarMsgBuffer = append(pulsarMsgBuffer, msg) } } -- GitLab