未验证 提交 09fb8b5d 编写于 作者: Y yukon 提交者: GitHub

Merge pull request #201 from zhouxinyu/ROCKETMQ-324

[ROCKETMQ-324] Expose an interface for client to specify the async call back executor
...@@ -1060,6 +1060,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1060,6 +1060,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.defaultMQProducer.getSendMsgTimeout()); this.defaultMQProducer.getSendMsgTimeout());
} }
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
}
public SendResult send(Message msg, public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
......
...@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.producer; ...@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.producer;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
...@@ -34,6 +35,7 @@ import org.apache.rocketmq.common.message.MessageId; ...@@ -34,6 +35,7 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
/** /**
* This class is the entry point for applications intending to send messages. * This class is the entry point for applications intending to send messages.
...@@ -630,6 +632,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -630,6 +632,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout); return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
} }
/**
* Sets an Executor to be used for executing callback methods.
* If the Executor is not set, {@link NettyRemotingClient#publicExecutor} will be used.
*
* @param callbackExecutor the instance of Executor
*/
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor);
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException { private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch; MessageBatch msgBatch;
try { try {
......
...@@ -22,6 +22,8 @@ import java.util.Collections; ...@@ -22,6 +22,8 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -39,6 +41,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; ...@@ -39,6 +41,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -195,6 +198,22 @@ public class DefaultMQProducerTest { ...@@ -195,6 +198,22 @@ public class DefaultMQProducerTest {
} }
} }
@Test
public void testSetCallbackExecutor() throws MQClientException {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
ExecutorService customized = Executors.newCachedThreadPool();
producer.setCallbackExecutor(customized);
NettyRemotingClient remotingClient = (NettyRemotingClient) producer.getDefaultMQProducerImpl()
.getmQClientFactory().getMQClientAPIImpl().getRemotingClient();
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
public static TopicRouteData createTopicRoute() { public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData(); TopicRouteData topicRouteData = new TopicRouteData();
......
...@@ -46,5 +46,7 @@ public interface RemotingClient extends RemotingService { ...@@ -46,5 +46,7 @@ public interface RemotingClient extends RemotingService {
void registerProcessor(final int requestCode, final NettyRequestProcessor processor, void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor); final ExecutorService executor);
void setCallbackExecutor(final ExecutorService callbackExecutor);
boolean isChannelWritable(final String addr); boolean isChannelWritable(final String addr);
} }
...@@ -87,6 +87,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -87,6 +87,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Lock lockNamesrvChannel = new ReentrantLock(); private final Lock lockNamesrvChannel = new ReentrantLock();
private final ExecutorService publicExecutor; private final ExecutorService publicExecutor;
/**
* Invoke the callback methods in this executor when process response.
*/
private ExecutorService callbackExecutor;
private final ChannelEventListener channelEventListener; private final ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup; private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook; private RPCHook rpcHook;
...@@ -582,7 +587,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -582,7 +587,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override @Override
public ExecutorService getCallbackExecutor() { public ExecutorService getCallbackExecutor() {
return this.publicExecutor; return callbackExecutor != null ? callbackExecutor : publicExecutor;
}
@Override
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
} }
static class ChannelWrapper { static class ChannelWrapper {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class NettyRemotingClientTest {
private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig());
@Test
public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
Field field = NettyRemotingClient.class.getDeclaredField("publicExecutor");
field.setAccessible(true);
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(field.get(remotingClient));
ExecutorService customized = Executors.newCachedThreadPool();
remotingClient.setCallbackExecutor(customized);
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册