diff --git a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java index 350cde7668361fe2b8e6675617e1ddade28042ae..1be93ce0f3a1dd6d557d841c85109f3410bc40c6 100644 --- a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java @@ -21,7 +21,6 @@ import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class ThreadLocalIndexTest { - @Test public void testGetAndIncrement() throws Exception { ThreadLocalIndex localIndex = new ThreadLocalIndex(); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java index 58d99d60d68cbdf24207d144664d7d9e54481612..0a54855bafc3d7e14523cf01f247072136b586f1 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java @@ -30,7 +30,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; public class LocalFileOffsetStoreTest { - @Mock private static MQClientInstance mQClientFactory; private static String group = "FooBarGroup"; diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java index 9a6a4b555089a66603b9474a82b007f42c33fc09..7ecb0225c4193e5d9c7fa7d02b9eefc3e5b97fa6 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java @@ -43,7 +43,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class RemoteBrokerOffsetStoreTest { - @Mock private static MQClientInstance mQClientFactory; @Mock diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..cbcf560127b6e1fda1b38a797c911089874aca4b --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl; + +import java.lang.reflect.Field; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.netty.ResponseFuture; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class MQClientAPIImplTest { + private static MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig()); + @Mock + private static RemotingClient remotingClient; + @Mock + private static DefaultMQProducerImpl defaultMQProducerImpl; + + private String brokerAddr = "127.0.0.1"; + private String brokerName = "DefaultBroker"; + private static String group = "FooBarGroup"; + private static String topic = "FooBar"; + private Message msg = new Message("FooBar", new byte[] {}); + + @BeforeClass + public static void init() throws Exception { + remotingClient = mock(NettyRemotingClient.class); + defaultMQProducerImpl = mock(DefaultMQProducerImpl.class); + Field field = MQClientAPIImpl.class.getDeclaredField("remotingClient"); + field.setAccessible(true); + field.set(mqClientAPI, remotingClient); + } + + @Test + public void testSendMessageOneWay_Success() throws RemotingException, InterruptedException, MQBrokerException { + doNothing().when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong()); + SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), + 3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl); + assertThat(sendResult).isNull(); + } + + @Test + public void testSendMessageOneWay_WithException() throws RemotingException, InterruptedException, MQBrokerException { + doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong()); + try { + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), + 3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl); + failBecauseExceptionWasNotThrown(RemotingException.class); + } catch (RemotingException e) { + assertThat(e).hasMessage("Remoting Exception in Test"); + } + + doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong()); + try { + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), + 3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl); + failBecauseExceptionWasNotThrown(InterruptedException.class); + } catch (InterruptedException e) { + assertThat(e).hasMessage("Interrupted Exception in Test"); + } + } + + @Test + public void testSendMessageSync_Success() throws InterruptedException, RemotingException, MQBrokerException { + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + return createSuccessResponse(request); + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + SendMessageRequestHeader requestHeader = createSendMessageRequestHeader(); + + SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, requestHeader, + 3 * 1000, CommunicationMode.SYNC, new SendMessageContext(), defaultMQProducerImpl); + + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(123L); + assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1); + } + + @Test + public void testSendMessageSync_WithException() throws InterruptedException, RemotingException, MQBrokerException { + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setOpaque(request.getOpaque()); + response.setRemark("Broker is broken."); + return response; + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + SendMessageRequestHeader requestHeader = createSendMessageRequestHeader(); + + try { + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, requestHeader, + 3 * 1000, CommunicationMode.SYNC, new SendMessageContext(), defaultMQProducerImpl); + failBecauseExceptionWasNotThrown(MQBrokerException.class); + } catch (MQBrokerException e) { + assertThat(e).hasMessageContaining("Broker is broken."); + } + } + + @Test + public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException { + doNothing().when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), + 3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl); + assertThat(sendResult).isNull(); + + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock mock) throws Throwable { + InvokeCallback callback = mock.getArgument(3); + RemotingCommand request = mock.getArgument(1); + ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null); + responseFuture.setResponseCommand(createSuccessResponse(request)); + callback.operationComplete(responseFuture); + return null; + } + }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + SendMessageContext sendMessageContext = new SendMessageContext(); + sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer())); + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC, + new SendCallback() { + @Override public void onSuccess(SendResult sendResult) { + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); + assertThat(sendResult.getQueueOffset()).isEqualTo(123L); + assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1); + } + + @Override public void onException(Throwable e) { + } + }, + null, null, 0, sendMessageContext, defaultMQProducerImpl); + } + + @Test + public void testSendMessageAsync_WithException() throws RemotingException, InterruptedException, MQBrokerException { + doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient) + .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + try { + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), + 3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl); + failBecauseExceptionWasNotThrown(RemotingException.class); + } catch (RemotingException e) { + assertThat(e).hasMessage("Remoting Exception in Test"); + } + + doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient) + .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + try { + mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), + 3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl); + failBecauseExceptionWasNotThrown(InterruptedException.class); + } catch (InterruptedException e) { + assertThat(e).hasMessage("Interrupted Exception in Test"); + } + } + + private RemotingCommand createSuccessResponse(RemotingCommand request) { + RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + + SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); + responseHeader.setMsgId("123"); + responseHeader.setQueueId(1); + responseHeader.setQueueOffset(123L); + + response.addExtField(MessageConst.PROPERTY_MSG_REGION, "RegionHZ"); + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, "true"); + response.addExtField("queueId", String.valueOf(responseHeader.getQueueId())); + response.addExtField("msgId", responseHeader.getMsgId()); + response.addExtField("queueOffset", String.valueOf(responseHeader.getQueueOffset())); + return response; + } + + private SendMessageRequestHeader createSendMessageRequestHeader() { + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setBornTimestamp(System.currentTimeMillis()); + requestHeader.setTopic(topic); + requestHeader.setProducerGroup(group); + requestHeader.setQueueId(1); + requestHeader.setMaxReconsumeTimes(10); + return requestHeader; + } +} \ No newline at end of file diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index 956cdd9065529003030e901131c06c26014ebdc7..b7b07c6af99722bee54ec280cf28ec3649f60e67 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; @@ -34,7 +33,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -42,11 +40,8 @@ import static org.mockito.Mockito.mock; @RunWith(MockitoJUnitRunner.class) public class MQClientInstanceTest { - - @Mock - private static MQClientAPIImpl mqClientAPI; @InjectMocks - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());; + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());; private String topic = "FooBar"; private String group = "FooBarGroup"; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 46ca8dd1185623681483f266d078214afcfe0737..99c13fbb77a1a5088d18c1c035ac1e17a47bcd72 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -114,9 +114,7 @@ public class RemotingCommand { } public static RemotingCommand createResponseCommand(Class classHeader) { - RemotingCommand cmd = createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader); - - return cmd; + return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader); } /**