diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index db9e51241f15a4a0ee0f6f173b5e392054cb3ca3..35b905e18d21609f6ade4d6c0c10fafc34f9180f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1060,6 +1060,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.defaultMQProducer.getSendMsgTimeout()); } + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); + } + public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 301855401f5150525c5b041f6a12a82ee724efb4..a2f25dd0f8f47ae5f0031296a0440068b6275160 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.producer; import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; @@ -34,6 +35,7 @@ import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; /** * This class is the entry point for applications intending to send messages. @@ -630,6 +632,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout); } + /** + * Sets an Executor to be used for executing callback methods. + * If the Executor is not set, {@link NettyRemotingClient#publicExecutor} will be used. + * + * @param callbackExecutor the instance of Executor + */ + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor); + } + private MessageBatch batch(Collection msgs) throws MQClientException { MessageBatch msgBatch; try { diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 1ae6f2f2cc69c67d4856bd4889c068b2fc60d0e2..ded22ada914b163589b909a24d0f1b3a6b02e46c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -22,6 +22,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -39,6 +41,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -195,6 +198,22 @@ public class DefaultMQProducerTest { } } + @Test + public void testSetCallbackExecutor() throws MQClientException { + String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + producer = new DefaultMQProducer(producerGroupTemp); + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + + ExecutorService customized = Executors.newCachedThreadPool(); + producer.setCallbackExecutor(customized); + + NettyRemotingClient remotingClient = (NettyRemotingClient) producer.getDefaultMQProducerImpl() + .getmQClientFactory().getMQClientAPIImpl().getRemotingClient(); + + assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); + } + public static TopicRouteData createTopicRoute() { TopicRouteData topicRouteData = new TopicRouteData(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index b527408e7a4aa6e6838fbaa87306e35212cb9c56..2aea14cb9d6078b563a210efa285fd13436c4674 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -46,5 +46,7 @@ public interface RemotingClient extends RemotingService { void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); + void setCallbackExecutor(final ExecutorService callbackExecutor); + boolean isChannelWritable(final String addr); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index a96423c1fce494d5d8d32c16d0ebcb9520b7a755..6dc0457e4145c912eab6377ba4095824c24d0f2d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -87,6 +87,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final Lock lockNamesrvChannel = new ReentrantLock(); private final ExecutorService publicExecutor; + + /** + * Invoke the callback methods in this executor when process response. + */ + private ExecutorService callbackExecutor; private final ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup; private RPCHook rpcHook; @@ -582,7 +587,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public ExecutorService getCallbackExecutor() { - return this.publicExecutor; + return callbackExecutor != null ? callbackExecutor : publicExecutor; + } + + @Override + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.callbackExecutor = callbackExecutor; } static class ChannelWrapper { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java new file mode 100644 index 0000000000000000000000000000000000000000..04a3bebbb5791ba246994240f8191bd252028c27 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.netty; + +import java.lang.reflect.Field; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class NettyRemotingClientTest { + private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig()); + + @Test + public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException { + Field field = NettyRemotingClient.class.getDeclaredField("publicExecutor"); + field.setAccessible(true); + assertThat(remotingClient.getCallbackExecutor()).isEqualTo(field.get(remotingClient)); + + ExecutorService customized = Executors.newCachedThreadPool(); + remotingClient.setCallbackExecutor(customized); + + assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); + } +} \ No newline at end of file