From 37cf2a7cb706a0cfe5b217940467c4a9cb33626e Mon Sep 17 00:00:00 2001 From: yukon Date: Wed, 6 Dec 2017 16:21:24 +0800 Subject: [PATCH] [ROCKETMQ-324] Expose an interface for client to specify the async call back executor --- .../impl/producer/DefaultMQProducerImpl.java | 4 ++ .../client/producer/DefaultMQProducer.java | 12 ++++++ .../producer/DefaultMQProducerTest.java | 19 ++++++++ .../rocketmq/remoting/RemotingClient.java | 2 + .../remoting/netty/NettyRemotingClient.java | 12 +++++- .../netty/NettyRemotingClientTest.java | 43 +++++++++++++++++++ 6 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index db9e5124..35b905e1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1060,6 +1060,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.defaultMQProducer.getSendMsgTimeout()); } + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); + } + public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 30185540..a2f25dd0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.producer; import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; @@ -34,6 +35,7 @@ import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; /** * This class is the entry point for applications intending to send messages. @@ -630,6 +632,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout); } + /** + * Sets an Executor to be used for executing callback methods. + * If the Executor is not set, {@link NettyRemotingClient#publicExecutor} will be used. + * + * @param callbackExecutor the instance of Executor + */ + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor); + } + private MessageBatch batch(Collection msgs) throws MQClientException { MessageBatch msgBatch; try { diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 1ae6f2f2..ded22ada 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -22,6 +22,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -39,6 +41,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -195,6 +198,22 @@ public class DefaultMQProducerTest { } } + @Test + public void testSetCallbackExecutor() throws MQClientException { + String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + producer = new DefaultMQProducer(producerGroupTemp); + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + + ExecutorService customized = Executors.newCachedThreadPool(); + producer.setCallbackExecutor(customized); + + NettyRemotingClient remotingClient = (NettyRemotingClient) producer.getDefaultMQProducerImpl() + .getmQClientFactory().getMQClientAPIImpl().getRemotingClient(); + + assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); + } + public static TopicRouteData createTopicRoute() { TopicRouteData topicRouteData = new TopicRouteData(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index b527408e..2aea14cb 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -46,5 +46,7 @@ public interface RemotingClient extends RemotingService { void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); + void setCallbackExecutor(final ExecutorService callbackExecutor); + boolean isChannelWritable(final String addr); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index a96423c1..6dc0457e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -87,6 +87,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final Lock lockNamesrvChannel = new ReentrantLock(); private final ExecutorService publicExecutor; + + /** + * Invoke the callback methods in this executor when process response. + */ + private ExecutorService callbackExecutor; private final ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup; private RPCHook rpcHook; @@ -582,7 +587,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public ExecutorService getCallbackExecutor() { - return this.publicExecutor; + return callbackExecutor != null ? callbackExecutor : publicExecutor; + } + + @Override + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + this.callbackExecutor = callbackExecutor; } static class ChannelWrapper { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java new file mode 100644 index 00000000..04a3bebb --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.netty; + +import java.lang.reflect.Field; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class NettyRemotingClientTest { + private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig()); + + @Test + public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException { + Field field = NettyRemotingClient.class.getDeclaredField("publicExecutor"); + field.setAccessible(true); + assertThat(remotingClient.getCallbackExecutor()).isEqualTo(field.get(remotingClient)); + + ExecutorService customized = Executors.newCachedThreadPool(); + remotingClient.setCallbackExecutor(customized); + + assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); + } +} \ No newline at end of file -- GitLab