提交 40710b07 编写于 作者: C Cczzzz

Fix asynchronous send retry

上级 034bebc1
...@@ -302,8 +302,8 @@ public class MQClientAPIImpl { ...@@ -302,8 +302,8 @@ public class MQClientAPIImpl {
requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm()); requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm()); requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress()); requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),",")); requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ","));
requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),",")); requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ","));
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
...@@ -342,7 +342,7 @@ public class MQClientAPIImpl { ...@@ -342,7 +342,7 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark()); throw new MQClientException(response.getCode(), response.getRemark());
} }
public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis) public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader(); UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader();
...@@ -364,7 +364,8 @@ public class MQClientAPIImpl { ...@@ -364,7 +364,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark()); throw new MQClientException(response.getCode(), response.getRemark());
} }
public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException, public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,
final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null);
...@@ -387,7 +388,7 @@ public class MQClientAPIImpl { ...@@ -387,7 +388,7 @@ public class MQClientAPIImpl {
} }
throw new MQBrokerException(response.getCode(), response.getRemark()); throw new MQBrokerException(response.getCode(), response.getRemark());
} }
public SendResult sendMessage( public SendResult sendMessage(
...@@ -481,9 +482,11 @@ public class MQClientAPIImpl { ...@@ -481,9 +482,11 @@ public class MQClientAPIImpl {
final SendMessageContext context, final SendMessageContext context,
final DefaultMQProducerImpl producer final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException { ) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override @Override
public void operationComplete(ResponseFuture responseFuture) { public void operationComplete(ResponseFuture responseFuture) {
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand(); RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) { if (null == sendCallback && response != null) {
...@@ -517,23 +520,23 @@ public class MQClientAPIImpl { ...@@ -517,23 +520,23 @@ public class MQClientAPIImpl {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) { } catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer); retryTimesWhenSendFailed, times, e, context, false, producer);
} }
} else { } else {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) { if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer); retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) { } else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause()); responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer); retryTimesWhenSendFailed, times, ex, context, true, producer);
} else { } else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer); retryTimesWhenSendFailed, times, ex, context, true, producer);
} }
} }
......
...@@ -188,7 +188,7 @@ public class RemotingCommand { ...@@ -188,7 +188,7 @@ public class RemotingCommand {
} }
public static int createNewRequestId() { public static int createNewRequestId() {
return requestId.incrementAndGet(); return requestId.getAndIncrement();
} }
public static SerializeType getSerializeTypeConfigInThisServer() { public static SerializeType getSerializeTypeConfigInThisServer() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册