diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java index b1a88b5f7f8da1dc39c646cf1c74b88f72d6c185..8e283758d421ad81d9887a865030585f32db423e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java @@ -131,6 +131,9 @@ public class TransactionalMessageBridge { this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, getMessageResult.getBufferTotalSize()); this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount()); + if ( foundList == null || foundList.size() == 0 ) { + break; + } this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1) .getStoreTimestamp()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java index b1c669c9933b1a0f2af5e0cee918c1eefb8a1717..f2636b7b1434979632ded12fa3788914d227fe83 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java @@ -167,6 +167,24 @@ public class TransactionalMessageBridgeTest { assertThat(messageExt).isNotNull(); } + @Test + public void testGetHalfMessageStatusFound() { + when(messageStore + .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))) + .thenReturn(createGetMessageResult(GetMessageStatus.FOUND)); + PullResult result = transactionBridge.getHalfMessage(0, 0, 1); + assertThat(result.getPullStatus()).isEqualTo(PullStatus.FOUND); + } + + @Test + public void testGetHalfMessageNull() { + when(messageStore + .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))) + .thenReturn(null); + PullResult result = transactionBridge.getHalfMessage(0, 0, 1); + assertThat(result.getPullStatus()).isNull(); + } + private GetMessageResult createGetMessageResult(GetMessageStatus status) { GetMessageResult getMessageResult = new GetMessageResult(); getMessageResult.setStatus(status);