From b29c318cdde225ef3a33a73e939e49e087766a28 Mon Sep 17 00:00:00 2001 From: yukon Date: Sat, 21 Jan 2017 23:13:01 +0800 Subject: [PATCH] [ROCKETMQ-51] Add unit tests for ClientManageProcessor --- .../processor/ClientManageProcessorTest.java | 132 ++++++++++++++++++ .../processor/PullMessageProcessorTest.java | 38 ++++- .../processor/SendMessageProcessorTest.java | 44 +++++- 3 files changed, 207 insertions(+), 7 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java new file mode 100644 index 00000000..147c7323 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.util.HashMap; +import java.util.UUID; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ClientManageProcessorTest { + private ClientManageProcessor clientManageProcessor; + @Spy + private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); + @Mock + private ChannelHandlerContext handlerContext; + @Mock + private Channel channel; + + private ClientChannelInfo clientChannelInfo; + private String clientId = UUID.randomUUID().toString(); + private String group = "FooBarGroup"; + private String topic = "FooBar"; + + @Before + public void init() { + when(handlerContext.channel()).thenReturn(channel); + clientManageProcessor = new ClientManageProcessor(brokerController); + clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100); + brokerController.getProducerManager().registerProducer(group, clientChannelInfo); + + ConsumerData consumerData = createConsumerData(group, topic); + brokerController.getConsumerManager().registerConsumer( + consumerData.getGroupName(), + clientChannelInfo, + consumerData.getConsumeType(), + consumerData.getMessageModel(), + consumerData.getConsumeFromWhere(), + consumerData.getSubscriptionDataSet(), + false); + } + + @Test + public void processRequest_UnRegisterProducer() throws Exception { + brokerController.getProducerManager().registerProducer(group, clientChannelInfo); + HashMap channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group); + assertThat(channelMap).isNotNull(); + assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo); + + RemotingCommand request = createUnRegisterProducerCommand(); + RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request); + assertThat(response).isNotNull(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group); + assertThat(channelMap).isNull(); + } + + @Test + public void processRequest_UnRegisterConsumer() throws RemotingCommandException { + ConsumerGroupInfo consumerGroupInfo = brokerController.getConsumerManager().getConsumerGroupInfo(group); + assertThat(consumerGroupInfo).isNotNull(); + + RemotingCommand request = createUnRegisterConsumerCommand(); + RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request); + assertThat(response).isNotNull(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + consumerGroupInfo = brokerController.getConsumerManager().getConsumerGroupInfo(group); + assertThat(consumerGroupInfo).isNull(); + } + + private RemotingCommand createUnRegisterProducerCommand() { + UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader(); + requestHeader.setClientID(clientId); + requestHeader.setProducerGroup(group); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader); + request.setLanguage(LanguageCode.JAVA); + request.setVersion(100); + request.makeCustomHeaderToNet(); + return request; + } + + private RemotingCommand createUnRegisterConsumerCommand() { + UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader(); + requestHeader.setClientID(clientId); + requestHeader.setConsumerGroup(group); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader); + request.setLanguage(LanguageCode.JAVA); + request.setVersion(100); + request.makeCustomHeaderToNet(); + return request; + } +} \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java index 9c3ec67c..d3d98120 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java @@ -19,10 +19,14 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; +import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -78,7 +82,7 @@ public class PullMessageProcessorTest { when(handlerContext.channel()).thenReturn(mockChannel); brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig()); clientChannelInfo = new ClientChannelInfo(mockChannel); - ConsumerData consumerData = createConsumerData(); + ConsumerData consumerData = createConsumerData(group, topic); brokerController.getConsumerManager().registerConsumer( consumerData.getGroupName(), clientChannelInfo, @@ -130,6 +134,36 @@ public class PullMessageProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testProcessRequest_FoundWithHook() throws RemotingCommandException { + GetMessageResult getMessageResult = createGetMessageResult(); + when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult); + List consumeMessageHookList = new ArrayList<>(); + final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1]; + ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() { + @Override public String hookName() { + return "TestHook"; + } + + @Override public void consumeMessageBefore(ConsumeMessageContext context) { + messageContext[0] = context; + } + + @Override public void consumeMessageAfter(ConsumeMessageContext context) { + } + }; + consumeMessageHookList.add(consumeMessageHook); + pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); + final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); + RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); + assertThat(response).isNotNull(); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + assertThat(messageContext[0]).isNotNull(); + assertThat(messageContext[0].getConsumerGroup()).isEqualTo(group); + assertThat(messageContext[0].getTopic()).isEqualTo(topic); + assertThat(messageContext[0].getQueueId()).isEqualTo(1); + } + @Test public void testProcessRequest_MsgWasRemoving() throws RemotingCommandException { GetMessageResult getMessageResult = createGetMessageResult(); @@ -170,7 +204,7 @@ public class PullMessageProcessorTest { return request; } - private ConsumerData createConsumerData() { + static ConsumerData createConsumerData(String group, String topic) { ConsumerData consumerData = new ConsumerData(); consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index 2cf8d45f..02490a07 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -19,7 +19,11 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.mqtrace.SendMessageContext; +import org.apache.rocketmq.broker.mqtrace.SendMessageHook; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageExt; @@ -64,6 +68,9 @@ public class SendMessageProcessorTest { @Mock private MessageStore messageStore; + private String topic = "FooBar"; + private String group = "FooBarGroup"; + @Before public void init() { brokerController.setMessageStore(messageStore); @@ -72,6 +79,7 @@ public class SendMessageProcessorTest { when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); when(handlerContext.channel()).thenReturn(mockChannel); when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt()); + sendMessageProcessor = new SendMessageProcessor(brokerController); } @Test @@ -80,6 +88,33 @@ public class SendMessageProcessorTest { assertPutResult(ResponseCode.SUCCESS); } + @Test + public void testProcessRequest_WithHook() throws RemotingCommandException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + List sendMessageHookList = new ArrayList<>(); + final SendMessageContext[] sendMessageContext = new SendMessageContext[1]; + SendMessageHook sendMessageHook = new SendMessageHook() { + @Override public String hookName() { + return null; + } + + @Override public void sendMessageBefore(SendMessageContext context) { + sendMessageContext[0] = context; + } + + @Override public void sendMessageAfter(SendMessageContext context) { + + } + }; + sendMessageHookList.add(sendMessageHook); + sendMessageProcessor.registerSendMessageHook(sendMessageHookList); + assertPutResult(ResponseCode.SUCCESS); + System.out.println(sendMessageContext[0]); + assertThat(sendMessageContext[0]).isNotNull(); + assertThat(sendMessageContext[0].getTopic()).isEqualTo(topic); + assertThat(sendMessageContext[0].getProducerGroup()).isEqualTo(group); + } + @Test public void testProcessRequest_FlushTimeOut() throws RemotingCommandException { @@ -142,8 +177,8 @@ public class SendMessageProcessorTest { private RemotingCommand createSendMsgCommand(int requestCode) { SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); - requestHeader.setProducerGroup("FooBar_PID"); - requestHeader.setTopic("FooBar"); + requestHeader.setProducerGroup(group); + requestHeader.setTopic(topic); requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC); requestHeader.setDefaultTopicQueueNums(3); requestHeader.setQueueId(1); @@ -153,6 +188,7 @@ public class SendMessageProcessorTest { requestHeader.setReconsumeTimes(0); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); + request.setBody(new byte[] {'a'}); request.makeCustomHeaderToNet(); return request; } @@ -162,7 +198,7 @@ public class SendMessageProcessorTest { requestHeader.setMaxReconsumeTimes(3); requestHeader.setDelayLevel(4); - requestHeader.setGroup("FooBar_PID"); + requestHeader.setGroup(group); requestHeader.setOffset(123L); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); @@ -179,8 +215,6 @@ public class SendMessageProcessorTest { return null; } }).when(handlerContext).writeAndFlush(any(Object.class)); - - sendMessageProcessor = new SendMessageProcessor(brokerController); RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request); if (responseToReturn != null) { assertThat(response[0]).isNull(); -- GitLab