From 73f2071087cc47a37601988d6b6fee4b78b9ff3d Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Fri, 25 Sep 2020 14:57:40 +0800 Subject: [PATCH] [ISSUE #2219] Add some asynchronous API for batch messages (#2315) * [ISSUE #2219] Add asynchronous batch send method. * modify the ut Co-authored-by: zhangjidi2016 --- .../client/producer/DefaultMQProducer.java | 23 ++++++++++ .../rocketmq/client/producer/MQProducer.java | 14 +++++- .../producer/DefaultMQProducerTest.java | 45 +++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 09969504..24caf140 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -916,6 +916,29 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout); } + + @Override + public void send(Collection msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.defaultMQProducerImpl.send(batch(msgs), sendCallback); + } + + @Override + public void send(Collection msgs, SendCallback sendCallback, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout); + } + + @Override + public void send(Collection msgs, MessageQueue mq, + SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback); + } + + @Override + public void send(Collection msgs, MessageQueue mq, + SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout); + } /** * Sets an Executor to be used for executing callback methods. If the Executor is not set, {@link diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index c6cf4c93..f70ddb28 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -99,7 +99,19 @@ public interface MQProducer extends MQAdmin { SendResult send(final Collection msgs, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - + + void send(final Collection msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + void send(final Collection msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; + + void send(final Collection msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; + + void send(final Collection msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + //for rpc Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 41046fcd..e1771b96 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -232,6 +232,51 @@ public class DefaultMQProducerTest { countDownLatch.await(3000L, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(5); } + + @Test + public void testBatchSendMessageAsync() + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + final AtomicInteger cc = new AtomicInteger(0); + final CountDownLatch countDownLatch = new CountDownLatch(4); + + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + SendCallback sendCallback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + countDownLatch.countDown(); + } + + @Override + public void onException(Throwable e) { + e.printStackTrace(); + cc.incrementAndGet(); + countDownLatch.countDown(); + } + }; + MessageQueueSelector messageQueueSelector = new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + return null; + } + }; + + List msgs = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Message message = new Message(); + message.setTopic("test"); + message.setBody(("hello world" + i).getBytes()); + msgs.add(message); + } + producer.send(msgs, sendCallback); + producer.send(msgs, sendCallback, 1000); + MessageQueue mq = new MessageQueue("test", "BrokerA", 1); + producer.send(msgs, mq, sendCallback); + // this message is send failed + producer.send(msgs, new MessageQueue(), sendCallback, 1000); + + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + assertThat(cc.get()).isEqualTo(1); + } @Test public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { -- GitLab