未验证 提交 997164b1 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1800 from Cczzzz/master

[ISSUE #1781]Fix asynchronous send retry
...@@ -519,9 +519,11 @@ public class MQClientAPIImpl { ...@@ -519,9 +519,11 @@ public class MQClientAPIImpl {
final SendMessageContext context, final SendMessageContext context,
final DefaultMQProducerImpl producer final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException { ) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override @Override
public void operationComplete(ResponseFuture responseFuture) { public void operationComplete(ResponseFuture responseFuture) {
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand(); RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) { if (null == sendCallback && response != null) {
...@@ -555,23 +557,23 @@ public class MQClientAPIImpl { ...@@ -555,23 +557,23 @@ public class MQClientAPIImpl {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) { } catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer); retryTimesWhenSendFailed, times, e, context, false, producer);
} }
} else { } else {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) { if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer); retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) { } else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause()); responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer); retryTimesWhenSendFailed, times, ex, context, true, producer);
} else { } else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer); retryTimesWhenSendFailed, times, ex, context, true, producer);
} }
} }
......
...@@ -188,7 +188,7 @@ public class RemotingCommand { ...@@ -188,7 +188,7 @@ public class RemotingCommand {
} }
public static int createNewRequestId() { public static int createNewRequestId() {
return requestId.incrementAndGet(); return requestId.getAndIncrement();
} }
public static SerializeType getSerializeTypeConfigInThisServer() { public static SerializeType getSerializeTypeConfigInThisServer() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册