diff --git a/skywalking-commons/skywalking-registry/pom.xml b/skywalking-commons/skywalking-registry/pom.xml index 0e344b44b7e806aaa0088303a34915f3ed64290a..5ec2da6b24590b8b6fa3890cb27aa7ca564aff86 100644 --- a/skywalking-commons/skywalking-registry/pom.xml +++ b/skywalking-commons/skywalking-registry/pom.xml @@ -33,5 +33,22 @@ zookeeper 3.4.8 + + org.mockito + mockito-all + 1.10.19 + test + + + org.apache.curator + curator-test + 2.8.0 + test + + + com.101tec + zkclient + 0.10 + diff --git a/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/api/EventType.java b/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/api/EventType.java deleted file mode 100644 index 30c0d141fbbd20b38f84a03df740f2ddcd101fd0..0000000000000000000000000000000000000000 --- a/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/api/EventType.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.a.eye.skywalking.registry.api; - -/** - * Created by xin on 2016/11/10. - */ -public enum EventType { - Add, - Remove; -} diff --git a/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/api/NotifyListener.java b/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/api/NotifyListener.java index 0e1ea799cdb1947007c9f0a241c592d0c49b89a6..b87e56d3a1e525509e51c4d36ac21b8e1eaf3c10 100644 --- a/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/api/NotifyListener.java +++ b/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/api/NotifyListener.java @@ -1,5 +1,7 @@ package com.a.eye.skywalking.registry.api; +import java.util.List; + public interface NotifyListener { - void notify(EventType type, String url); + void notify(List currentUrls); } diff --git a/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/impl/zookeeper/ZookeeperRegistryCenter.java b/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/impl/zookeeper/ZookeeperRegistryCenter.java index f2d4132f46bef7569ce03cac110657c18fc919e7..ebaed11fa1c1356eae14fb33bff66afc5b67f8b3 100644 --- a/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/impl/zookeeper/ZookeeperRegistryCenter.java +++ b/skywalking-commons/skywalking-registry/src/main/java/com/a/eye/skywalking/registry/impl/zookeeper/ZookeeperRegistryCenter.java @@ -1,208 +1,57 @@ package com.a.eye.skywalking.registry.impl.zookeeper; -import com.a.eye.skywalking.logging.api.ILog; -import com.a.eye.skywalking.logging.api.LogManager; -import com.a.eye.skywalking.registry.api.*; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.Stat; +import com.a.eye.skywalking.registry.api.Center; +import com.a.eye.skywalking.registry.api.CenterType; +import com.a.eye.skywalking.registry.api.NotifyListener; +import com.a.eye.skywalking.registry.api.RegistryCenter; +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.ZkClient; -import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Properties; @Center(type = CenterType.DEFAULT_CENTER_TYPE) public class ZookeeperRegistryCenter implements RegistryCenter { - private ILog logger = LogManager.getLogger(ZookeeperRegistryCenter.class); - - public ZooKeeper client; + private ZkClient client; @Override public void register(String path) { - String createPath = path; - if (path.charAt(0) != '/') { - createPath = "/" + createPath; - } - - mkdirs(createPath, true); - } - - private void mkdirs(String path, boolean bool) { - - try { - String[] pathArray = path.split("/"); - if (pathArray.length == 0) { - return; - } + String[] pathSegment = path.split("/"); + StringBuilder createPath = new StringBuilder(); + for (int i = 0; i < pathSegment.length - 1; i++) { + if (pathSegment[i] == null || pathSegment[i].length() == 0) + continue; - StringBuilder currentCreatePath = new StringBuilder(); - for (int i = 0; i < pathArray.length - 1; i++) { - String pathSegment = pathArray[i]; - if (pathSegment.length() == 0) { - continue; - } - - currentCreatePath.append("/").append(pathSegment); - if (client.exists(currentCreatePath.toString(), false) == null) { - client.create(currentCreatePath.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - } - if (bool) { - client.create(currentCreatePath.append("/").append(pathArray[pathArray.length - 1]).toString(), null, - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } else { - client.create(currentCreatePath.append("/").append(pathArray[pathArray.length - 1]).toString(), null, - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + createPath.append("/" + pathSegment[i]); + if (!exists(createPath.toString())) { + client.createPersistent(createPath.toString()); } - logger.info("register path[{}] success", path); - } catch (Exception e) { - logger.error("Failed to create path[{}]", path, e); } - } - @Override - public void subscribe(final String path, final NotifyListener listener) { - try { - if (client.exists(path, false) == null) { - logger.warn("{} was not exists. "); - mkdirs(path, false); - } - - client.getChildren(path, new SubscribeWatcher(path, listener), new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List children, Stat stat) { - for (String child : children) { - listener.notify(EventType.Add, child); - } - } - }, null); - } catch (Exception e) { - logger.error("Failed to subscribe the path {} ", path, e); - } + client.createEphemeral(createPath.append("/" + pathSegment[pathSegment.length - 1]).toString()); } @Override - public void start(final Properties centerConfig) { - final ZookeeperConfig config = new ZookeeperConfig(centerConfig); - try { - client = new ZooKeeper(config.getConnectURL(), 60 * 1000, new ConnectWatcher(config)); - if (config.hasAuthInfo()) { - client.addAuthInfo(config.getAutSchema(), config.getAuth()); + public void subscribe(String path, final NotifyListener listener) { + client.subscribeChildChanges(path, new IZkChildListener() { + @Override + public void handleChildChange(String parentPath, List children) throws Exception { + listener.notify(children); } - } catch (IOException e) { - logger.error("Failed to create zookeeper registry center [{}]", config.getConnectURL(), e); - } + }); } - private class RetryConnected implements Runnable { - - private ZookeeperConfig config; - - public RetryConnected(ZookeeperConfig config) { - this.config = config; - } - - @Override - public void run() { - while (true) { - try { - client = new ZooKeeper(config.getConnectURL(), 60 * 1000, new ConnectWatcher(config)); - } catch (Exception e) { - logger.error("failed to connect zookeeper", e); - } - - if (client.getState() == ZooKeeper.States.CONNECTED) { - logger.info("connected successfully!"); - break; - } - - try { - Thread.sleep(60 * 1000); - } catch (InterruptedException e) { - logger.error("Failed to sleep.", e); - } - } - - } - } - - - private class ConnectWatcher implements Watcher { - - private ZookeeperConfig config; - - public ConnectWatcher(ZookeeperConfig config) { - this.config = config; - } - - @Override - public void process(WatchedEvent watchedEvent) { - if (watchedEvent.getState() == Event.KeeperState.AuthFailed) { - logger.warn("failed to auth.auth url: {} auth schema:{} auth info:{}", config.getConnectURL(), - config.getAutSchema(), new String(config.getAuth())); - } - - if (watchedEvent.getState() == Event.KeeperState.Disconnected) { - logger.warn("Disconnected from zookeeper. retry connecting..."); - new Thread(new RetryConnected(config)).start(); - } - } + private boolean exists(String path) { + return client.exists(path); } - - private class SubscribeWatcher implements Watcher { - private String path; - - private NotifyListener listener; - - private List previousChildPath; - - public SubscribeWatcher(String path, NotifyListener listener) { - this.path = path; - this.listener = listener; - previousChildPath = new ArrayList(); - } - - @Override - public void process(WatchedEvent event) { - try { - client.getChildren(path, this); - - client.getChildren(path, false, new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List children, Stat stat) { - System.out.println("aaaa"); - } - }, null); - }catch (Exception e){ - - } - - if (event.getType() == Event.EventType.NodeChildrenChanged) { - notifyListener(event); - } - } - - private void notifyListener(WatchedEvent event) { - try { - List tmpChildrenPath = client.getChildren(path, false); - tmpChildrenPath.removeAll(previousChildPath); - if (tmpChildrenPath.size() == 0) { - - } - } catch (Exception e) { - logger.error("Failed to fetch path[{}] children.", path, e); - } - } - - private void retryWatch() { - try { - client.getChildren(path, this); - } catch (Exception e) { - logger.error("Failed to rewatch path[{}]", path, e); - } + @Override + public void start(Properties centerConfig) { + ZookeeperConfig config = new ZookeeperConfig(centerConfig); + client = new ZkClient(config.getConnectURL(), 60 * 1000); + if (config.hasAuthInfo()) { + client.addAuthInfo(config.getAutSchema(), config.getAuth()); } } } diff --git a/skywalking-commons/skywalking-registry/src/test/java/com/a/eye/skywalking/registry/RegistryCenterFactoryTest.java b/skywalking-commons/skywalking-registry/src/test/java/com/a/eye/skywalking/registry/RegistryCenterFactoryTest.java deleted file mode 100644 index 595e62b650972f170e5cd9b57785d2c3d4ebfc34..0000000000000000000000000000000000000000 --- a/skywalking-commons/skywalking-registry/src/test/java/com/a/eye/skywalking/registry/RegistryCenterFactoryTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package com.a.eye.skywalking.registry; - -import com.a.eye.skywalking.logging.api.LogManager; -import com.a.eye.skywalking.logging.impl.log4j2.Log4j2Resolver; -import com.a.eye.skywalking.registry.api.CenterType; -import com.a.eye.skywalking.registry.api.EventType; -import com.a.eye.skywalking.registry.api.NotifyListener; -import com.a.eye.skywalking.registry.api.RegistryCenter; -import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.Properties; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -/** - * Created by xin on 2016/11/12. - */ -public class RegistryCenterFactoryTest { - - private RegistryCenter registryCenter; - private ZooKeeper zooKeeper; - - @Before - public void setUp() throws IOException { - LogManager.setLogResolver(new Log4j2Resolver()); - registryCenter = RegistryCenterFactory.INSTANCE.getRegistryCenter(CenterType.DEFAULT_CENTER_TYPE); - Properties config = new Properties(); - config.setProperty(ZookeeperConfig.CONNECT_URL, "127.0.0.1:2181"); - registryCenter.start(config); - zooKeeper = new ZooKeeper("127.0.0.1:2181", 60 * 1000, new Watcher(){ - @Override - public void process(WatchedEvent watchedEvent) { - } - }); - } - - @Test - public void testRegistry() throws KeeperException, InterruptedException { - registryCenter.register("/a/b/c"); - assertNotNull(zooKeeper.exists("/a/b/c",false)); - } - - @After - public void clearUp() throws KeeperException, InterruptedException { - //zooKeeper.delete("/a", -1); - } - - @Test - public void testSubscribe(){ - registryCenter.subscribe("/a", new NotifyListener() { - @Override - public void notify(EventType type, String urls) { - assertEquals(type, EventType.Add); - assertEquals(urls,"b"); - } - }); - - registryCenter.register("/a/b"); - - registryCenter.register("/a/d"); - - registryCenter.register("/a/e"); - } -} diff --git a/skywalking-commons/skywalking-registry/src/test/java/com/a/eye/skywalking/registry/impl/zookeeper/ZookeeperRegistryCenterTest.java b/skywalking-commons/skywalking-registry/src/test/java/com/a/eye/skywalking/registry/impl/zookeeper/ZookeeperRegistryCenterTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f3cdfa67cd3851a01a0878242a0ae14861c8f354 --- /dev/null +++ b/skywalking-commons/skywalking-registry/src/test/java/com/a/eye/skywalking/registry/impl/zookeeper/ZookeeperRegistryCenterTest.java @@ -0,0 +1,66 @@ +package com.a.eye.skywalking.registry.impl.zookeeper; + +import com.a.eye.skywalking.registry.RegistryCenterFactory; +import com.a.eye.skywalking.registry.api.CenterType; +import com.a.eye.skywalking.registry.api.NotifyListener; +import com.a.eye.skywalking.registry.api.RegistryCenter; +import junit.framework.TestSuite; +import org.I0Itec.zkclient.ZkClient; +import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ZookeeperRegistryCenterTest extends TestSuite { + private TestingServer zkTestServer; + private ZkClient zkClient; + private RegistryCenter registryCenter; + + @Before + public void setUp() throws Exception { + zkTestServer = new TestingServer(2181, true); + zkClient = new ZkClient("127.0.0.1:2181", 2000); + + registryCenter = RegistryCenterFactory.INSTANCE.getRegistryCenter(CenterType.DEFAULT_CENTER_TYPE); + Properties config = new Properties(); + config.put(ZookeeperConfig.CONNECT_URL, "127.0.0.1:2181"); + registryCenter.start(config); + } + + @After + public void tearDown() throws Exception { + zkTestServer.getTempDirectory().delete(); + zkTestServer.stop(); + } + + @Test + public void subscribeNodeTest() throws InterruptedException { + final StringBuilder addUrl = new StringBuilder(); + registryCenter.subscribe("/skywalking/storage", new NotifyListener() { + @Override + public void notify(List currentUrls) { + for (String url : currentUrls) { + addUrl.append(url + ","); + } + } + }); + + registryCenter.register("/skywalking/storage/127.0.0.1:9400"); + Thread.sleep(100L); + assertEquals(addUrl.deleteCharAt(addUrl.length() - 1).toString(), "127.0.0.1:9400"); + } + + @Test + public void registryNodeTest() throws IOException, InterruptedException, KeeperException { + registryCenter.register("/skywalking/storage/test"); + assertTrue(zkClient.exists("/skywalking/storage/test")); + } +}