diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 2e23eaa7f3b478e9ae91abc5af623bfa21b2658a..35f65e36c2d7c21d2d0c71534d64812b996c137d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -107,16 +107,20 @@ public class DefaultMQProducerTest { nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(createSendResult(SendStatus.SEND_OK)); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - any(SendCallback.class), nullable(TopicPublishInfo.class), any(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - SendCallback callback = (SendCallback) args[6]; - callback.onSuccess(createSendResult(SendStatus.SEND_OK)); - return new SendResult(); + CommunicationMode mode = invocation.getArgument(5); + switch (mode) { + case SYNC: + return createSendResult(SendStatus.SEND_OK); + case ONEWAY: + case ASYNC: + SendCallback callback = invocation.getArgument(6); + callback.onSuccess(createSendResult(SendStatus.SEND_OK)); + return null; + } + return null; } }); } @@ -244,6 +248,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + final AtomicInteger cc = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(1); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.send(bigMessage, new SendCallback() { @@ -252,6 +257,7 @@ public class DefaultMQProducerTest { assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + cc.incrementAndGet(); countDownLatch.countDown(); } @@ -260,6 +266,7 @@ public class DefaultMQProducerTest { } }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + assertThat(cc.get()).isEqualTo(1); } @Test