diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 7ace9d5b07d13bb850e2c51bc3eb4d9ee9d517ab..90f4f7876352c613dbce7e8d31e6557585a3fc0f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -30,8 +30,10 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -101,6 +103,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); + private final BlockingQueue asyncSenderThreadPoolQueue; + private final ExecutorService defaultAsyncSenderExecutor; + private ExecutorService asyncSenderExecutor; + public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { this(defaultMQProducer, null); } @@ -108,6 +114,22 @@ public class DefaultMQProducerImpl implements MQProducerInner { public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { this.defaultMQProducer = defaultMQProducer; this.rpcHook = rpcHook; + + this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue(50000); + this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.asyncSenderThreadPoolQueue, + new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); + } + }); } public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { @@ -456,7 +478,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getCallbackExecutor(); + ExecutorService executor = this.getAsyncSenderExecutor(); try { executor.submit(new Runnable() { @Override @@ -957,7 +979,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getCallbackExecutor(); + ExecutorService executor = this.getAsyncSenderExecutor(); try { executor.submit(new Runnable() { @Override @@ -1079,7 +1101,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getCallbackExecutor(); + ExecutorService executor = this.getAsyncSenderExecutor(); try { executor.submit(new Runnable() { @Override @@ -1243,9 +1265,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void setCallbackExecutor(final ExecutorService callbackExecutor) { this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); } - public ExecutorService getCallbackExecutor() { - return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor(); + public ExecutorService getAsyncSenderExecutor() { + return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor; + } + + public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) { + this.asyncSenderExecutor = asyncSenderExecutor; } public SendResult send(Message msg, diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 9732d0eb84458062d203db0af55df8e37043b1a3..f57e52c3efaa93abe55b1d6db40d0d085c435066 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -655,6 +655,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor); } + /** + * Sets an Executor to be used for executing asynchronous send. If the Executor is not set, {@link + * DefaultMQProducerImpl#defaultAsyncSenderExecutor} will be used. + * + * @param asyncSenderExecutor the instance of Executor + */ + public void setAsyncSenderExecutor(final ExecutorService asyncSenderExecutor) { + this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor); + } + private MessageBatch batch(Collection msgs) throws MQClientException { MessageBatch msgBatch; try {