From 264a05607c088b638fc65ca4074e5ebefbeeb199 Mon Sep 17 00:00:00 2001 From: yukon Date: Fri, 20 Jan 2017 10:50:43 +0800 Subject: [PATCH] [ROCKETMQ-51] Add unit tests for SendMessageProcessor --- .../rocketmq/broker/BrokerControllerTest.java | 20 +- .../processor/SendMessageProcessorTest.java | 200 ++++++++++++++++++ 2 files changed, 205 insertions(+), 15 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index 4a19c0cf..86b9c4ed 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -21,37 +21,27 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.junit.Assert; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class BrokerControllerTest { - private static final int RESTART_NUM = 3; - protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class); +import static org.assertj.core.api.Assertions.assertThat; +public class BrokerControllerTest { /** * Tests if the controller can be properly stopped and started. * * @throws Exception If fails. */ @Test - public void testRestart() throws Exception { - - for (int i = 0; i < RESTART_NUM; i++) { + public void testBrokerRestart() throws Exception { + for (int i = 0; i < 2; i++) { BrokerController brokerController = new BrokerController(// new BrokerConfig(), // new NettyServerConfig(), // new NettyClientConfig(), // new MessageStoreConfig()); - boolean initResult = brokerController.initialize(); - Assert.assertTrue(initResult); - logger.info("Broker is initialized " + initResult); + assertThat(brokerController.initialize()); brokerController.start(); - logger.info("Broker is started"); - brokerController.shutdown(); - logger.info("Broker is stopped"); } } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java new file mode 100644 index 00000000..d41d03ad --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.net.InetSocketAddress; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SendMessageProcessorTest { + private SendMessageProcessor sendMessageProcessor; + @Mock + private ChannelHandlerContext handlerContext; + @Spy + private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); + @Mock + private MessageStore messageStore; + + @Before + public void init() { + brokerController.setMessageStore(messageStore); + when(messageStore.now()).thenReturn(System.currentTimeMillis()); + Channel mockChannel = mock(Channel.class); + when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); + when(handlerContext.channel()).thenReturn(mockChannel); + when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt()); + } + + @Test + public void testProcessRequest() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + assertPutResult(ResponseCode.SUCCESS); + } + + + @Test + public void testProcessRequest_FlushTimeOut() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT); + } + + @Test + public void testProcessRequest_MessageIllegal() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + assertPutResult(ResponseCode.MESSAGE_ILLEGAL); + } + + @Test + public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + assertPutResult(ResponseCode.SYSTEM_ERROR); + } + + @Test + public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT); + } + + @Test + public void testProcessRequest_PageCacheBusy() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + assertPutResult(ResponseCode.SYSTEM_ERROR); + } + + @Test + public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + assertPutResult(ResponseCode.MESSAGE_ILLEGAL); + } + + @Test + public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE); + } + + @Test + public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE); + } + + @Test + public void testProcessRequest_WithMsgBack() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK); + + sendMessageProcessor = new SendMessageProcessor(brokerController); + final RemotingCommand response = sendMessageProcessor.processRequest(handlerContext, request); + assertThat(response).isNotNull(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + private RemotingCommand createSendMsgCommand(int requestCode) { + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setProducerGroup("FooBar_PID"); + requestHeader.setTopic("FooBar"); + requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC); + requestHeader.setDefaultTopicQueueNums(3); + requestHeader.setQueueId(1); + requestHeader.setSysFlag(0); + requestHeader.setBornTimestamp(System.currentTimeMillis()); + requestHeader.setFlag(124); + requestHeader.setReconsumeTimes(0); + + RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); + request.addExtField("queueId", String.valueOf(requestHeader.getQueueId())); + request.addExtField("topic", String.valueOf(requestHeader.getTopic())); + request.addExtField("defaultTopicQueueNums", String.valueOf(requestHeader.getDefaultTopicQueueNums())); + request.addExtField("defaultTopic", requestHeader.getDefaultTopic()); + request.addExtField("sysFlag", String.valueOf(requestHeader.getSysFlag())); + request.addExtField("flag", String.valueOf(requestHeader.getFlag())); + request.addExtField("bornTimestamp", String.valueOf(requestHeader.getBornTimestamp())); + return request; + } + + private RemotingCommand createSendMsgBackCommand(int requestCode) { + ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader(); + + requestHeader.setMaxReconsumeTimes(3); + requestHeader.setDelayLevel(4); + requestHeader.setGroup("FooBar_PID"); + requestHeader.setOffset(123L); + + RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); + request.addExtField("group", requestHeader.getGroup()); + request.addExtField("offset", String.valueOf(requestHeader.getOffset())); + request.addExtField("delayLevel", String.valueOf(requestHeader.getDelayLevel())); + return request; + } + + private void assertPutResult(int responseCode) throws RemotingCommandException { + final RemotingCommand request = createSendMsgCommand(RequestCode.SEND_MESSAGE); + final RemotingCommand[] response = new RemotingCommand[1]; + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocation) throws Throwable { + response[0] = invocation.getArgument(0); + return null; + } + }).when(handlerContext).writeAndFlush(any(Object.class)); + + sendMessageProcessor = new SendMessageProcessor(brokerController); + RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request); + if (responseToReturn != null) { + assertThat(response[0]).isNull(); + response[0] = responseToReturn; + } + assertThat(response[0].getCode()).isEqualTo(responseCode); + assertThat(response[0].getOpaque()).isEqualTo(request.getOpaque()); + } +} \ No newline at end of file -- GitLab