diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java index 453a335a40b22ce370bd52f03b1dd718e8b3b9a0..67f7a5f5628a0030dc2ffe733e84821eb03f449d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java @@ -131,6 +131,9 @@ public class TransactionalMessageBridge { this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, getMessageResult.getBufferTotalSize()); this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount()); + if (foundList == null || foundList.size() == 0) { + break; + } this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1) .getStoreTimestamp()); @@ -175,8 +178,10 @@ public class TransactionalMessageBridge { try { List messageBufferList = getMessageResult.getMessageBufferList(); for (ByteBuffer bb : messageBufferList) { - MessageExt msgExt = MessageDecoder.decode(bb); - foundList.add(msgExt); + MessageExt msgExt = MessageDecoder.decode(bb, true, false); + if (msgExt != null) { + foundList.add(msgExt); + } } } finally { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java index b1c669c9933b1a0f2af5e0cee918c1eefb8a1717..ebe887285f0ff63fd81c741523a6f1820017fccc 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java @@ -167,6 +167,24 @@ public class TransactionalMessageBridgeTest { assertThat(messageExt).isNotNull(); } + @Test + public void testGetHalfMessageStatusFound() { + when(messageStore + .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))) + .thenReturn(createGetMessageResult(GetMessageStatus.FOUND)); + PullResult result = transactionBridge.getHalfMessage(0, 0, 1); + assertThat(result.getPullStatus()).isEqualTo(PullStatus.FOUND); + } + + @Test + public void testGetHalfMessageNull() { + when(messageStore + .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))) + .thenReturn(null); + PullResult result = transactionBridge.getHalfMessage(0, 0, 1); + assertThat(result).isNull(); + } + private GetMessageResult createGetMessageResult(GetMessageStatus status) { GetMessageResult getMessageResult = new GetMessageResult(); getMessageResult.setStatus(status);