未验证 提交 920fcac1 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1569 from guyuezheng/master

[ISSUE #1568] rocketmq need enhance stability when commitlog broken
...@@ -131,6 +131,9 @@ public class TransactionalMessageBridge { ...@@ -131,6 +131,9 @@ public class TransactionalMessageBridge {
this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic,
getMessageResult.getBufferTotalSize()); getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount()); this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
if (foundList == null || foundList.size() == 0) {
break;
}
this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1) this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1)
.getStoreTimestamp()); .getStoreTimestamp());
...@@ -175,8 +178,10 @@ public class TransactionalMessageBridge { ...@@ -175,8 +178,10 @@ public class TransactionalMessageBridge {
try { try {
List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList(); List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
for (ByteBuffer bb : messageBufferList) { for (ByteBuffer bb : messageBufferList) {
MessageExt msgExt = MessageDecoder.decode(bb); MessageExt msgExt = MessageDecoder.decode(bb, true, false);
foundList.add(msgExt); if (msgExt != null) {
foundList.add(msgExt);
}
} }
} finally { } finally {
......
...@@ -167,6 +167,24 @@ public class TransactionalMessageBridgeTest { ...@@ -167,6 +167,24 @@ public class TransactionalMessageBridgeTest {
assertThat(messageExt).isNotNull(); assertThat(messageExt).isNotNull();
} }
@Test
public void testGetHalfMessageStatusFound() {
when(messageStore
.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
.thenReturn(createGetMessageResult(GetMessageStatus.FOUND));
PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
assertThat(result.getPullStatus()).isEqualTo(PullStatus.FOUND);
}
@Test
public void testGetHalfMessageNull() {
when(messageStore
.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
.thenReturn(null);
PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
assertThat(result).isNull();
}
private GetMessageResult createGetMessageResult(GetMessageStatus status) { private GetMessageResult createGetMessageResult(GetMessageStatus status) {
GetMessageResult getMessageResult = new GetMessageResult(); GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(status); getMessageResult.setStatus(status);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册