From 04c8925d6da77a41cef746a9c6478a407c4c9edd Mon Sep 17 00:00:00 2001 From: Jaskey Date: Sat, 27 May 2017 12:38:00 +0800 Subject: [PATCH] [ROCKETMQ-160]SendHeartBeat may not be logged in the expected period closes apache/incubator-rocketmq#86 --- .../client/impl/factory/MQClientInstance.java | 66 ++++++++++--------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index a8c65b29..1b075ee1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -112,7 +112,7 @@ public class MQClientInstance { private final RebalanceService rebalanceService; private final DefaultMQProducer defaultMQProducer; private final ConsumerStatsManager consumerStatsManager; - private final AtomicLong storeTimesTotal = new AtomicLong(0); + private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0); private ServiceState serviceState = ServiceState.CREATE_JUST; private DatagramSocket datagramSocket; private Random random = new Random(); @@ -517,38 +517,40 @@ public class MQClientInstance { return; } - long times = this.storeTimesTotal.getAndIncrement(); - Iterator>> it = this.brokerAddrTable.entrySet().iterator(); - while (it.hasNext()) { - Entry> entry = it.next(); - String brokerName = entry.getKey(); - HashMap oneTable = entry.getValue(); - if (oneTable != null) { - for (Map.Entry entry1 : oneTable.entrySet()) { - Long id = entry1.getKey(); - String addr = entry1.getValue(); - if (addr != null) { - if (consumerEmpty) { - if (id != MixAll.MASTER_ID) - continue; - } - - try { - int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); - if (!this.brokerVersionTable.containsKey(brokerName)) { - this.brokerVersionTable.put(brokerName, new HashMap(4)); - } - this.brokerVersionTable.get(brokerName).put(addr, version); - if (times % 20 == 0) { - log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); - log.info(heartbeatData.toString()); + if (!this.brokerAddrTable.isEmpty()) { + long times = this.sendHeartbeatTimesTotal.getAndIncrement(); + Iterator>> it = this.brokerAddrTable.entrySet().iterator(); + while (it.hasNext()) { + Entry> entry = it.next(); + String brokerName = entry.getKey(); + HashMap oneTable = entry.getValue(); + if (oneTable != null) { + for (Map.Entry entry1 : oneTable.entrySet()) { + Long id = entry1.getKey(); + String addr = entry1.getValue(); + if (addr != null) { + if (consumerEmpty) { + if (id != MixAll.MASTER_ID) + continue; } - } catch (Exception e) { - if (this.isBrokerInNameServer(addr)) { - log.error("send heart beat to broker exception", e); - } else { - log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, - id, addr); + + try { + int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); + if (!this.brokerVersionTable.containsKey(brokerName)) { + this.brokerVersionTable.put(brokerName, new HashMap(4)); + } + this.brokerVersionTable.get(brokerName).put(addr, version); + if (times % 20 == 0) { + log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); + log.info(heartbeatData.toString()); + } + } catch (Exception e) { + if (this.isBrokerInNameServer(addr)) { + log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr); + } else { + log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, + id, addr); + } } } } -- GitLab