diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 9380f4b73e2f549576151552caa384aae98fb2ee..c7d5a06dfd33abee62a83c535e1460ae02db7511 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -519,9 +519,11 @@ public class MQClientAPIImpl { final SendMessageContext context, final DefaultMQProducerImpl producer ) throws InterruptedException, RemotingException { + final long beginStartTime = System.currentTimeMillis(); this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { + long cost = System.currentTimeMillis() - beginStartTime; RemotingCommand response = responseFuture.getResponseCommand(); if (null == sendCallback && response != null) { @@ -555,23 +557,23 @@ public class MQClientAPIImpl { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer); } } else { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); if (!responseFuture.isSendRequestOK()) { MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else if (responseFuture.isTimeout()) { MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else { MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index cadcab1b11e633db516330e2f46adabffd84cc35..51b619491b5f0e853428253fd50f713dac0d2780 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -188,7 +188,7 @@ public class RemotingCommand { } public static int createNewRequestId() { - return requestId.incrementAndGet(); + return requestId.getAndIncrement(); } public static SerializeType getSerializeTypeConfigInThisServer() {