提交 f1772539 编写于 作者: M maowei.ymw 提交者: von gosling

Modify the asynchronous send thread pool to a exclusive one

上级 3482f944
...@@ -30,8 +30,10 @@ import java.util.concurrent.ConcurrentMap; ...@@ -30,8 +30,10 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.common.ClientErrorCode;
...@@ -101,6 +103,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -101,6 +103,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null); this(defaultMQProducer, null);
} }
...@@ -108,6 +114,22 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -108,6 +114,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer; this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
} }
public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
...@@ -456,7 +478,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -456,7 +478,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final SendCallback sendCallback, final long timeout) public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
...@@ -957,7 +979,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -957,7 +979,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
...@@ -1079,7 +1101,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1079,7 +1101,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
...@@ -1243,9 +1265,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1243,9 +1265,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void setCallbackExecutor(final ExecutorService callbackExecutor) { public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
} }
public ExecutorService getCallbackExecutor() {
return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
public ExecutorService getAsyncSenderExecutor() {
return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor;
}
public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
this.asyncSenderExecutor = asyncSenderExecutor;
} }
public SendResult send(Message msg, public SendResult send(Message msg,
......
...@@ -655,6 +655,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -655,6 +655,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor); this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor);
} }
/**
* Sets an Executor to be used for executing asynchronous send. If the Executor is not set, {@link
* DefaultMQProducerImpl#defaultAsyncSenderExecutor} will be used.
*
* @param asyncSenderExecutor the instance of Executor
*/
public void setAsyncSenderExecutor(final ExecutorService asyncSenderExecutor) {
this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor);
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException { private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch; MessageBatch msgBatch;
try { try {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册