package com.kwan.shuyu.watch; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.After; import org.junit.Before; import org.junit.Test; public class CuratorWatcherTest { private CuratorFramework client; /** * 建立连接 */ @Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "120.79.36.53:2181,120.79.36.53:2182,120.79.36.53:2183" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("120.79.36.53:2181,120.79.36.53:2182,120.79.36.53:2183", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("120.79.36.53:2181,120.79.36.53:2182,120.79.36.53:2183") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .namespace("itheima") .build(); //开启连接 client.start(); } @After public void close() { if (client != null) { client.close(); } } /** * 演示 NodeCache:给指定一个节点注册监听器 */ @Test public void testNodeCache() throws Exception { //1. 创建NodeCache对象 final NodeCache nodeCache = new NodeCache(client, "/app1"); //2. 注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了~"); //获取修改节点后的数据 byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据 nodeCache.start(true); while (true) { } } /** * 演示 PathChildrenCache:监听某个节点的所有子节点们 */ @Test public void testPathChildrenCache() throws Exception { //1.创建监听对象 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true); //2. 绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点变化了~"); System.out.println(event); //监听子节点的数据变更,并且拿到变更后的数据 //1.获取类型 PathChildrenCacheEvent.Type type = event.getType(); //2.判断类型是否是update if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { System.out.println("数据变了!!!"); byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); //3. 开启 pathChildrenCache.start(); while (true) { } } /** * 演示 TreeCache:监听某个节点自己和所有子节点们 */ @Test public void testTreeCache() throws Exception { //1. 创建监听器 TreeCache treeCache = new TreeCache(client, "/app2"); //2. 注册监听 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("节点变化了"); System.out.println(event); } }); //3. 开启 treeCache.start(); while (true) { } } }