diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index c0b2a01776dbe8b41ba47cffb32f88afedf78cac..d201934800a929ebfda46ebd51876804405c0549 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -335,6 +335,7 @@ func (ms *PulsarMsgStream) receiveMsg(consumer Consumer) { if !ok { return } + consumer.Ack(pulsarMsg) headerMsg := commonpb.MsgHeader{} err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) if err != nil { @@ -430,6 +431,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { msgLen := len(consumer.Chan()) for i := 0; i < msgLen; i++ { msg := <-consumer.Chan() + consumer.Ack(msg) pulsarMsgBuffer = append(pulsarMsgBuffer, msg) } }