From 40710b071b6815d47ea2b41caf1a3a116442d511 Mon Sep 17 00:00:00 2001 From: Cczzzz <15641680695@163.com> Date: Thu, 20 Feb 2020 18:39:37 +0800 Subject: [PATCH] Fix asynchronous send retry --- .../rocketmq/client/impl/MQClientAPIImpl.java | 21 +++++++++++-------- .../remoting/protocol/RemotingCommand.java | 2 +- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b743af93..52c39923 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -302,8 +302,8 @@ public class MQClientAPIImpl { requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm()); requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm()); requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress()); - requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),",")); - requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),",")); + requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ",")); + requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ",")); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader); @@ -342,7 +342,7 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis) + public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader(); @@ -364,7 +364,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, + public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr, + final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null); @@ -387,7 +388,7 @@ public class MQClientAPIImpl { } throw new MQBrokerException(response.getCode(), response.getRemark()); - + } public SendResult sendMessage( @@ -481,9 +482,11 @@ public class MQClientAPIImpl { final SendMessageContext context, final DefaultMQProducerImpl producer ) throws InterruptedException, RemotingException { + final long beginStartTime = System.currentTimeMillis(); this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { + long cost = System.currentTimeMillis() - beginStartTime; RemotingCommand response = responseFuture.getResponseCommand(); if (null == sendCallback && response != null) { @@ -517,23 +520,23 @@ public class MQClientAPIImpl { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer); } } else { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); if (!responseFuture.isSendRequestOK()) { MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else if (responseFuture.isTimeout()) { MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else { MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index cadcab1b..51b61949 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -188,7 +188,7 @@ public class RemotingCommand { } public static int createNewRequestId() { - return requestId.incrementAndGet(); + return requestId.getAndIncrement(); } public static SerializeType getSerializeTypeConfigInThisServer() { -- GitLab