diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 2e23eaa7f3b478e9ae91abc5af623bfa21b2658a..cb32bda8ac218dd64d87d3be981809623e38c235 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -107,16 +107,20 @@ public class DefaultMQProducerTest { nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(createSendResult(SendStatus.SEND_OK)); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - any(SendCallback.class), nullable(TopicPublishInfo.class), any(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - SendCallback callback = (SendCallback) args[6]; - callback.onSuccess(createSendResult(SendStatus.SEND_OK)); - return new SendResult(); + CommunicationMode mode = invocation.getArgument(5); + switch (mode) { + case SYNC: + return createSendResult(SendStatus.SEND_OK); + case ONEWAY: + case ASYNC: + SendCallback callback = invocation.getArgument(6); + callback.onSuccess(createSendResult(SendStatus.SEND_OK)); + return null; + } + return null; } }); } @@ -181,7 +185,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicInteger cc = new AtomicInteger(0); +// final AtomicInteger cc = new AtomicInteger(0); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.send(message, new SendCallback() { @Override @@ -190,7 +194,7 @@ public class DefaultMQProducerTest { assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getQueueOffset()).isEqualTo(456L); countDownLatch.countDown(); - cc.incrementAndGet(); +// cc.incrementAndGet(); } @Override @@ -198,7 +202,7 @@ public class DefaultMQProducerTest { } }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); - assertThat(cc.get()).isEqualTo(1); +// assertThat(cc.get()).isEqualTo(1); } @Test @@ -244,6 +248,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + final AtomicInteger cc = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(1); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.send(bigMessage, new SendCallback() { @@ -252,6 +257,7 @@ public class DefaultMQProducerTest { assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getQueueOffset()).isEqualTo(456L); + cc.incrementAndGet(); countDownLatch.countDown(); } @@ -260,6 +266,7 @@ public class DefaultMQProducerTest { } }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + assertThat(cc.get()).isEqualTo(1); } @Test