diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f48594ebbeb721a2c79c18dcde301abd164c01d8 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.net.Broker2Client; +import org.apache.rocketmq.broker.transaction.TransactionalMessageService; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ReplyMessageProcessorTest { + private ReplyMessageProcessor replyMessageProcessor; + @Spy + private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); + @Mock + private ChannelHandlerContext handlerContext; + @Mock + private MessageStore messageStore; + @Mock + private Channel channel; + @Mock + private TransactionalMessageService transactionMsgService; + private String topic = "FooBar"; + private String group = "FooBarGroup"; + private ClientChannelInfo clientInfo; + @Mock + private Broker2Client broker2Client; + + @Before + public void init() throws IllegalAccessException, NoSuchFieldException { + clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0); + brokerController.setMessageStore(messageStore); + Field field = BrokerController.class.getDeclaredField("broker2Client"); + field.setAccessible(true); + field.set(brokerController, broker2Client); + when(messageStore.now()).thenReturn(System.currentTimeMillis()); + Channel mockChannel = mock(Channel.class); + when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); + when(handlerContext.channel()).thenReturn(mockChannel); + when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt()); + replyMessageProcessor = new ReplyMessageProcessor(brokerController); + } + + @Test + public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException { + when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + brokerController.getProducerManager().registerProducer(group, clientInfo); + final RemotingCommand request = createSendMessageRequestHeaderCommand(RequestCode.SEND_REPLY_MESSAGE); + when(brokerController.getBroker2Client().callClient(any(Channel.class), any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS, request)); + RemotingCommand responseToReturn = replyMessageProcessor.processRequest(handlerContext, request); + assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS); + assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque()); + } + + private RemotingCommand createSendMessageRequestHeaderCommand(int requestCode) { + SendMessageRequestHeader requestHeader = createSendMessageRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); + request.setBody(new byte[] {'a'}); + request.makeCustomHeaderToNet(); + return request; + } + + private SendMessageRequestHeader createSendMessageRequestHeader() { + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setProducerGroup(group); + requestHeader.setTopic(topic); + requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC); + requestHeader.setDefaultTopicQueueNums(3); + requestHeader.setQueueId(1); + requestHeader.setSysFlag(0); + requestHeader.setBornTimestamp(System.currentTimeMillis()); + requestHeader.setFlag(124); + requestHeader.setReconsumeTimes(0); + Map map = new HashMap(); + map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO, "127.0.0.1"); + requestHeader.setProperties(MessageDecoder.messageProperties2String(map)); + return requestHeader; + } + + private RemotingCommand createResponse(int code, RemotingCommand request) { + RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + response.setCode(code); + response.setOpaque(request.getOpaque()); + return response; + } +} \ No newline at end of file diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 84af63235570bcfbe9d9919a7b39cfe0283e91a5..3f00d9e4030473f395c9b2a037e50b7e6fde24f0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -42,6 +42,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; @@ -164,7 +165,7 @@ public class MQClientAPIImplTest { public Object answer(InvocationOnMock mock) throws Throwable { InvokeCallback callback = mock.getArgument(3); RemotingCommand request = mock.getArgument(1); - ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); responseFuture.setResponseCommand(createSuccessResponse(request)); callback.operationComplete(responseFuture); return null; @@ -289,6 +290,7 @@ public class MQClientAPIImplTest { assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed"); } } + @Test public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { doAnswer(new Answer() { @@ -322,6 +324,38 @@ public class MQClientAPIImplTest { assertThat(result).isEqualTo(true); } + @Test + public void testSendMessageTypeofReply() throws Exception { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + InvokeCallback callback = mock.getArgument(3); + RemotingCommand request = mock.getArgument(1); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); + responseFuture.setResponseCommand(createSuccessResponse(request)); + callback.operationComplete(responseFuture); + return null; + } + }).when(remotingClient).invokeAsync(Matchers.anyString(), Matchers.any(RemotingCommand.class), Matchers.anyLong(), Matchers.any(InvokeCallback.class)); + SendMessageContext sendMessageContext = new SendMessageContext(); + sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer())); + msg.getProperties().put("MSG_TYPE", "reply"); + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC, + new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(123L); + assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1); + } + + @Override + public void onException(Throwable e) { + } + }, null, null, 0, sendMessageContext, defaultMQProducerImpl); + } + private RemotingCommand createResumeSuccessResponse(RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(ResponseCode.SUCCESS); diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 9540755fe339f37651f983fbe1ab524e7fa94004..6f2f959632b7d71990307ddcffb8add1ef1f4896 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.impl.CommunicationMode; @@ -45,6 +48,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.junit.After; import org.junit.Before; @@ -184,6 +188,7 @@ public class DefaultMQProducerTest { }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); } + @Test public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException { final AtomicInteger cc = new AtomicInteger(0); @@ -211,12 +216,12 @@ public class DefaultMQProducerTest { Message message = new Message(); message.setTopic("test"); message.setBody("hello world".getBytes()); - producer.send(new Message(),sendCallback); - producer.send(message,sendCallback,1000); - producer.send(message,new MessageQueue(),sendCallback); - producer.send(new Message(),new MessageQueue(),sendCallback,1000); - producer.send(new Message(),messageQueueSelector,null,sendCallback); - producer.send(message,messageQueueSelector,null,sendCallback,1000); + producer.send(new Message(), sendCallback); + producer.send(message, sendCallback, 1000); + producer.send(message, new MessageQueue(), sendCallback); + producer.send(new Message(), new MessageQueue(), sendCallback, 1000); + producer.send(new Message(), messageQueueSelector, null, sendCallback); + producer.send(message, messageQueueSelector, null, sendCallback, 1000); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(6); @@ -311,6 +316,100 @@ public class DefaultMQProducerTest { assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); } + @Test + public void testRequestMessage() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + new Thread(new Runnable() { + @Override public void run() { + boolean flag = true; + ConcurrentHashMap responseMap = null; + while (flag) { + responseMap = RequestFutureTable.getRequestFutureTable(); + if (responseMap != null) { + flag = false; + } + } + assertThat(responseMap).isNotNull(); + for (Map.Entry entry : responseMap.entrySet()) { + RequestResponseFuture future = entry.getValue(); + future.putResponseMessage(message); + } + } + }).start(); + Message result = producer.request(message, 3 * 1000L); + assertThat(result.getTopic()).isEqualTo("FooBar"); + assertThat(result.getBody()).isEqualTo(new byte[] {'a'}); + } + + @Test(expected = RequestTimeoutException.class) + public void testRequestMessage_RequestTimeoutException() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + Message result = producer.request(message, 3 * 1000L); + } + + @Test + public void testAsyncRequest_OnSuccess() throws Exception { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + final CountDownLatch countDownLatch = new CountDownLatch(1); + RequestCallback requestCallback = new RequestCallback() { + @Override public void onSuccess(Message message) { + assertThat(message.getTopic()).isEqualTo("FooBar"); + assertThat(message.getBody()).isEqualTo(new byte[] {'a'}); + assertThat(message.getFlag()).isEqualTo(1); + countDownLatch.countDown(); + } + + @Override public void onException(Throwable e) { + } + }; + producer.request(message, requestCallback, 3 * 1000L); + ConcurrentHashMap responseMap = RequestFutureTable.getRequestFutureTable(); + assertThat(responseMap).isNotNull(); + for (Map.Entry entry : responseMap.entrySet()) { + RequestResponseFuture future = entry.getValue(); + future.setSendReqeustOk(true); + message.setFlag(1); + future.getRequestCallback().onSuccess(message); + } + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + } + + @Test + public void testAsyncRequest_OnException() throws Exception { + final AtomicInteger cc = new AtomicInteger(0); + final CountDownLatch countDownLatch = new CountDownLatch(1); + RequestCallback requestCallback = new RequestCallback() { + @Override public void onSuccess(Message message) { + + } + + @Override public void onException(Throwable e) { + cc.incrementAndGet(); + countDownLatch.countDown(); + } + }; + MessageQueueSelector messageQueueSelector = new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + return null; + } + }; + + try { + producer.request(message, requestCallback, 3 * 1000L); + failBecauseExceptionWasNotThrown(RemotingSendRequestException.class); + } catch (RemotingSendRequestException e) { + ConcurrentHashMap responseMap = RequestFutureTable.getRequestFutureTable(); + assertThat(responseMap).isNotNull(); + for (Map.Entry entry : responseMap.entrySet()) { + RequestResponseFuture future = entry.getValue(); + future.getRequestCallback().onException(new Throwable()); + } + } + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + assertThat(cc.get()).isEqualTo(1); + } + public static TopicRouteData createTopicRoute() { TopicRouteData topicRouteData = new TopicRouteData(); diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java new file mode 100644 index 0000000000000000000000000000000000000000..90e4623e9b486770b1740d090a2dc9f05aca51f5 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.producer; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.common.message.Message; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RequestResponseFutureTest { + + @Test + public void testExecuteRequestCallback() throws Exception { + final AtomicInteger cc = new AtomicInteger(0); + RequestResponseFuture future = new RequestResponseFuture(UUID.randomUUID().toString(), 3 * 1000L, new RequestCallback() { + @Override public void onSuccess(Message message) { + cc.incrementAndGet(); + } + + @Override public void onException(Throwable e) { + } + }); + future.setSendReqeustOk(true); + future.executeRequestCallback(); + assertThat(cc.get()).isEqualTo(1); + } + +} diff --git a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a722fa6cbb52b1b7b509140095a6461669cbecb3 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.utils; + +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; + +public class MessageUtilsTest { + + @Test + public void testCreateReplyMessage() throws MQClientException { + Message msg = MessageUtil.createReplyMessage(createReplyMessage("clusterName"), new byte[] {'a'}); + assertThat(msg.getTopic()).isEqualTo("clusterName" + "_" + MixAll.REPLY_TOPIC_POSTFIX); + assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO)).isEqualTo("127.0.0.1"); + assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_TTL)).isEqualTo("3000"); + } + + @Test + public void testCreateReplyMessage_Exception() throws MQClientException { + try { + Message msg = MessageUtil.createReplyMessage(createReplyMessage(null), new byte[] {'a'}); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("create reply message fail."); + } + } + + private Message createReplyMessage(String clusterName) { + Message requestMessage = new Message(); + Map map = new HashMap(); + map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO, "127.0.0.1"); + map.put(MessageConst.PROPERTY_CLUSTER, clusterName); + map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000"); + MessageAccessor.setProperties(requestMessage, map); + return requestMessage; + } + +}