提交 a4f62104 编写于 作者: Z zhenghu

[#1568] rocketmq need enhance stability when commitlog broken

 增加非空判断,防止空指针异常,导致循环事务 check,增加test覆盖,修复编码格式
上级 159d28d5
...@@ -62,7 +62,7 @@ public class TransactionalMessageBridgeTest { ...@@ -62,7 +62,7 @@ public class TransactionalMessageBridgeTest {
@Spy @Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig()); new NettyClientConfig(), new MessageStoreConfig());
@Mock @Mock
private MessageStore messageStore; private MessageStore messageStore;
...@@ -82,7 +82,7 @@ public class TransactionalMessageBridgeTest { ...@@ -82,7 +82,7 @@ public class TransactionalMessageBridgeTest {
@Test @Test
public void testPutHalfMessage() { public void testPutHalfMessage() {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
PutMessageResult result = transactionBridge.putHalfMessage(createMessageBrokerInner()); PutMessageResult result = transactionBridge.putHalfMessage(createMessageBrokerInner());
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
} }
...@@ -96,7 +96,7 @@ public class TransactionalMessageBridgeTest { ...@@ -96,7 +96,7 @@ public class TransactionalMessageBridgeTest {
@Test @Test
public void testFetchConsumeOffset() { public void testFetchConsumeOffset() {
MessageQueue mq = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), this.brokerController.getBrokerConfig().getBrokerName(), MessageQueue mq = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), this.brokerController.getBrokerConfig().getBrokerName(),
0); 0);
long offset = transactionBridge.fetchConsumeOffset(mq); long offset = transactionBridge.fetchConsumeOffset(mq);
assertThat(offset).isGreaterThan(-1); assertThat(offset).isGreaterThan(-1);
} }
...@@ -104,20 +104,24 @@ public class TransactionalMessageBridgeTest { ...@@ -104,20 +104,24 @@ public class TransactionalMessageBridgeTest {
@Test @Test
public void updateConsumeOffset() { public void updateConsumeOffset() {
MessageQueue mq = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), this.brokerController.getBrokerConfig().getBrokerName(), MessageQueue mq = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), this.brokerController.getBrokerConfig().getBrokerName(),
0); 0);
transactionBridge.updateConsumeOffset(mq, 0); transactionBridge.updateConsumeOffset(mq, 0);
} }
@Test @Test
public void testGetHalfMessage() { public void testGetHalfMessage() {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))).thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE)); when(messageStore
.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
.thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE));
PullResult result = transactionBridge.getHalfMessage(0, 0, 1); PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
assertThat(result.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG); assertThat(result.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
} }
@Test @Test
public void testGetOpMessage() { public void testGetOpMessage() {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))).thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE)); when(messageStore
.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
.thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE));
PullResult result = transactionBridge.getOpMessage(0, 0, 1); PullResult result = transactionBridge.getOpMessage(0, 0, 1);
assertThat(result.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG); assertThat(result.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
} }
...@@ -125,7 +129,7 @@ public class TransactionalMessageBridgeTest { ...@@ -125,7 +129,7 @@ public class TransactionalMessageBridgeTest {
@Test @Test
public void testPutMessageReturnResult() { public void testPutMessageReturnResult() {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
PutMessageResult result = transactionBridge.putMessageReturnResult(createMessageBrokerInner()); PutMessageResult result = transactionBridge.putMessageReturnResult(createMessageBrokerInner());
assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK);
} }
...@@ -133,7 +137,7 @@ public class TransactionalMessageBridgeTest { ...@@ -133,7 +137,7 @@ public class TransactionalMessageBridgeTest {
@Test @Test
public void testPutMessage() { public void testPutMessage() {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
Boolean success = transactionBridge.putMessage(createMessageBrokerInner()); Boolean success = transactionBridge.putMessage(createMessageBrokerInner());
assertThat(success).isEqualTo(true); assertThat(success).isEqualTo(true);
} }
...@@ -143,9 +147,9 @@ public class TransactionalMessageBridgeTest { ...@@ -143,9 +147,9 @@ public class TransactionalMessageBridgeTest {
MessageExt messageExt = createMessageBrokerInner(); MessageExt messageExt = createMessageBrokerInner();
final String offset = "123456789"; final String offset = "123456789";
MessageExtBrokerInner msgInner = transactionBridge.renewImmunityHalfMessageInner(messageExt); MessageExtBrokerInner msgInner = transactionBridge.renewImmunityHalfMessageInner(messageExt);
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET,offset); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET, offset);
assertThat(msgInner).isNotNull(); assertThat(msgInner).isNotNull();
Map<String,String> properties = msgInner.getProperties(); Map<String, String> properties = msgInner.getProperties();
assertThat(properties).isNotNull(); assertThat(properties).isNotNull();
String resOffset = properties.get(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); String resOffset = properties.get(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
assertThat(resOffset).isEqualTo(offset); assertThat(resOffset).isEqualTo(offset);
...@@ -157,11 +161,11 @@ public class TransactionalMessageBridgeTest { ...@@ -157,11 +161,11 @@ public class TransactionalMessageBridgeTest {
MessageExt messageExt = new MessageExt(); MessageExt messageExt = new MessageExt();
long bornTimeStamp = messageExt.getBornTimestamp(); long bornTimeStamp = messageExt.getBornTimestamp();
MessageExt messageExtRes = transactionBridge.renewHalfMessageInner(messageExt); MessageExt messageExtRes = transactionBridge.renewHalfMessageInner(messageExt);
assertThat( messageExtRes.getBornTimestamp()).isEqualTo(bornTimeStamp); assertThat(messageExtRes.getBornTimestamp()).isEqualTo(bornTimeStamp);
} }
@Test @Test
public void testLookMessageByOffset(){ public void testLookMessageByOffset() {
when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt()); when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
MessageExt messageExt = transactionBridge.lookMessageByOffset(123); MessageExt messageExt = transactionBridge.lookMessageByOffset(123);
assertThat(messageExt).isNotNull(); assertThat(messageExt).isNotNull();
...@@ -182,7 +186,7 @@ public class TransactionalMessageBridgeTest { ...@@ -182,7 +186,7 @@ public class TransactionalMessageBridgeTest {
.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))) .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
.thenReturn(null); .thenReturn(null);
PullResult result = transactionBridge.getHalfMessage(0, 0, 1); PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
assertThat(result.getPullStatus()).isNull(); assertThat(result).isNull();
} }
private GetMessageResult createGetMessageResult(GetMessageStatus status) { private GetMessageResult createGetMessageResult(GetMessageStatus status) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册