diff --git a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java index 3535e2b34babce1aed2a53e6cf5919b9e0fc6072..6cba94f1a7545a3192b3cc0872ddb817abb132e8 100644 --- a/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java +++ b/oap-server/server-cluster-plugin/cluster-standalone-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/standalone/StandaloneManager.java @@ -35,6 +35,7 @@ public class StandaloneManager implements ClusterNodesQuery, ClusterRegister { @Override public void registerRemote(RemoteInstance remoteInstance) { this.remoteInstance = remoteInstance; + this.remoteInstance.setSelf(true); } @Override diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java index 0cfa3d5b34612870fd19f20183ea946153efc1af..81ea6739c491608680a653ec529d28ce432e129a 100644 --- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java +++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ClusterModuleZookeeperProvider.java @@ -22,7 +22,6 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.skywalking.oap.server.core.cluster.ClusterModule; @@ -77,23 +76,18 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider { .watchInstances(true) .serializer(new SWInstanceSerializer()).build(); - String remoteName = "remote"; - ServiceCache serviceCache = serviceDiscovery.serviceCacheBuilder() - .name(remoteName) - .build(); try { client.start(); client.blockUntilConnected(); serviceDiscovery.start(); - - serviceCache.start(); } catch (Exception e) { logger.error(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e); } - this.registerServiceImplementation(ClusterRegister.class, new ZookeeperNodeRegister(serviceDiscovery, remoteName)); - this.registerServiceImplementation(ClusterNodesQuery.class, new ZookeeperModuleQuery(serviceCache)); + ZookeeperCoordinator coordinator = new ZookeeperCoordinator(serviceDiscovery); + this.registerServiceImplementation(ClusterRegister.class, coordinator); + this.registerServiceImplementation(ClusterNodesQuery.class, coordinator); } @Override public void start() { diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java similarity index 59% rename from oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java rename to oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java index ba6b518130b18aabe74ff90af5e62f16d7970a01..8cd8b595594715bffba5076b5b31de6b19c2ca88 100644 --- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperNodeRegister.java +++ b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperCoordinator.java @@ -18,9 +18,13 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; +import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery; import org.apache.skywalking.oap.server.core.cluster.ClusterRegister; import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException; @@ -30,21 +34,23 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class ZookeeperNodeRegister implements ClusterRegister { - private static final Logger logger = LoggerFactory.getLogger(ZookeeperNodeRegister.class); +public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery { + private static final Logger logger = LoggerFactory.getLogger(ZookeeperCoordinator.class); private final ServiceDiscovery serviceDiscovery; - private final String nodeName; + private volatile ServiceCache serviceCache; + private volatile RemoteInstance selfInstance; - ZookeeperNodeRegister(ServiceDiscovery serviceDiscovery, String nodeName) { + ZookeeperCoordinator(ServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; - this.nodeName = nodeName; } @Override public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException { try { + String remoteNamePath = "remote"; + ServiceInstance thisInstance = ServiceInstance.builder() - .name(nodeName) + .name(remoteNamePath) .id(UUID.randomUUID().toString()) .address(remoteInstance.getHost()) .port(remoteInstance.getPort()) @@ -52,9 +58,32 @@ public class ZookeeperNodeRegister implements ClusterRegister { .build(); serviceDiscovery.registerService(thisInstance); + + serviceCache = serviceDiscovery.serviceCacheBuilder() + .name(remoteNamePath) + .build(); + + serviceCache.start(); + + this.selfInstance = remoteInstance; } catch (Exception e) { - logger.error(e.getMessage(), e); throw new ServiceRegisterException(e.getMessage()); } } + + @Override public List queryRemoteNodes() { + List> serviceInstances = serviceCache.getInstances(); + + List remoteInstanceDetails = new ArrayList<>(serviceInstances.size()); + serviceInstances.forEach(serviceInstance -> { + RemoteInstance instance = serviceInstance.getPayload(); + if (instance.equals(selfInstance)) { + instance.setSelf(true); + } else { + instance.setSelf(false); + } + remoteInstanceDetails.add(instance); + }); + return remoteInstanceDetails; + } } diff --git a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java b/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java deleted file mode 100644 index 7a41531a1879910aa848a5b0eac0bf16e32f24a7..0000000000000000000000000000000000000000 --- a/oap-server/server-cluster-plugin/cluster-zookeeper-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/zookeeper/ZookeeperModuleQuery.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.cluster.plugin.zookeeper; - -import java.util.*; -import org.apache.curator.x.discovery.ServiceCache; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.skywalking.oap.server.core.cluster.*; - -/** - * @author peng-yongsheng, Wu Sheng - */ -public class ZookeeperModuleQuery implements ClusterNodesQuery { - - private final ServiceCache serviceCache; - - ZookeeperModuleQuery(ServiceCache serviceCache) { - this.serviceCache = serviceCache; - } - - @Override - public List queryRemoteNodes() throws ServiceRegisterException { - List> serviceInstances = serviceCache.getInstances(); - - List remoteInstanceDetails = new ArrayList<>(serviceInstances.size()); - serviceInstances.forEach(serviceInstance -> remoteInstanceDetails.add(serviceInstance.getPayload())); - return remoteInstanceDetails; - } -} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java index 14018e197beb52bb94e3c12ce456a986c5a6b491..2c8d761f2b1b1731374047339135c32b121eec01 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java @@ -18,6 +18,8 @@ package org.apache.skywalking.oap.server.core.cluster; +import java.util.Objects; + /** * @author peng-yongsheng */ @@ -25,6 +27,7 @@ public class RemoteInstance { private String host; private int port; + private boolean self = false; public String getHost() { return host; @@ -41,4 +44,27 @@ public class RemoteInstance { public void setPort(int port) { this.port = port; } + + public boolean isSelf() { + return self; + } + + public void setSelf(boolean self) { + this.self = self; + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + RemoteInstance instance = (RemoteInstance)o; + return port == instance.port && + Objects.equals(host, instance.host); + } + + @Override public int hashCode() { + + return Objects.hash(host, port); + } }