提交 795cb74c 编写于 作者: X xige-16 提交者: yefu.chen

Fix message stream miss ack

Signed-off-by: Nxige-16 <xi.ge@zilliz.com>
上级 3e5f05fc
...@@ -335,6 +335,7 @@ func (ms *PulsarMsgStream) receiveMsg(consumer Consumer) { ...@@ -335,6 +335,7 @@ func (ms *PulsarMsgStream) receiveMsg(consumer Consumer) {
if !ok { if !ok {
return return
} }
consumer.Ack(pulsarMsg)
headerMsg := commonpb.MsgHeader{} headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
if err != nil { if err != nil {
...@@ -430,6 +431,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { ...@@ -430,6 +431,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
msgLen := len(consumer.Chan()) msgLen := len(consumer.Chan())
for i := 0; i < msgLen; i++ { for i := 0; i < msgLen; i++ {
msg := <-consumer.Chan() msg := <-consumer.Chan()
consumer.Ack(msg)
pulsarMsgBuffer = append(pulsarMsgBuffer, msg) pulsarMsgBuffer = append(pulsarMsgBuffer, msg)
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册