提交 a94b5428 编写于 作者: wu-sheng's avatar wu-sheng

Adjust zk cluster management.

上级 eabf9a7d
......@@ -35,6 +35,7 @@ public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
@Override public void registerRemote(RemoteInstance remoteInstance) {
this.remoteInstance = remoteInstance;
this.remoteInstance.setSelf(true);
}
@Override
......
......@@ -22,7 +22,6 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
......@@ -77,23 +76,18 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
.watchInstances(true)
.serializer(new SWInstanceSerializer()).build();
String remoteName = "remote";
ServiceCache<RemoteInstance> serviceCache = serviceDiscovery.serviceCacheBuilder()
.name(remoteName)
.build();
try {
client.start();
client.blockUntilConnected();
serviceDiscovery.start();
serviceCache.start();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ModuleStartException(e.getMessage(), e);
}
this.registerServiceImplementation(ClusterRegister.class, new ZookeeperNodeRegister(serviceDiscovery, remoteName));
this.registerServiceImplementation(ClusterNodesQuery.class, new ZookeeperModuleQuery(serviceCache));
ZookeeperCoordinator coordinator = new ZookeeperCoordinator(serviceDiscovery);
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
@Override public void start() {
......
......@@ -18,9 +18,13 @@
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
......@@ -30,21 +34,23 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ZookeeperNodeRegister implements ClusterRegister {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperNodeRegister.class);
public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperCoordinator.class);
private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
private final String nodeName;
private volatile ServiceCache<RemoteInstance> serviceCache;
private volatile RemoteInstance selfInstance;
ZookeeperNodeRegister(ServiceDiscovery<RemoteInstance> serviceDiscovery, String nodeName) {
ZookeeperCoordinator(ServiceDiscovery<RemoteInstance> serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
this.nodeName = nodeName;
}
@Override public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
try {
String remoteNamePath = "remote";
ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
.name(nodeName)
.name(remoteNamePath)
.id(UUID.randomUUID().toString())
.address(remoteInstance.getHost())
.port(remoteInstance.getPort())
......@@ -52,9 +58,32 @@ public class ZookeeperNodeRegister implements ClusterRegister {
.build();
serviceDiscovery.registerService(thisInstance);
serviceCache = serviceDiscovery.serviceCacheBuilder()
.name(remoteNamePath)
.build();
serviceCache.start();
this.selfInstance = remoteInstance;
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ServiceRegisterException(e.getMessage());
}
}
@Override public List<RemoteInstance> queryRemoteNodes() {
List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(serviceInstances.size());
serviceInstances.forEach(serviceInstance -> {
RemoteInstance instance = serviceInstance.getPayload();
if (instance.equals(selfInstance)) {
instance.setSelf(true);
} else {
instance.setSelf(false);
}
remoteInstanceDetails.add(instance);
});
return remoteInstanceDetails;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.util.*;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.skywalking.oap.server.core.cluster.*;
/**
* @author peng-yongsheng, Wu Sheng
*/
public class ZookeeperModuleQuery implements ClusterNodesQuery {
private final ServiceCache<RemoteInstance> serviceCache;
ZookeeperModuleQuery(ServiceCache<RemoteInstance> serviceCache) {
this.serviceCache = serviceCache;
}
@Override
public List<RemoteInstance> queryRemoteNodes() throws ServiceRegisterException {
List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(serviceInstances.size());
serviceInstances.forEach(serviceInstance -> remoteInstanceDetails.add(serviceInstance.getPayload()));
return remoteInstanceDetails;
}
}
......@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.server.core.cluster;
import java.util.Objects;
/**
* @author peng-yongsheng
*/
......@@ -25,6 +27,7 @@ public class RemoteInstance {
private String host;
private int port;
private boolean self = false;
public String getHost() {
return host;
......@@ -41,4 +44,27 @@ public class RemoteInstance {
public void setPort(int port) {
this.port = port;
}
public boolean isSelf() {
return self;
}
public void setSelf(boolean self) {
this.self = self;
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
RemoteInstance instance = (RemoteInstance)o;
return port == instance.port &&
Objects.equals(host, instance.host);
}
@Override public int hashCode() {
return Objects.hash(host, port);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册