未验证 提交 73f20710 编写于 作者: Z zhangjidi2016 提交者: GitHub

[ISSUE #2219] Add some asynchronous API for batch messages (#2315)

* [ISSUE #2219] Add asynchronous batch send method.

* modify the ut
Co-authored-by: Nzhangjidi2016 <zhangjidi@cmss.chinamobile.com>
上级 742ba503
......@@ -916,6 +916,29 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}
@Override
public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
}
@Override
public void send(Collection<Message> msgs, SendCallback sendCallback,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout);
}
@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback);
}
@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout);
}
/**
* Sets an Executor to be used for executing callback methods. If the Executor is not set, {@link
......
......@@ -99,7 +99,19 @@ public interface MQProducer extends MQAdmin {
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//for rpc
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
......
......@@ -232,6 +232,51 @@ public class DefaultMQProducerTest {
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(5);
}
@Test
public void testBatchSendMessageAsync()
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(4);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
cc.incrementAndGet();
countDownLatch.countDown();
}
};
MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
};
List<Message> msgs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Message message = new Message();
message.setTopic("test");
message.setBody(("hello world" + i).getBytes());
msgs.add(message);
}
producer.send(msgs, sendCallback);
producer.send(msgs, sendCallback, 1000);
MessageQueue mq = new MessageQueue("test", "BrokerA", 1);
producer.send(msgs, mq, sendCallback);
// this message is send failed
producer.send(msgs, new MessageQueue(), sendCallback, 1000);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(1);
}
@Test
public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册