diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java index 7162adf97d42682aa9c879b7a8f401778f3ffe3e..b59482325410c9199abdd955132d2205e0ea255f 100644 --- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java +++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java @@ -4,6 +4,8 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.HashSet; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -46,22 +48,30 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { List paths; try { paths = client.getChildren(event.getPath(), true); + ClusterDataListener listener = listeners.get(event.getPath()); + Set remoteNodes = new HashSet(); + Set notifiedNodes = listener.getAddresses(); if (CollectionUtils.isNotEmpty(paths)) { for (String serverPath : paths) { Stat stat = new Stat(); byte[] data = client.getData(event.getPath() + "/" + serverPath, true, stat); String dataStr = new String(data); - if (stat.getCzxid() == stat.getMzxid()) { + String addressValue = serverPath + dataStr; + remoteNodes.add(addressValue); + if (!notifiedNodes.contains(addressValue)) { logger.info("path children has been created, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr); - listeners.get(event.getPath()).addAddress(serverPath + dataStr); - listeners.get(event.getPath()).serverJoinNotify(serverPath + dataStr); - } else { - logger.info("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr); - listeners.get(event.getPath()).removeAddress(serverPath + dataStr); - listeners.get(event.getPath()).serverQuitNotify(serverPath + dataStr); + listener.addAddress(addressValue); + listener.serverJoinNotify(addressValue); } } } + for (String address : notifiedNodes) { + if (remoteNodes.isEmpty() || !remoteNodes.contains(address)) { + logger.info("path children has been changed, path and data: {}", event.getPath() + "/" + address); + listener.removeAddress(address); + listener.serverQuitNotify(address); + } + } } catch (ZookeeperClientException e) { logger.error(e.getMessage(), e); }