提交 809668eb 编写于 作者: P peng-yongsheng

Add zookeeper implement of cluster module.

上级 8fba0fda
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author peng-yongsheng
*/
public abstract class ClusterException extends CollectorException {
public ClusterException(String message) {
super(message);
}
public ClusterException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster;
import java.util.HashSet;
import java.util.Set;
/**
* @author peng-yongsheng
*/
public abstract class ClusterModuleListener {
private Set<String> addresses;
public ClusterModuleListener() {
addresses = new HashSet<>();
}
public abstract String path();
public final void addAddress(String address) {
addresses.add(address);
}
public final void removeAddress(String address) {
addresses.remove(address);
}
public final Set<String> getAddresses() {
return addresses;
}
public abstract void serverJoinNotify(String serverAddress);
public abstract void serverQuitNotify(String serverAddress);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster;
/**
* @author peng-yongsheng
*/
public class ClusterNodeExistException extends ClusterException {
public ClusterNodeExistException(String message) {
super(message);
}
public ClusterNodeExistException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster;
import org.skywalking.apm.collector.client.Client;
import org.skywalking.apm.collector.client.ClientException;
/**
* @author peng-yongsheng
*/
public interface DataMonitor {
void setClient(Client client);
void addListener(ClusterModuleListener listener) throws ClientException;
void register(String path, ModuleRegistration registration) throws ClientException;
ClusterModuleListener getListener(String path);
void createPath(String path) throws ClientException;
void setData(String path, String value) throws ClientException;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster;
/**
* @author peng-yongsheng
*/
public abstract class ModuleRegistration {
public abstract Value buildValue();
public static class Value {
private final String host;
private final int port;
private final String contextPath;
public Value(String host, int port, String contextPath) {
this.host = host;
this.port = port;
this.contextPath = contextPath;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String getHostPort() {
return host + ":" + port;
}
public String getContextPath() {
return contextPath;
}
}
}
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster.service;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface ModuleListenerService extends Service {
void addListener(ClusterModuleListener listener);
}
......@@ -18,10 +18,12 @@
package org.skywalking.apm.collector.cluster.service;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface ModuleRegisterService extends Service {
void register(String moduleName, String providerName, ModuleRegistration registration);
}
......@@ -24,4 +24,8 @@ import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
* @author peng-yongsheng
*/
public class RedisModuleRegisterService implements ModuleRegisterService {
@Override public void register(String moduleName, String providerName, String address, String others) {
}
}
......@@ -24,4 +24,8 @@ import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
* @author peng-yongsheng
*/
public class StandaloneModuleRegisterService implements ModuleRegisterService {
@Override public void register(String moduleName, String providerName, String address, String others) {
}
}
......@@ -19,10 +19,16 @@
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.Properties;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.cluster.service.ModuleRegistrationGetService;
import org.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleListenerService;
import org.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleRegisterService;
import org.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleRegistrationGetService;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
......@@ -32,6 +38,11 @@ import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
*/
public class ClusterModuleZookeeperProvider extends ModuleProvider {
private static final String HOST_PORT = "hostPort";
private static final String SESSION_TIMEOUT = "sessionTimeout";
private ClusterZKDataMonitor dataMonitor;
@Override public String name() {
return "zookeeper";
}
......@@ -41,12 +52,24 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(ModuleRegisterService.class, new ZookeeperModuleRegisterService());
this.registerServiceImplementation(ModuleRegisterService.class, new ZookeeperModuleRegistrationGetService());
dataMonitor = new ClusterZKDataMonitor();
final String hostPort = config.getProperty(HOST_PORT);
final String sessionTimeout = config.getProperty(SESSION_TIMEOUT);
ZookeeperClient zookeeperClient = new ZookeeperClient(hostPort, Integer.valueOf(sessionTimeout), dataMonitor);
dataMonitor.setClient(zookeeperClient);
this.registerServiceImplementation(ModuleListenerService.class, new ZookeeperModuleListenerService(dataMonitor));
this.registerServiceImplementation(ModuleRegisterService.class, new ZookeeperModuleRegisterService(dataMonitor));
this.registerServiceImplementation(ModuleRegistrationGetService.class, new ZookeeperModuleRegistrationGetService());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
try {
dataMonitor.start();
} catch (CollectorException e) {
throw new UnexpectedException(e.getMessage());
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.client.Client;
import org.skywalking.apm.collector.client.ClientException;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClientException;
import org.skywalking.apm.collector.client.zookeeper.util.PathUtils;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.cluster.ClusterNodeExistException;
import org.skywalking.apm.collector.cluster.DataMonitor;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ClusterZKDataMonitor implements DataMonitor, Watcher {
private final Logger logger = LoggerFactory.getLogger(ClusterZKDataMonitor.class);
private ZookeeperClient client;
private Map<String, ClusterModuleListener> listeners;
private Map<String, ModuleRegistration> registrations;
public ClusterZKDataMonitor() {
listeners = new LinkedHashMap<>();
registrations = new LinkedHashMap<>();
}
@Override public synchronized void process(WatchedEvent event) {
logger.info("changed path {}, event type: {}", event.getPath(), event.getType().name());
if (listeners.containsKey(event.getPath())) {
List<String> paths;
try {
paths = client.getChildren(event.getPath(), true);
ClusterModuleListener listener = listeners.get(event.getPath());
Set<String> remoteNodes = new HashSet<>();
Set<String> notifiedNodes = listener.getAddresses();
if (CollectionUtils.isNotEmpty(paths)) {
for (String serverPath : paths) {
Stat stat = new Stat();
byte[] data = client.getData(event.getPath() + "/" + serverPath, true, stat);
String dataStr = new String(data);
String addressValue = serverPath + dataStr;
remoteNodes.add(addressValue);
if (!notifiedNodes.contains(addressValue)) {
logger.info("path children has been created, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listener.addAddress(addressValue);
listener.serverJoinNotify(addressValue);
}
}
}
String[] notifiedNodeArray = notifiedNodes.toArray(new String[notifiedNodes.size()]);
for (int i = notifiedNodeArray.length - 1; i >= 0; i--) {
String address = notifiedNodeArray[i];
if (remoteNodes.isEmpty() || !remoteNodes.contains(address)) {
logger.info("path children has been remove, path and data: {}", event.getPath() + "/" + address);
listener.removeAddress(address);
listener.serverQuitNotify(address);
}
}
} catch (ZookeeperClientException e) {
logger.error(e.getMessage(), e);
}
}
}
@Override public void setClient(Client client) {
this.client = (ZookeeperClient)client;
}
public void start() throws CollectorException {
Iterator<Map.Entry<String, ModuleRegistration>> entryIterator = registrations.entrySet().iterator();
while (entryIterator.hasNext()) {
Map.Entry<String, ModuleRegistration> next = entryIterator.next();
createPath(next.getKey());
ModuleRegistration.Value value = next.getValue().buildValue();
String contextPath = value.getContextPath() == null ? "" : value.getContextPath();
client.getChildren(next.getKey(), true);
String serverPath = next.getKey() + "/" + value.getHostPort();
Stat stat = client.exists(serverPath, false);
if (stat != null) {
client.delete(serverPath, stat.getVersion());
}
stat = client.exists(serverPath, false);
if (stat == null) {
setData(serverPath, contextPath);
} else {
client.delete(serverPath, stat.getVersion());
throw new ClusterNodeExistException("current address: " + value.getHostPort() + " has been registered, check the host and port configuration or wait a moment.");
}
}
}
@Override public void addListener(ClusterModuleListener listener) {
String path = PathUtils.convertKey2Path(listener.path());
logger.info("listener path: {}", path);
listeners.put(path, listener);
}
@Override public void register(String path, ModuleRegistration registration) {
registrations.put(path, registration);
}
@Override public ClusterModuleListener getListener(String path) {
path = PathUtils.convertKey2Path(path);
return listeners.get(path);
}
@Override public void createPath(String path) throws ClientException {
String[] paths = path.replaceFirst("/", "").split("/");
StringBuilder pathBuilder = new StringBuilder();
for (String subPath : paths) {
pathBuilder.append("/").append(subPath);
if (client.exists(pathBuilder.toString(), false) == null) {
client.create(pathBuilder.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
@Override public void setData(String path, String value) throws ClientException {
if (client.exists(path, false) == null) {
client.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} else {
client.setData(path, value.getBytes(), -1);
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster.zookeeper.service;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.zookeeper.ClusterZKDataMonitor;
/**
* @author peng-yongsheng
*/
public class ZookeeperModuleListenerService implements ModuleListenerService {
private final ClusterZKDataMonitor dataMonitor;
public ZookeeperModuleListenerService(ClusterZKDataMonitor dataMonitor) {
this.dataMonitor = dataMonitor;
}
@Override public void addListener(ClusterModuleListener listener) {
dataMonitor.addListener(listener);
}
}
......@@ -18,10 +18,23 @@
package org.skywalking.apm.collector.cluster.zookeeper.service;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.cluster.zookeeper.ClusterZKDataMonitor;
/**
* @author peng-yongsheng
*/
public class ZookeeperModuleRegisterService implements ModuleRegisterService {
private final ClusterZKDataMonitor dataMonitor;
public ZookeeperModuleRegisterService(ClusterZKDataMonitor dataMonitor) {
this.dataMonitor = dataMonitor;
}
@Override public void register(String moduleName, String providerName, ModuleRegistration registration) {
String path = "/" + moduleName + "/" + providerName;
dataMonitor.register(path, registration);
}
}
......@@ -24,5 +24,10 @@
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>client-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册