未验证 提交 36041006 编写于 作者: L lizhiboo 提交者: GitHub

[ISSUE #3314] Make mqClientApi request timeout settable

上级 564ee290
...@@ -60,6 +60,8 @@ public class ClientConfig { ...@@ -60,6 +60,8 @@ public class ClientConfig {
private boolean useTLS = TlsSystemConfig.tlsEnable; private boolean useTLS = TlsSystemConfig.tlsEnable;
private int mqClientApiTimeout = 3 * 1000;
private LanguageCode language = LanguageCode.JAVA; private LanguageCode language = LanguageCode.JAVA;
public String buildMQClientId() { public String buildMQClientId() {
...@@ -298,6 +300,13 @@ public class ClientConfig { ...@@ -298,6 +300,13 @@ public class ClientConfig {
this.accessChannel = accessChannel; this.accessChannel = accessChannel;
} }
public int getMqClientApiTimeout() {
return mqClientApiTimeout;
}
public void setMqClientApiTimeout(int mqClientApiTimeout) {
this.mqClientApiTimeout = mqClientApiTimeout;
}
@Override @Override
public String toString() { public String toString() {
...@@ -305,6 +314,6 @@ public class ClientConfig { ...@@ -305,6 +314,6 @@ public class ClientConfig {
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+ ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]"; + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]";
} }
} }
...@@ -447,7 +447,7 @@ public class MQClientInstance { ...@@ -447,7 +447,7 @@ public class MQClientInstance {
if (addr != null) { if (addr != null) {
try { try {
this.getMQClientAPIImpl().checkClientInBroker( this.getMQClientAPIImpl().checkClientInBroker(
addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000 addr, entry.getKey(), this.clientId, subscriptionData, clientConfig.getMqClientApiTimeout()
); );
} catch (Exception e) { } catch (Exception e) {
if (e instanceof MQClientException) { if (e instanceof MQClientException) {
...@@ -554,7 +554,7 @@ public class MQClientInstance { ...@@ -554,7 +554,7 @@ public class MQClientInstance {
} }
try { try {
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) { if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4)); this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
} }
...@@ -610,7 +610,7 @@ public class MQClientInstance { ...@@ -610,7 +610,7 @@ public class MQClientInstance {
TopicRouteData topicRouteData; TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) { if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3); clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) { if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) { for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
...@@ -619,7 +619,7 @@ public class MQClientInstance { ...@@ -619,7 +619,7 @@ public class MQClientInstance {
} }
} }
} else { } else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
} }
if (topicRouteData != null) { if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic); TopicRouteData old = this.topicRouteTable.get(topic);
...@@ -894,7 +894,7 @@ public class MQClientInstance { ...@@ -894,7 +894,7 @@ public class MQClientInstance {
String addr = entry1.getValue(); String addr = entry1.getValue();
if (addr != null) { if (addr != null) {
try { try {
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException e) { } catch (RemotingException e) {
log.error("unregister client exception from broker: " + addr, e); log.error("unregister client exception from broker: " + addr, e);
...@@ -1064,7 +1064,7 @@ public class MQClientInstance { ...@@ -1064,7 +1064,7 @@ public class MQClientInstance {
if (null != brokerAddr) { if (null != brokerAddr) {
try { try {
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000); return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout());
} catch (Exception e) { } catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e); log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册