diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 818c94a509053f0619ac61d7aad4fe14c26053e3..d572a2320e30eecbb91bf74b4e80a784a971937d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -111,22 +111,7 @@ public class DefaultMQProducerTest { nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - CommunicationMode mode = invocation.getArgument(5); - switch (mode) { - case SYNC: - return createSendResult(SendStatus.SEND_OK); - case ONEWAY: - case ASYNC: - SendCallback callback = invocation.getArgument(6); - callback.onSuccess(createSendResult(SendStatus.SEND_OK)); - return null; - } - return null; - } - }); + .thenReturn(createSendResult(SendStatus.SEND_OK)); } @After @@ -189,7 +174,6 @@ public class DefaultMQProducerTest { @Test public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicInteger cc = new AtomicInteger(0); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.send(message, new SendCallback() { @Override @@ -198,15 +182,14 @@ public class DefaultMQProducerTest { assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getQueueOffset()).isEqualTo(456L); countDownLatch.countDown(); - cc.incrementAndGet(); } @Override public void onException(Throwable e) { + countDownLatch.countDown(); } }); - countDownLatch.await(8000L, TimeUnit.MILLISECONDS); - assertThat(cc.get()).isEqualTo(1); + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); } @Test @@ -252,7 +235,6 @@ public class DefaultMQProducerTest { @Test public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { - final AtomicInteger cc = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(1); when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.send(bigMessage, new SendCallback() { @@ -261,7 +243,6 @@ public class DefaultMQProducerTest { assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getQueueOffset()).isEqualTo(456L); - cc.incrementAndGet(); countDownLatch.countDown(); } @@ -270,7 +251,6 @@ public class DefaultMQProducerTest { } }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); - assertThat(cc.get()).isEqualTo(1); } @Test