From 36041006aaa92867077849f0dc060d4da295712e Mon Sep 17 00:00:00 2001 From: lizhiboo Date: Sun, 12 Sep 2021 16:24:37 +0800 Subject: [PATCH] [ISSUE #3314] Make mqClientApi request timeout settable --- .../org/apache/rocketmq/client/ClientConfig.java | 11 ++++++++++- .../client/impl/factory/MQClientInstance.java | 12 ++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index beeeb2f5..b2c043ee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -60,6 +60,8 @@ public class ClientConfig { private boolean useTLS = TlsSystemConfig.tlsEnable; + private int mqClientApiTimeout = 3 * 1000; + private LanguageCode language = LanguageCode.JAVA; public String buildMQClientId() { @@ -298,6 +300,13 @@ public class ClientConfig { this.accessChannel = accessChannel; } + public int getMqClientApiTimeout() { + return mqClientApiTimeout; + } + + public void setMqClientApiTimeout(int mqClientApiTimeout) { + this.mqClientApiTimeout = mqClientApiTimeout; + } @Override public String toString() { @@ -305,6 +314,6 @@ public class ClientConfig { + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" - + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]"; + + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]"; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index d30534ff..e897d495 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -447,7 +447,7 @@ public class MQClientInstance { if (addr != null) { try { this.getMQClientAPIImpl().checkClientInBroker( - addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000 + addr, entry.getKey(), this.clientId, subscriptionData, clientConfig.getMqClientApiTimeout() ); } catch (Exception e) { if (e instanceof MQClientException) { @@ -554,7 +554,7 @@ public class MQClientInstance { } try { - int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); + int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout()); if (!this.brokerVersionTable.containsKey(brokerName)) { this.brokerVersionTable.put(brokerName, new HashMap(4)); } @@ -610,7 +610,7 @@ public class MQClientInstance { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), - 1000 * 3); + clientConfig.getMqClientApiTimeout()); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); @@ -619,7 +619,7 @@ public class MQClientInstance { } } } else { - topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); + topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); @@ -894,7 +894,7 @@ public class MQClientInstance { String addr = entry1.getValue(); if (addr != null) { try { - this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); + this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout()); log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); } catch (RemotingException e) { log.error("unregister client exception from broker: " + addr, e); @@ -1064,7 +1064,7 @@ public class MQClientInstance { if (null != brokerAddr) { try { - return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000); + return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout()); } catch (Exception e) { log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e); } -- GitLab