提交 f723250d 编写于 作者: A ascrutae

完成注册中心功能

上级 7eb53791
......@@ -33,5 +33,22 @@
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
</project>
package com.a.eye.skywalking.registry.api;
/**
* Created by xin on 2016/11/10.
*/
public enum EventType {
Add,
Remove;
}
package com.a.eye.skywalking.registry.api;
import java.util.List;
public interface NotifyListener {
void notify(EventType type, String url);
void notify(List<String> currentUrls);
}
package com.a.eye.skywalking.registry.impl.zookeeper;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.registry.api.*;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import com.a.eye.skywalking.registry.api.Center;
import com.a.eye.skywalking.registry.api.CenterType;
import com.a.eye.skywalking.registry.api.NotifyListener;
import com.a.eye.skywalking.registry.api.RegistryCenter;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@Center(type = CenterType.DEFAULT_CENTER_TYPE)
public class ZookeeperRegistryCenter implements RegistryCenter {
private ILog logger = LogManager.getLogger(ZookeeperRegistryCenter.class);
public ZooKeeper client;
private ZkClient client;
@Override
public void register(String path) {
String createPath = path;
if (path.charAt(0) != '/') {
createPath = "/" + createPath;
}
mkdirs(createPath, true);
}
private void mkdirs(String path, boolean bool) {
try {
String[] pathArray = path.split("/");
if (pathArray.length == 0) {
return;
}
String[] pathSegment = path.split("/");
StringBuilder createPath = new StringBuilder();
for (int i = 0; i < pathSegment.length - 1; i++) {
if (pathSegment[i] == null || pathSegment[i].length() == 0)
continue;
StringBuilder currentCreatePath = new StringBuilder();
for (int i = 0; i < pathArray.length - 1; i++) {
String pathSegment = pathArray[i];
if (pathSegment.length() == 0) {
continue;
}
currentCreatePath.append("/").append(pathSegment);
if (client.exists(currentCreatePath.toString(), false) == null) {
client.create(currentCreatePath.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}
if (bool) {
client.create(currentCreatePath.append("/").append(pathArray[pathArray.length - 1]).toString(), null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} else {
client.create(currentCreatePath.append("/").append(pathArray[pathArray.length - 1]).toString(), null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
createPath.append("/" + pathSegment[i]);
if (!exists(createPath.toString())) {
client.createPersistent(createPath.toString());
}
logger.info("register path[{}] success", path);
} catch (Exception e) {
logger.error("Failed to create path[{}]", path, e);
}
}
@Override
public void subscribe(final String path, final NotifyListener listener) {
try {
if (client.exists(path, false) == null) {
logger.warn("{} was not exists. ");
mkdirs(path, false);
}
client.getChildren(path, new SubscribeWatcher(path, listener), new AsyncCallback.Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
for (String child : children) {
listener.notify(EventType.Add, child);
}
}
}, null);
} catch (Exception e) {
logger.error("Failed to subscribe the path {} ", path, e);
}
client.createEphemeral(createPath.append("/" + pathSegment[pathSegment.length - 1]).toString());
}
@Override
public void start(final Properties centerConfig) {
final ZookeeperConfig config = new ZookeeperConfig(centerConfig);
try {
client = new ZooKeeper(config.getConnectURL(), 60 * 1000, new ConnectWatcher(config));
if (config.hasAuthInfo()) {
client.addAuthInfo(config.getAutSchema(), config.getAuth());
public void subscribe(String path, final NotifyListener listener) {
client.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> children) throws Exception {
listener.notify(children);
}
} catch (IOException e) {
logger.error("Failed to create zookeeper registry center [{}]", config.getConnectURL(), e);
}
});
}
private class RetryConnected implements Runnable {
private ZookeeperConfig config;
public RetryConnected(ZookeeperConfig config) {
this.config = config;
}
@Override
public void run() {
while (true) {
try {
client = new ZooKeeper(config.getConnectURL(), 60 * 1000, new ConnectWatcher(config));
} catch (Exception e) {
logger.error("failed to connect zookeeper", e);
}
if (client.getState() == ZooKeeper.States.CONNECTED) {
logger.info("connected successfully!");
break;
}
try {
Thread.sleep(60 * 1000);
} catch (InterruptedException e) {
logger.error("Failed to sleep.", e);
}
}
}
}
private class ConnectWatcher implements Watcher {
private ZookeeperConfig config;
public ConnectWatcher(ZookeeperConfig config) {
this.config = config;
}
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {
logger.warn("failed to auth.auth url: {} auth schema:{} auth info:{}", config.getConnectURL(),
config.getAutSchema(), new String(config.getAuth()));
}
if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
logger.warn("Disconnected from zookeeper. retry connecting...");
new Thread(new RetryConnected(config)).start();
}
}
private boolean exists(String path) {
return client.exists(path);
}
private class SubscribeWatcher implements Watcher {
private String path;
private NotifyListener listener;
private List<String> previousChildPath;
public SubscribeWatcher(String path, NotifyListener listener) {
this.path = path;
this.listener = listener;
previousChildPath = new ArrayList<String>();
}
@Override
public void process(WatchedEvent event) {
try {
client.getChildren(path, this);
client.getChildren(path, false, new AsyncCallback.Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
System.out.println("aaaa");
}
}, null);
}catch (Exception e){
}
if (event.getType() == Event.EventType.NodeChildrenChanged) {
notifyListener(event);
}
}
private void notifyListener(WatchedEvent event) {
try {
List<String> tmpChildrenPath = client.getChildren(path, false);
tmpChildrenPath.removeAll(previousChildPath);
if (tmpChildrenPath.size() == 0) {
}
} catch (Exception e) {
logger.error("Failed to fetch path[{}] children.", path, e);
}
}
private void retryWatch() {
try {
client.getChildren(path, this);
} catch (Exception e) {
logger.error("Failed to rewatch path[{}]", path, e);
}
@Override
public void start(Properties centerConfig) {
ZookeeperConfig config = new ZookeeperConfig(centerConfig);
client = new ZkClient(config.getConnectURL(), 60 * 1000);
if (config.hasAuthInfo()) {
client.addAuthInfo(config.getAutSchema(), config.getAuth());
}
}
}
package com.a.eye.skywalking.registry;
package com.a.eye.skywalking.registry.impl.zookeeper;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.logging.impl.log4j2.Log4j2Resolver;
import com.a.eye.skywalking.registry.RegistryCenterFactory;
import com.a.eye.skywalking.registry.api.CenterType;
import com.a.eye.skywalking.registry.api.EventType;
import com.a.eye.skywalking.registry.api.NotifyListener;
import com.a.eye.skywalking.registry.api.RegistryCenter;
import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import junit.framework.TestSuite;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Created by xin on 2016/11/12.
*/
public class RegistryCenterFactoryTest {
import static org.junit.Assert.assertTrue;
public class ZookeeperRegistryCenterTest extends TestSuite {
private TestingServer zkTestServer;
private ZkClient zkClient;
private RegistryCenter registryCenter;
private ZooKeeper zooKeeper;
@Before
public void setUp() throws IOException {
LogManager.setLogResolver(new Log4j2Resolver());
public void setUp() throws Exception {
zkTestServer = new TestingServer(2181, true);
zkClient = new ZkClient("127.0.0.1:2181", 2000);
registryCenter = RegistryCenterFactory.INSTANCE.getRegistryCenter(CenterType.DEFAULT_CENTER_TYPE);
Properties config = new Properties();
config.setProperty(ZookeeperConfig.CONNECT_URL, "127.0.0.1:2181");
config.put(ZookeeperConfig.CONNECT_URL, "127.0.0.1:2181");
registryCenter.start(config);
zooKeeper = new ZooKeeper("127.0.0.1:2181", 60 * 1000, new Watcher(){
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
@Test
public void testRegistry() throws KeeperException, InterruptedException {
registryCenter.register("/a/b/c");
assertNotNull(zooKeeper.exists("/a/b/c",false));
}
@After
public void clearUp() throws KeeperException, InterruptedException {
//zooKeeper.delete("/a", -1);
public void tearDown() throws Exception {
zkTestServer.getTempDirectory().delete();
zkTestServer.stop();
}
@Test
public void testSubscribe(){
registryCenter.subscribe("/a", new NotifyListener() {
public void subscribeNodeTest() throws InterruptedException {
final StringBuilder addUrl = new StringBuilder();
registryCenter.subscribe("/skywalking/storage", new NotifyListener() {
@Override
public void notify(EventType type, String urls) {
assertEquals(type, EventType.Add);
assertEquals(urls,"b");
public void notify(List<String> currentUrls) {
for (String url : currentUrls) {
addUrl.append(url + ",");
}
}
});
registryCenter.register("/a/b");
registryCenter.register("/a/d");
registryCenter.register("/skywalking/storage/127.0.0.1:9400");
Thread.sleep(100L);
assertEquals(addUrl.deleteCharAt(addUrl.length() - 1).toString(), "127.0.0.1:9400");
}
registryCenter.register("/a/e");
@Test
public void registryNodeTest() throws IOException, InterruptedException, KeeperException {
registryCenter.register("/skywalking/storage/test");
assertTrue(zkClient.exists("/skywalking/storage/test"));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册