From d2345731b9998603a15f1fdc2e8ee96d1e9d439d Mon Sep 17 00:00:00 2001 From: zhenghu Date: Tue, 5 Nov 2019 00:53:34 +0800 Subject: [PATCH] =?UTF-8?q?[#1568]=20rocketmq=20need=20enhance=20stability?= =?UTF-8?q?=20when=20commitlog=20broken=20=20=E5=A2=9E=E5=8A=A0=E9=9D=9E?= =?UTF-8?q?=E7=A9=BA=E5=88=A4=E6=96=AD=EF=BC=8C=E9=98=B2=E6=AD=A2=E7=A9=BA?= =?UTF-8?q?=E6=8C=87=E9=92=88=E5=BC=82=E5=B8=B8,=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E5=BE=AA=E7=8E=AF=E4=BA=8B=E5=8A=A1=20check,=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?test=E8=A6=86=E7=9B=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../queue/TransactionalMessageBridge.java | 3 +++ .../queue/TransactionalMessageBridgeTest.java | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java index b1a88b5f..8e283758 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java @@ -131,6 +131,9 @@ public class TransactionalMessageBridge { this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, getMessageResult.getBufferTotalSize()); this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount()); + if ( foundList == null || foundList.size() == 0 ) { + break; + } this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1) .getStoreTimestamp()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java index b1c669c9..f2636b7b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java @@ -167,6 +167,24 @@ public class TransactionalMessageBridgeTest { assertThat(messageExt).isNotNull(); } + @Test + public void testGetHalfMessageStatusFound() { + when(messageStore + .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))) + .thenReturn(createGetMessageResult(GetMessageStatus.FOUND)); + PullResult result = transactionBridge.getHalfMessage(0, 0, 1); + assertThat(result.getPullStatus()).isEqualTo(PullStatus.FOUND); + } + + @Test + public void testGetHalfMessageNull() { + when(messageStore + .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))) + .thenReturn(null); + PullResult result = transactionBridge.getHalfMessage(0, 0, 1); + assertThat(result.getPullStatus()).isNull(); + } + private GetMessageResult createGetMessageResult(GetMessageStatus status) { GetMessageResult getMessageResult = new GetMessageResult(); getMessageResult.setStatus(status); -- GitLab