提交 bc0c04bf 编写于 作者: W what-a-good-jungle 提交者: von gosling

[ROCKETMQ-355][POLISH]Async send method polish - fix the timeout semantic (#318)

上级 ed433a3c
...@@ -308,6 +308,7 @@ public class MQClientAPIImpl { ...@@ -308,6 +308,7 @@ public class MQClientAPIImpl {
final SendMessageContext context, final SendMessageContext context,
final DefaultMQProducerImpl producer final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException { ) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null; RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) { if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
...@@ -324,11 +325,19 @@ public class MQClientAPIImpl { ...@@ -324,11 +325,19 @@ public class MQClientAPIImpl {
return null; return null;
case ASYNC: case ASYNC:
final AtomicInteger times = new AtomicInteger(); final AtomicInteger times = new AtomicInteger();
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer); retryTimesWhenSendFailed, times, context, producer);
return null; return null;
case SYNC: case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request); long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default: default:
assert false; assert false;
break; break;
......
...@@ -64,6 +64,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; ...@@ -64,6 +64,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import java.util.concurrent.RejectedExecutionException;
import java.io.IOException; import java.io.IOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
...@@ -418,15 +420,44 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -418,15 +420,44 @@ public class DefaultMQProducerImpl implements MQProducerInner {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
} }
public void send(Message msg, SendCallback sendCallback, long timeout) /**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* @param msg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws RejectedExecutionException
*/
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor();
try { try {
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); executor.submit(new Runnable() {
} catch (MQBrokerException e) { @Override
throw new MQClientException("unknownn exception", e); public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e);
} }
} else {
sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
} }
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
} }
...@@ -450,6 +481,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -450,6 +481,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
long endTimestamp = beginTimestampFirst; long endTimestamp = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) { if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null; MessageQueue mq = null;
Exception exception = null; Exception exception = null;
SendResult sendResult = null; SendResult sendResult = null;
...@@ -464,7 +496,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -464,7 +496,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
brokersSent[times] = mq.getBrokerName(); brokersSent[times] = mq.getBrokerName();
try { try {
beginTimestampPrev = System.currentTimeMillis(); beginTimestampPrev = System.currentTimeMillis();
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) { switch (communicationMode) {
...@@ -546,6 +584,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -546,6 +584,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception); MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) { if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) { } else if (exception instanceof RemotingConnectException) {
...@@ -592,6 +634,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -592,6 +634,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final SendCallback sendCallback, final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo, final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) { if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic()); tryToFindTopicPublishInfo(mq.getTopic());
...@@ -691,13 +734,16 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -691,13 +734,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
tmpMessage = MessageAccessor.cloneMessage(msg); tmpMessage = MessageAccessor.cloneMessage(msg);
msg.setBody(prevBody); msg.setBody(prevBody);
} }
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr, brokerAddr,
mq.getBrokerName(), mq.getBrokerName(),
tmpMessage, tmpMessage,
requestHeader, requestHeader,
timeout, timeout - costTimeAsync,
communicationMode, communicationMode,
sendCallback, sendCallback,
topicPublishInfo, topicPublishInfo,
...@@ -708,12 +754,16 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -708,12 +754,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
break; break;
case ONEWAY: case ONEWAY:
case SYNC: case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr, brokerAddr,
mq.getBrokerName(), mq.getBrokerName(),
msg, msg,
requestHeader, requestHeader,
timeout, timeout - costTimeSync,
communicationMode, communicationMode,
context, context,
this); this);
...@@ -844,6 +894,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -844,6 +894,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public SendResult send(Message msg, MessageQueue mq, long timeout) public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK(); this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
...@@ -851,6 +902,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -851,6 +902,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new MQClientException("message's topic not equal mq's topic", null); throw new MQClientException("message's topic not equal mq's topic", null);
} }
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("call timeout");
}
return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout); return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
} }
...@@ -862,20 +918,55 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -862,20 +918,55 @@ public class DefaultMQProducerImpl implements MQProducerInner {
send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
} }
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) /**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* @param msg
* @param mq
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
@Deprecated
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK(); final long beginStartTime = System.currentTimeMillis();
Validators.checkMessage(msg, this.defaultMQProducer); ExecutorService executor = this.getCallbackExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
makeSureStateOK();
Validators.checkMessage(msg, defaultMQProducer);
if (!msg.getTopic().equals(mq.getTopic())) { if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("message's topic not equal mq's topic", null); throw new MQClientException("message's topic not equal mq's topic", null);
} }
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try { try {
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout); sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
timeout - costTime);
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e); throw new MQClientException("unknown exception", e);
} }
} else {
sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
}
} catch (Exception e) {
sendCallback.onException(e);
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
}
} }
/** /**
...@@ -913,6 +1004,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -913,6 +1004,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final CommunicationMode communicationMode, final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK(); this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
...@@ -925,8 +1017,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -925,8 +1017,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new MQClientException("select message queue throwed exception.", e); throw new MQClientException("select message queue throwed exception.", e);
} }
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) { if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout); return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else { } else {
throw new MQClientException("select message queue return null.", null); throw new MQClientException("select message queue return null.", null);
} }
...@@ -943,13 +1039,48 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -943,13 +1039,48 @@ public class DefaultMQProducerImpl implements MQProducerInner {
send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
} }
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) /**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* @param msg
* @param selector
* @param arg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try { try {
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout); try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
timeout - costTime);
} catch (MQBrokerException e) { } catch (MQBrokerException e) {
throw new MQClientException("unknownn exception", e); throw new MQClientException("unknownn exception", e);
} }
} catch (Exception e) {
sendCallback.onException(e);
}
} else {
sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("exector rejected ", e);
}
} }
/** /**
...@@ -1082,6 +1213,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1082,6 +1213,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void setCallbackExecutor(final ExecutorService callbackExecutor) { public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
} }
public ExecutorService getCallbackExecutor() {
return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
}
public SendResult send(Message msg, public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
......
...@@ -24,6 +24,9 @@ import java.util.List; ...@@ -24,6 +24,9 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -36,6 +39,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; ...@@ -36,6 +39,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.QueueData;
...@@ -65,6 +69,8 @@ public class DefaultMQProducerTest { ...@@ -65,6 +69,8 @@ public class DefaultMQProducerTest {
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock @Mock
private MQClientAPIImpl mQClientAPIImpl; private MQClientAPIImpl mQClientAPIImpl;
@Mock
private NettyRemotingClient nettyRemotingClient;
private DefaultMQProducer producer; private DefaultMQProducer producer;
private Message message; private Message message;
...@@ -161,38 +167,91 @@ public class DefaultMQProducerTest { ...@@ -161,38 +167,91 @@ public class DefaultMQProducerTest {
@Test @Test
public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch countDownLatch = new CountDownLatch(1);
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(message, new SendCallback() { producer.send(message, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L); assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
countDownLatch.countDown();
} }
@Override @Override
public void onException(Throwable e) { public void onException(Throwable e) {
countDownLatch.countDown();
} }
}); });
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
}
@Test
public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(6);
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
cc.incrementAndGet();
countDownLatch.countDown();
}
};
MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
};
Message message = new Message();
message.setTopic("test");
message.setBody("hello world".getBytes());
producer.send(new Message(),sendCallback);
producer.send(message,sendCallback,1000);
producer.send(message,new MessageQueue(),sendCallback);
producer.send(new Message(),new MessageQueue(),sendCallback,1000);
producer.send(new Message(),messageQueueSelector,null,sendCallback);
producer.send(message,messageQueueSelector,null,sendCallback,1000);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
assertThat(cc.get()).isEqualTo(6);
} }
@Test @Test
public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch countDownLatch = new CountDownLatch(1);
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(bigMessage, new SendCallback() { producer.send(bigMessage, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
assertThat(sendResult.getQueueOffset()).isEqualTo(456L); assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
countDownLatch.countDown();
} }
@Override @Override
public void onException(Throwable e) { public void onException(Throwable e) {
countDownLatch.countDown();
} }
}); });
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
} }
@Test @Test
...@@ -249,7 +308,7 @@ public class DefaultMQProducerTest { ...@@ -249,7 +308,7 @@ public class DefaultMQProducerTest {
@Test @Test
public void testSetCallbackExecutor() throws MQClientException { public void testSetCallbackExecutor() throws MQClientException {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); String producerGroupTemp = "testSetCallbackExecutor_" + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp); producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876"); producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); producer.start();
......
...@@ -48,5 +48,7 @@ public interface RemotingClient extends RemotingService { ...@@ -48,5 +48,7 @@ public interface RemotingClient extends RemotingService {
void setCallbackExecutor(final ExecutorService callbackExecutor); void setCallbackExecutor(final ExecutorService callbackExecutor);
ExecutorService getCallbackExecutor();
boolean isChannelWritable(final String addr); boolean isChannelWritable(final String addr);
} }
...@@ -403,11 +403,17 @@ public abstract class NettyRemotingAbstract { ...@@ -403,11 +403,17 @@ public abstract class NettyRemotingAbstract {
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque(); final int opaque = request.getOpaque();
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) { if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once); long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture); this.responseTable.put(opaque, responseFuture);
try { try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() { channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
......
...@@ -360,13 +360,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -360,13 +360,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override @Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr); final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
try { try {
if (this.rpcHook != null) { if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request); this.rpcHook.doBeforeRequest(addr, request);
} }
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis); long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) { if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
} }
...@@ -390,8 +395,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -390,8 +395,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
} }
private Channel getAndCreateChannel(final String addr) throws InterruptedException { private Channel getAndCreateChannel(final String addr) throws InterruptedException {
if (null == addr) if (null == addr) {
return getAndCreateNameserverChannel(); return getAndCreateNameserverChannel();
}
ChannelWrapper cw = this.channelTables.get(addr); ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) { if (cw != null && cw.isOK()) {
...@@ -431,10 +437,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -431,10 +437,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.namesrvAddrChoosed.set(newAddr); this.namesrvAddrChoosed.set(newAddr);
log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
Channel channelNew = this.createChannel(newAddr); Channel channelNew = this.createChannel(newAddr);
if (channelNew != null) if (channelNew != null) {
return channelNew; return channelNew;
} }
} }
}
} catch (Exception e) { } catch (Exception e) {
log.error("getAndCreateNameserverChannel: create name server channel exception", e); log.error("getAndCreateNameserverChannel: create name server channel exception", e);
} finally { } finally {
...@@ -511,13 +518,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -511,13 +518,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException { RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr); final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
try { try {
if (this.rpcHook != null) { if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request); this.rpcHook.doBeforeRequest(addr, request);
} }
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) { } catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel); this.closeChannel(addr, channel);
......
...@@ -31,10 +31,6 @@ public class NettyRemotingClientTest { ...@@ -31,10 +31,6 @@ public class NettyRemotingClientTest {
@Test @Test
public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException { public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
Field field = NettyRemotingClient.class.getDeclaredField("publicExecutor");
field.setAccessible(true);
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(field.get(remotingClient));
ExecutorService customized = Executors.newCachedThreadPool(); ExecutorService customized = Executors.newCachedThreadPool();
remotingClient.setCallbackExecutor(customized); remotingClient.setCallbackExecutor(customized);
......
...@@ -61,7 +61,7 @@ public class AsyncSendExceptionIT extends BaseConf { ...@@ -61,7 +61,7 @@ public class AsyncSendExceptionIT extends BaseConf {
producer.send(msg, sendCallback); producer.send(msg, sendCallback);
} }
@Test(expected = java.lang.NullPointerException.class) @Test
public void testSendMQNull() throws Exception { public void testSendMQNull() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
...@@ -69,7 +69,7 @@ public class AsyncSendExceptionIT extends BaseConf { ...@@ -69,7 +69,7 @@ public class AsyncSendExceptionIT extends BaseConf {
producer.send(msg, messageQueue, SendCallBackFactory.getSendCallBack()); producer.send(msg, messageQueue, SendCallBackFactory.getSendCallBack());
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test
public void testSendSelectorNull() throws Exception { public void testSendSelectorNull() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
...@@ -77,7 +77,7 @@ public class AsyncSendExceptionIT extends BaseConf { ...@@ -77,7 +77,7 @@ public class AsyncSendExceptionIT extends BaseConf {
producer.send(msg, selector, 100, SendCallBackFactory.getSendCallBack()); producer.send(msg, selector, 100, SendCallBackFactory.getSendCallBack());
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test
public void testSelectorThrowsException() throws Exception { public void testSelectorThrowsException() throws Exception {
Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册