diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 3a023e362cb8c3eb1c06630674e2a13ce8b003c4..1837204a3090802c47fad5dff5b0af05c3915212 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -308,6 +308,7 @@ public class MQClientAPIImpl {
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
+ long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
@@ -324,11 +325,19 @@ public class MQClientAPIImpl {
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
- this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
+ long costTimeAsync = System.currentTimeMillis() - beginStartTime;
+ if (timeoutMillis < costTimeAsync) {
+ throw new RemotingTooMuchRequestException("sendMessage call timeout");
+ }
+ this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
- return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
+ long costTimeSync = System.currentTimeMillis() - beginStartTime;
+ if (timeoutMillis < costTimeSync) {
+ throw new RemotingTooMuchRequestException("sendMessage call timeout");
+ }
+ return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
@@ -2080,4 +2089,4 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
}
-}
\ No newline at end of file
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 4b5e373fd2767971201a672e43c981b1dc2189bb..e1d9f90429512a46ce9950a1ce31a873a63cd5b8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -64,6 +64,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import java.util.concurrent.RejectedExecutionException;
import java.io.IOException;
import java.net.UnknownHostException;
@@ -414,19 +416,48 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* DEFAULT ASYNC -------------------------------------------------------
*/
public void send(Message msg,
- SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
- public void send(Message msg, SendCallback sendCallback, long timeout)
+ /**
+ * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
+ * A new one will be provided in next version
+ * @param msg
+ * @param sendCallback
+ * @param timeout the sendCallback
will be invoked at most time
+ * @throws RejectedExecutionException
+ */
+ @Deprecated
+ public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
+ final long beginStartTime = System.currentTimeMillis();
+ ExecutorService executor = this.getCallbackExecutor();
try {
- this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
- } catch (MQBrokerException e) {
- throw new MQClientException("unknownn exception", e);
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout > costTime) {
+ try {
+ sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
+ } catch (Exception e) {
+ sendCallback.onException(e);
+ }
+ } else {
+ sendCallback.onException(
+ new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
+ }
+ }
+
+ });
+ } catch (RejectedExecutionException e) {
+ throw new MQClientException("executor rejected ", e);
}
+
}
+
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
@@ -450,6 +481,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
long endTimestamp = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
+ boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
@@ -464,7 +496,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
- sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
+ long costTime = beginTimestampPrev - beginTimestampFirst;
+ if (timeout < costTime) {
+ callTimeout = true;
+ break;
+ }
+
+ sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
@@ -546,6 +584,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
+ if (callTimeout) {
+ throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
+ }
+
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
@@ -587,11 +629,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private SendResult sendKernelImpl(final Message msg,
- final MessageQueue mq,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final TopicPublishInfo topicPublishInfo,
- final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ final MessageQueue mq,
+ final CommunicationMode communicationMode,
+ final SendCallback sendCallback,
+ final TopicPublishInfo topicPublishInfo,
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
@@ -691,13 +734,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
tmpMessage = MessageAccessor.cloneMessage(msg);
msg.setBody(prevBody);
}
-
+ long costTimeAsync = System.currentTimeMillis() - beginStartTime;
+ if (timeout < costTimeAsync) {
+ throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
+ }
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
- timeout,
+ timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
@@ -708,12 +754,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
break;
case ONEWAY:
case SYNC:
+ long costTimeSync = System.currentTimeMillis() - beginStartTime;
+ if (timeout < costTimeSync) {
+ throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
+ }
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
- timeout,
+ timeout - costTimeSync,
communicationMode,
context,
this);
@@ -844,6 +894,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -851,6 +902,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new MQClientException("message's topic not equal mq's topic", null);
}
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout < costTime) {
+ throw new RemotingTooMuchRequestException("call timeout");
+ }
+
return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
}
@@ -862,20 +918,55 @@ public class DefaultMQProducerImpl implements MQProducerInner {
send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
- public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+ /**
+ * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
+ * A new one will be provided in next version
+ * @param msg
+ * @param mq
+ * @param sendCallback
+ * @param timeout the sendCallback
will be invoked at most time
+ * @throws MQClientException
+ * @throws RemotingException
+ * @throws InterruptedException
+ */
+ @Deprecated
+ public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
- this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
+ final long beginStartTime = System.currentTimeMillis();
+ ExecutorService executor = this.getCallbackExecutor();
+ try {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ makeSureStateOK();
+ Validators.checkMessage(msg, defaultMQProducer);
- if (!msg.getTopic().equals(mq.getTopic())) {
- throw new MQClientException("message's topic not equal mq's topic", null);
- }
+ if (!msg.getTopic().equals(mq.getTopic())) {
+ throw new MQClientException("message's topic not equal mq's topic", null);
+ }
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout > costTime) {
+ try {
+ sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
+ timeout - costTime);
+ } catch (MQBrokerException e) {
+ throw new MQClientException("unknown exception", e);
+ }
+ } else {
+ sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
+ }
+ } catch (Exception e) {
+ sendCallback.onException(e);
+ }
- try {
- this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
- } catch (MQBrokerException e) {
- throw new MQClientException("unknown exception", e);
+ }
+
+ });
+ } catch (RejectedExecutionException e) {
+ throw new MQClientException("executor rejected ", e);
}
+
}
/**
@@ -913,6 +1004,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -925,8 +1017,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new MQClientException("select message queue throwed exception.", e);
}
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout < costTime) {
+ throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
+ }
if (mq != null) {
- return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
+ return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
@@ -943,12 +1039,47 @@ public class DefaultMQProducerImpl implements MQProducerInner {
send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
- public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+ /**
+ * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
+ * A new one will be provided in next version
+ * @param msg
+ * @param selector
+ * @param arg
+ * @param sendCallback
+ * @param timeout the sendCallback
will be invoked at most time
+ * @throws MQClientException
+ * @throws RemotingException
+ * @throws InterruptedException
+ */
+ @Deprecated
+ public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
+ final long beginStartTime = System.currentTimeMillis();
+ ExecutorService executor = this.getCallbackExecutor();
try {
- this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
- } catch (MQBrokerException e) {
- throw new MQClientException("unknownn exception", e);
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout > costTime) {
+ try {
+ try {
+ sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
+ timeout - costTime);
+ } catch (MQBrokerException e) {
+ throw new MQClientException("unknownn exception", e);
+ }
+ } catch (Exception e) {
+ sendCallback.onException(e);
+ }
+ } else {
+ sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
+ }
+ }
+
+ });
+ } catch (RejectedExecutionException e) {
+ throw new MQClientException("exector rejected ", e);
}
}
@@ -1082,6 +1213,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
}
+ public ExecutorService getCallbackExecutor() {
+ return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
+
+ }
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index d6dce86c72a2fccd4fd34496f89bf473dac0e09e..ff2fb78bbfb9df89596abc2ad7b35fcc33a2df3e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -289,4 +289,4 @@ public class DefaultMQPushConsumerTest {
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
-}
\ No newline at end of file
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index d3c6cc8b117236d3d106fb1e2b04e01e3ffd646e..c225afd6842b24adc49818575d54a2d5ca5e016d 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -24,6 +24,9 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -36,6 +39,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -65,6 +69,8 @@ public class DefaultMQProducerTest {
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
+ @Mock
+ private NettyRemotingClient nettyRemotingClient;
private DefaultMQProducer producer;
private Message message;
@@ -161,38 +167,91 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
+ when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
+ countDownLatch.countDown();
}
});
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ callbackExecutor.shutdown();
+ }
+ @Test
+ public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException {
+ final AtomicInteger cc = new AtomicInteger(0);
+ final CountDownLatch countDownLatch = new CountDownLatch(6);
+ ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
+ when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
+ when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
+
+ SendCallback sendCallback = new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ }
+ @Override
+ public void onException(Throwable e) {
+ e.printStackTrace();
+ cc.incrementAndGet();
+ countDownLatch.countDown();
+ }
+ };
+ MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
+ @Override
+ public MessageQueue select(List mqs, Message msg, Object arg) {
+ return null;
+ }
+ };
+
+ Message message = new Message();
+ message.setTopic("test");
+ message.setBody("hello world".getBytes());
+ producer.send(new Message(),sendCallback);
+ producer.send(message,sendCallback,1000);
+ producer.send(message,new MessageQueue(),sendCallback);
+ producer.send(new Message(),new MessageQueue(),sendCallback,1000);
+ producer.send(new Message(),messageQueueSelector,null,sendCallback);
+ producer.send(message,messageQueueSelector,null,sendCallback,1000);
+
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ callbackExecutor.shutdown();
+ assertThat(cc.get()).isEqualTo(6);
}
@Test
public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
+ when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(bigMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
+ countDownLatch.countDown();
}
});
-
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ callbackExecutor.shutdown();
}
@Test
@@ -249,7 +308,7 @@ public class DefaultMQProducerTest {
@Test
public void testSetCallbackExecutor() throws MQClientException {
- String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+ String producerGroupTemp = "testSetCallbackExecutor_" + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
@@ -319,4 +378,4 @@ public class DefaultMQProducerTest {
}
return assertionErrors[0];
}
-}
\ No newline at end of file
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 2aea14cb9d6078b563a210efa285fd13436c4674..c0754db634d5a365db91d49c9d353879627d671a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -48,5 +48,7 @@ public interface RemotingClient extends RemotingService {
void setCallbackExecutor(final ExecutorService callbackExecutor);
+ ExecutorService getCallbackExecutor();
+
boolean isChannelWritable(final String addr);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 45ca7304c8f096f1628b2d3774974bb62ef094f0..8dccebc04575db85ddc70e8c41de1a2cdf8a93c2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -403,11 +403,17 @@ public abstract class NettyRemotingAbstract {
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
- final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once);
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeoutMillis < costTime) {
+ throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
+ }
+
+ final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 241f2b07ad1abc292a75c3432376a22a678c71ce..33c2eed8de188e0b21a88d21aad68494460d3ccc 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -360,13 +360,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+ long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeoutMillis < costTime) {
+ throw new RemotingTimeoutException("invokeSync call timeout");
+ }
+ RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
@@ -390,8 +395,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
- if (null == addr)
+ if (null == addr) {
return getAndCreateNameserverChannel();
+ }
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
@@ -431,8 +437,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.namesrvAddrChoosed.set(newAddr);
log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
Channel channelNew = this.createChannel(newAddr);
- if (channelNew != null)
+ if (channelNew != null) {
return channelNew;
+ }
}
}
} catch (Exception e) {
@@ -511,13 +518,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
+ long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
- this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeoutMillis < costTime) {
+ throw new RemotingTooMuchRequestException("invokeAsync call timeout");
+ }
+ this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 04a3bebbb5791ba246994240f8191bd252028c27..6b5633df1d74a33c87dbceb3df270b2f1c580224 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -30,14 +30,10 @@ public class NettyRemotingClientTest {
private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig());
@Test
- public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
- Field field = NettyRemotingClient.class.getDeclaredField("publicExecutor");
- field.setAccessible(true);
- assertThat(remotingClient.getCallbackExecutor()).isEqualTo(field.get(remotingClient));
-
+ public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
ExecutorService customized = Executors.newCachedThreadPool();
remotingClient.setCallbackExecutor(customized);
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
-}
\ No newline at end of file
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
index 0bad6ea2bfcda8dc4f02d1ebb6f983782ca83db8..d1a1fd143e07072061cd7ec117b0e3a050ca7fed 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
@@ -61,7 +61,7 @@ public class AsyncSendExceptionIT extends BaseConf {
producer.send(msg, sendCallback);
}
- @Test(expected = java.lang.NullPointerException.class)
+ @Test
public void testSendMQNull() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
@@ -69,7 +69,7 @@ public class AsyncSendExceptionIT extends BaseConf {
producer.send(msg, messageQueue, SendCallBackFactory.getSendCallBack());
}
- @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class)
+ @Test
public void testSendSelectorNull() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
@@ -77,7 +77,7 @@ public class AsyncSendExceptionIT extends BaseConf {
producer.send(msg, selector, 100, SendCallBackFactory.getSendCallBack());
}
- @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class)
+ @Test
public void testSelectorThrowsException() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);