提交 9598cd09 编写于 作者: C cvictory 提交者: ken.lj

Merge pull request #3603, configcenter share zookeeper connection with registry.

Fixes #3288
上级 0c2232f3
...@@ -49,4 +49,12 @@ public class ConfigChangeEvent { ...@@ -49,4 +49,12 @@ public class ConfigChangeEvent {
return changeType; return changeType;
} }
} @Override
\ No newline at end of file public String toString() {
return "ConfigChangeEvent{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
", changeType=" + changeType +
'}';
}
}
...@@ -33,16 +33,9 @@ ...@@ -33,16 +33,9 @@
<version>${project.parent.version}</version> <version>${project.parent.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.dubbo</groupId>
<artifactId>curator-framework</artifactId> <artifactId>dubbo-remoting-zookeeper</artifactId>
</dependency> <version>${project.parent.version}</version>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
...@@ -50,4 +43,4 @@ ...@@ -50,4 +43,4 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -21,13 +21,9 @@ import org.apache.dubbo.common.utils.StringUtils; ...@@ -21,13 +21,9 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigChangeEvent; import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.ConfigChangeType; import org.apache.dubbo.configcenter.ConfigChangeType;
import org.apache.dubbo.configcenter.ConfigurationListener; import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.remoting.zookeeper.DataListener;
import org.apache.dubbo.remoting.zookeeper.EventType;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -37,9 +33,8 @@ import java.util.concurrent.CountDownLatch; ...@@ -37,9 +33,8 @@ import java.util.concurrent.CountDownLatch;
/** /**
* *
*/ */
public class CacheListener implements TreeCacheListener {
private static final byte[] EMPTY_BYTES = new byte[0];
public class CacheListener implements DataListener {
private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>(); private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
private CountDownLatch initializedLatch; private CountDownLatch initializedLatch;
private String rootPath; private String rootPath;
...@@ -49,76 +44,73 @@ public class CacheListener implements TreeCacheListener { ...@@ -49,76 +44,73 @@ public class CacheListener implements TreeCacheListener {
this.initializedLatch = initializedLatch; this.initializedLatch = initializedLatch;
} }
public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}
public void removeListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.get(key);
if (listeners != null) {
listeners.remove(configurationListener);
}
}
/**
* This is used to convert a configuration nodePath into a key
* TODO doc
*
* @param path
* @return key (nodePath less the config root path)
*/
private String pathToKey(String path) {
if (StringUtils.isEmpty(path)) {
return path;
}
return path.replace(rootPath + "/", "").replaceAll("/", ".");
}
@Override @Override
public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception { public void dataChanged(String path, Object value, EventType eventType) {
if (eventType == null) {
return;
}
TreeCacheEvent.Type type = event.getType(); if (eventType == EventType.INITIALIZED) {
ChildData data = event.getData();
if (type == TreeCacheEvent.Type.INITIALIZED) {
initializedLatch.countDown(); initializedLatch.countDown();
return; return;
} }
// TODO, ignore other event types if (path == null || (value == null && eventType != EventType.NodeDeleted)) {
if (data == null) {
return; return;
} }
// TODO We limit the notification of config changes to a specific path level, for example // TODO We limit the notification of config changes to a specific path level, for example
// /dubbo/config/service/configurators, other config changes not in this level will not get notified, // /dubbo/config/service/configurators, other config changes not in this level will not get notified,
// say /dubbo/config/dubbo.properties // say /dubbo/config/dubbo.properties
if (data.getPath().split("/").length >= 5) { if (path.split("/").length >= 5) {
byte[] value = data.getData(); String key = pathToKey(path);
String key = pathToKey(data.getPath());
ConfigChangeType changeType; ConfigChangeType changeType;
switch (type) { switch (eventType) {
case NODE_ADDED: case NodeCreated:
changeType = ConfigChangeType.ADDED; changeType = ConfigChangeType.ADDED;
break; break;
case NODE_REMOVED: case NodeDeleted:
changeType = ConfigChangeType.DELETED; changeType = ConfigChangeType.DELETED;
break; break;
case NODE_UPDATED: case NodeDataChanged:
changeType = ConfigChangeType.MODIFIED; changeType = ConfigChangeType.MODIFIED;
break; break;
default: default:
return; return;
} }
if (value == null) { ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType);
value = EMPTY_BYTES;
}
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType);
Set<ConfigurationListener> listeners = keyListeners.get(key); Set<ConfigurationListener> listeners = keyListeners.get(key);
if (CollectionUtils.isNotEmpty(listeners)) { if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent)); listeners.forEach(listener -> listener.process(configChangeEvent));
} }
} }
} }
public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}
public void removeListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.get(key);
if (listeners != null) {
listeners.remove(configurationListener);
}
}
/**
* This is used to convert a configuration nodePath into a key
* TODO doc
*
* @param path
* @return key (nodePath less the config root path)
*/
private String pathToKey(String path) {
if (StringUtils.isEmpty(path)) {
return path;
}
return path.replace(rootPath + "/", "").replaceAll("/", ".");
}
} }
...@@ -16,28 +16,21 @@ ...@@ -16,28 +16,21 @@
*/ */
package org.apache.dubbo.configcenter.support.zookeeper; package org.apache.dubbo.configcenter.support.zookeeper;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL; import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigurationListener; import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.configcenter.DynamicConfiguration; import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.apache.curator.framework.CuratorFrameworkFactory.newClient;
import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
/** /**
...@@ -45,52 +38,32 @@ import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; ...@@ -45,52 +38,32 @@ import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
*/ */
public class ZookeeperDynamicConfiguration implements DynamicConfiguration { public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class); private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class);
private Executor executor;
private CuratorFramework client;
private Executor executor;
// The final root path would be: /configRootPath/"config" // The final root path would be: /configRootPath/"config"
private String rootPath; private String rootPath;
private TreeCache treeCache; private final ZookeeperClient zkClient;
private CountDownLatch initializedLatch; private CountDownLatch initializedLatch;
private CacheListener cacheListener; private CacheListener cacheListener;
private URL url; private URL url;
ZookeeperDynamicConfiguration(URL url) { ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
this.url = url; this.url = url;
rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config"; rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
int sessionTimeout = url.getParameter("config.session.timeout", 60 * 1000);
int connectTimeout = url.getParameter("config.connect.timeout", 10 * 1000);
String connectString = url.getBackupAddress();
client = newClient(connectString, sessionTimeout, connectTimeout, policy);
client.start();
try {
boolean connected = client.blockUntilConnected(3 * connectTimeout, TimeUnit.MILLISECONDS);
if (!connected) {
if (url.getParameter(Constants.CONFIG_CHECK_KEY, true)) {
throw new IllegalStateException("Failed to connect to config center (zookeeper): "
+ connectString + " in " + 3 * connectTimeout + "ms.");
} else {
logger.warn("The config center (zookeeper) is not fully initialized in " + 3 * connectTimeout + "ms, address is: " + connectString);
}
}
} catch (InterruptedException e) {
throw new IllegalStateException("The thread was interrupted unexpectedly when trying connecting to zookeeper "
+ connectString + " config center, ", e);
}
initializedLatch = new CountDownLatch(1); initializedLatch = new CountDownLatch(1);
this.cacheListener = new CacheListener(rootPath, initializedLatch); this.cacheListener = new CacheListener(rootPath, initializedLatch);
this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true)); this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
// build local cache
zkClient = zookeeperTransporter.connect(url);
zkClient.addDataListener(rootPath, cacheListener, executor);
try { try {
this.buildCache(); // Wait for connection
} catch (Exception e) { this.initializedLatch.await();
logger.warn("Failed to build local cache for config center (zookeeper), address is ." + connectString); } catch (InterruptedException e) {
logger.warn("Failed to build local cache for config center (zookeeper)." + url);
} }
} }
...@@ -100,11 +73,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { ...@@ -100,11 +73,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
*/ */
@Override @Override
public Object getInternalProperty(String key) { public Object getInternalProperty(String key) {
ChildData childData = treeCache.getCurrentData(key); return zkClient.getContent(key);
if (childData != null) {
return new String(childData.getData(), StandardCharsets.UTF_8);
}
return null;
} }
/** /**
...@@ -141,18 +110,4 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { ...@@ -141,18 +110,4 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
return (String) getInternalProperty(rootPath + "/" + key); return (String) getInternalProperty(rootPath + "/" + key);
} }
/**
* Adds a listener to the pathChildrenCache, initializes the cache, then starts the cache-management background
* thread
*/
private void buildCache() throws Exception {
this.treeCache = new TreeCache(client, rootPath);
// create the watcher for future configuration updates
treeCache.getListenable().addListener(cacheListener, executor);
// it's not blocking, so we use an extra latch 'initializedLatch' to make sure cache fully initialized before use.
treeCache.start();
initializedLatch.await();
}
} }
...@@ -19,13 +19,22 @@ package org.apache.dubbo.configcenter.support.zookeeper; ...@@ -19,13 +19,22 @@ package org.apache.dubbo.configcenter.support.zookeeper;
import org.apache.dubbo.common.URL; import org.apache.dubbo.common.URL;
import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory; import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
import org.apache.dubbo.configcenter.DynamicConfiguration; import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
/** /**
* *
*/ */
public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory { public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override @Override
protected DynamicConfiguration createDynamicConfiguration(URL url) { protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new ZookeeperDynamicConfiguration(url); return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
} }
} }
...@@ -133,6 +133,7 @@ public class ZookeeperDynamicConfigurationTest { ...@@ -133,6 +133,7 @@ public class ZookeeperDynamicConfigurationTest {
@Override @Override
public void process(ConfigChangeEvent event) { public void process(ConfigChangeEvent event) {
System.out.println(this + ": " + event);
Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0)); Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0));
countMap.put(event.getKey(), ++count); countMap.put(event.getKey(), ++count);
......
...@@ -101,7 +101,6 @@ ...@@ -101,7 +101,6 @@
<httpcore_version>4.4.6</httpcore_version> <httpcore_version>4.4.6</httpcore_version>
<fastjson_version>1.2.46</fastjson_version> <fastjson_version>1.2.46</fastjson_version>
<zookeeper_version>3.4.13</zookeeper_version> <zookeeper_version>3.4.13</zookeeper_version>
<zkclient_version>0.2</zkclient_version>
<curator_version>4.0.1</curator_version> <curator_version>4.0.1</curator_version>
<curator_test_version>2.12.0</curator_test_version> <curator_test_version>2.12.0</curator_test_version>
<jedis_version>2.9.0</jedis_version> <jedis_version>2.9.0</jedis_version>
...@@ -151,7 +150,7 @@ ...@@ -151,7 +150,7 @@
<dependencies> <dependencies>
<!-- Common libs --> <!-- Common libs -->
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-framework-bom</artifactId> <artifactId>spring-framework-bom</artifactId>
<version>${spring_version}</version> <version>${spring_version}</version>
<type>pom</type> <type>pom</type>
...@@ -208,17 +207,6 @@ ...@@ -208,17 +207,6 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient_version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId> <artifactId>curator-framework</artifactId>
......
...@@ -50,10 +50,6 @@ ...@@ -50,10 +50,6 @@
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId> <artifactId>curator-recipes</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.zookeeper</groupId> <groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId> <artifactId>zookeeper</artifactId>
......
...@@ -35,16 +35,10 @@ ...@@ -35,16 +35,10 @@
<version>${project.parent.version}</version> <version>${project.parent.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.zookeeper</groupId> <groupId>org.apache.dubbo</groupId>
<artifactId>zookeeper</artifactId> <artifactId>dubbo-dependencies-zookeeper</artifactId>
</dependency> <version>${project.parent.version}</version>
<dependency> <type>pom</type>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
...@@ -52,4 +46,4 @@ ...@@ -52,4 +46,4 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -14,16 +14,12 @@ ...@@ -14,16 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dubbo.remoting.zookeeper.zkclient; package org.apache.dubbo.remoting.zookeeper;
import org.apache.dubbo.common.URL; /**
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; * 2019-02-26
import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperTransporter; */
public interface DataListener {
public class ZkclientZookeeperTransporter extends AbstractZookeeperTransporter {
@Override
public ZookeeperClient createZookeeperClient(URL url) {
return new ZkclientZookeeperClient(url);
}
void dataChanged(String path, Object value, EventType eventType);
} }
...@@ -14,40 +14,52 @@ ...@@ -14,40 +14,52 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dubbo.remoting.zookeeper.zkclient; package org.apache.dubbo.remoting.zookeeper;
import org.apache.dubbo.common.URL; import org.apache.zookeeper.Watcher;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; /**
import org.apache.curator.test.TestingServer; * 2019-02-26
import org.junit.jupiter.api.AfterEach; */
import org.junit.jupiter.api.BeforeEach; public enum EventType {
import org.junit.jupiter.api.Test; None(-1),
NodeCreated(1),
import static org.hamcrest.MatcherAssert.assertThat; NodeDeleted(2),
import static org.hamcrest.core.IsNot.not; NodeDataChanged(3),
import static org.hamcrest.core.IsNull.nullValue; NodeChildrenChanged(4),
CONNECTION_SUSPENDED(11),
public class ZkclientZookeeperTransporterTest { CONNECTION_RECONNECTED(12),
private TestingServer zkServer; CONNECTION_LOST(12),
private ZookeeperClient zookeeperClient; INITIALIZED(10);
@BeforeEach
public void setUp() throws Exception {
int zkServerPort = NetUtils.getAvailablePort(); private final int intValue; // Integer representation of value
zkServer = new TestingServer(zkServerPort, true); // for sending over wire
zookeeperClient = new ZkclientZookeeperTransporter().connect(URL.valueOf("zookeeper://127.0.0.1:" +
zkServerPort + "/service")); EventType(int intValue) {
this.intValue = intValue;
} }
@Test public int getIntValue() {
public void testZookeeperClient() { return intValue;
assertThat(zookeeperClient, not(nullValue()));
zookeeperClient.close();
} }
@AfterEach public static Watcher.Event.EventType fromInt(int intValue) {
public void tearDown() throws Exception { switch (intValue) {
zkServer.stop(); case -1:
return Watcher.Event.EventType.None;
case 1:
return Watcher.Event.EventType.NodeCreated;
case 2:
return Watcher.Event.EventType.NodeDeleted;
case 3:
return Watcher.Event.EventType.NodeDataChanged;
case 4:
return Watcher.Event.EventType.NodeChildrenChanged;
default:
throw new RuntimeException("Invalid integer value for conversion to EventType");
}
} }
} }
\ No newline at end of file
...@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.zookeeper; ...@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.zookeeper;
import org.apache.dubbo.common.URL; import org.apache.dubbo.common.URL;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
public interface ZookeeperClient { public interface ZookeeperClient {
...@@ -30,6 +31,21 @@ public interface ZookeeperClient { ...@@ -30,6 +31,21 @@ public interface ZookeeperClient {
List<String> addChildListener(String path, ChildListener listener); List<String> addChildListener(String path, ChildListener listener);
/**
* @param path: directory. All of child of path will be listened.
* @param listener
*/
void addDataListener(String path, DataListener listener);
/**
* @param path: directory. All of child of path will be listened.
* @param listener
* @param executor another thread
*/
void addDataListener(String path, DataListener listener, Executor executor);
void removeDataListener(String path, DataListener listener);
void removeChildListener(String path, ChildListener listener); void removeChildListener(String path, ChildListener listener);
void addStateListener(StateListener listener); void addStateListener(StateListener listener);
......
...@@ -16,18 +16,24 @@ ...@@ -16,18 +16,24 @@
*/ */
package org.apache.dubbo.remoting.zookeeper.curator; package org.apache.dubbo.remoting.zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL; import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.zookeeper.ChildListener; import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.dubbo.remoting.zookeeper.DataListener;
import org.apache.dubbo.remoting.zookeeper.EventType;
import org.apache.dubbo.remoting.zookeeper.StateListener; import org.apache.dubbo.remoting.zookeeper.StateListener;
import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.KeeperException.NodeExistsException;
...@@ -36,11 +42,15 @@ import org.apache.zookeeper.WatchedEvent; ...@@ -36,11 +42,15 @@ import org.apache.zookeeper.WatchedEvent;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> { public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZookeeperClient.CuratorWatcherImpl, CuratorZookeeperClient.CuratorWatcherImpl> {
private final Charset charset = Charset.forName("UTF-8"); static final Charset charset = Charset.forName("UTF-8");
private final CuratorFramework client; private final CuratorFramework client;
private Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
public CuratorZookeeperClient(URL url) { public CuratorZookeeperClient(URL url) {
...@@ -96,10 +106,15 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch ...@@ -96,10 +106,15 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
@Override @Override
protected void createPersistent(String path, String data) { protected void createPersistent(String path, String data) {
byte[] dataBytes = data.getBytes(charset);
try { try {
byte[] dataBytes = data.getBytes(charset);
client.create().forPath(path, dataBytes); client.create().forPath(path, dataBytes);
} catch (NodeExistsException e) { } catch (NodeExistsException e) {
try {
client.setData().forPath(path, dataBytes);
} catch (Exception e1) {
throw new IllegalStateException(e.getMessage(), e1);
}
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e); throw new IllegalStateException(e.getMessage(), e);
} }
...@@ -107,10 +122,15 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch ...@@ -107,10 +122,15 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
@Override @Override
protected void createEphemeral(String path, String data) { protected void createEphemeral(String path, String data) {
byte[] dataBytes = data.getBytes(charset);
try { try {
byte[] dataBytes = data.getBytes(charset);
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes); client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes);
} catch (NodeExistsException e) { } catch (NodeExistsException e) {
try {
client.setData().forPath(path, dataBytes);
} catch (Exception e1) {
throw new IllegalStateException(e.getMessage(), e1);
}
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e); throw new IllegalStateException(e.getMessage(), e);
} }
...@@ -172,12 +192,12 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch ...@@ -172,12 +192,12 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
} }
@Override @Override
public CuratorWatcher createTargetChildListener(String path, ChildListener listener) { public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) {
return new CuratorWatcherImpl(listener); return new CuratorZookeeperClient.CuratorWatcherImpl(client, listener);
} }
@Override @Override
public List<String> addTargetChildListener(String path, CuratorWatcher listener) { public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
try { try {
return client.getChildren().usingWatcher(listener).forPath(path); return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) { } catch (NoNodeException e) {
...@@ -188,27 +208,73 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch ...@@ -188,27 +208,73 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
} }
@Override @Override
public void removeTargetChildListener(String path, CuratorWatcher listener) { protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) {
((CuratorWatcherImpl) listener).unwatch(); return new CuratorWatcherImpl(client, listener);
}
@Override
protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
this.addTargetDataListener(path, treeCacheListener, null);
}
@Override
protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
try {
TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
treeCacheMap.putIfAbsent(path, treeCache);
treeCache.start();
if (executor == null) {
treeCache.getListenable().addListener(treeCacheListener);
} else {
treeCache.getListenable().addListener(treeCacheListener, executor);
}
} catch (Exception e) {
throw new IllegalStateException("Add treeCache listener for path:" + path, e);
}
}
@Override
protected void removeTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
TreeCache treeCache = treeCacheMap.get(path);
if (treeCache != null) {
treeCache.getListenable().removeListener(treeCacheListener);
}
treeCacheListener.dataListener = null;
} }
private class CuratorWatcherImpl implements CuratorWatcher { @Override
public void removeTargetChildListener(String path, CuratorWatcherImpl listener) {
listener.unwatch();
}
static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {
private CuratorFramework client;
private volatile ChildListener childListener;
private volatile DataListener dataListener;
private volatile ChildListener listener;
public CuratorWatcherImpl(ChildListener listener) { public CuratorWatcherImpl(CuratorFramework client, ChildListener listener) {
this.listener = listener; this.client = client;
this.childListener = listener;
}
public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
this.dataListener = dataListener;
}
protected CuratorWatcherImpl() {
} }
public void unwatch() { public void unwatch() {
this.listener = null; this.childListener = null;
} }
@Override @Override
public void process(WatchedEvent event) throws Exception { public void process(WatchedEvent event) throws Exception {
if (listener != null) { if (childListener != null) {
String path = event.getPath() == null ? "" : event.getPath(); String path = event.getPath() == null ? "" : event.getPath();
listener.childChanged(path, childListener.childChanged(path,
// if path is null, curator using watcher will throw NullPointerException. // if path is null, curator using watcher will throw NullPointerException.
// if client connect or disconnect to server, zookeeper will queue // if client connect or disconnect to server, zookeeper will queue
// watched event(Watcher.Event.EventType.None, .., path = null). // watched event(Watcher.Event.EventType.None, .., path = null).
...@@ -217,6 +283,54 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch ...@@ -217,6 +283,54 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatch
: Collections.<String>emptyList()); : Collections.<String>emptyList());
} }
} }
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
if (dataListener != null) {
TreeCacheEvent.Type type = event.getType();
EventType eventType = null;
String content = null;
String path = null;
switch (type) {
case NODE_ADDED:
eventType = EventType.NodeCreated;
path = event.getData().getPath();
content = new String(event.getData().getData(), charset);
break;
case NODE_UPDATED:
eventType = EventType.NodeDataChanged;
path = event.getData().getPath();
content = new String(event.getData().getData(), charset);
break;
case NODE_REMOVED:
path = event.getData().getPath();
eventType = EventType.NodeDeleted;
break;
case INITIALIZED:
eventType = EventType.INITIALIZED;
break;
case CONNECTION_LOST:
eventType = EventType.CONNECTION_LOST;
break;
case CONNECTION_RECONNECTED:
eventType = EventType.CONNECTION_RECONNECTED;
break;
case CONNECTION_SUSPENDED:
eventType = EventType.CONNECTION_SUSPENDED;
break;
}
dataListener.dataChanged(path, content, eventType);
}
}
} }
/**
* just for unit test
*
* @return
*/
CuratorFramework getClient() {
return client;
}
} }
...@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; ...@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.zookeeper.ChildListener; import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.dubbo.remoting.zookeeper.DataListener;
import org.apache.dubbo.remoting.zookeeper.StateListener; import org.apache.dubbo.remoting.zookeeper.StateListener;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
...@@ -28,8 +29,9 @@ import java.util.Set; ...@@ -28,8 +29,9 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient { public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {
protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class); protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class);
...@@ -39,6 +41,8 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo ...@@ -39,6 +41,8 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo
private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>(); private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
private final ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>> listeners = new ConcurrentHashMap<String, ConcurrentMap<DataListener, TargetDataListener>>();
private volatile boolean closed = false; private volatile boolean closed = false;
public AbstractZookeeperClient(URL url) { public AbstractZookeeperClient(URL url) {
...@@ -97,6 +101,37 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo ...@@ -97,6 +101,37 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo
return addTargetChildListener(path, targetListener); return addTargetChildListener(path, targetListener);
} }
@Override
public void addDataListener(String path, DataListener listener) {
this.addDataListener(path, listener, null);
}
@Override
public void addDataListener(String path, DataListener listener, Executor executor) {
ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = listeners.get(path);
if (dataListenerMap == null) {
listeners.putIfAbsent(path, new ConcurrentHashMap<DataListener, TargetDataListener>());
dataListenerMap = listeners.get(path);
}
TargetDataListener targetListener = dataListenerMap.get(listener);
if (targetListener == null) {
dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener));
targetListener = dataListenerMap.get(listener);
}
addTargetDataListener(path, targetListener, executor);
}
@Override
public void removeDataListener(String path, DataListener listener ){
ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = listeners.get(path);
if (dataListenerMap != null) {
TargetDataListener targetListener = dataListenerMap.remove(listener);
if(targetListener != null){
removeTargetDataListener(path, targetListener);
}
}
}
@Override @Override
public void removeChildListener(String path, ChildListener listener) { public void removeChildListener(String path, ChildListener listener) {
ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path); ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
...@@ -167,6 +202,14 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo ...@@ -167,6 +202,14 @@ public abstract class AbstractZookeeperClient<TargetChildListener> implements Zo
protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener); protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener);
protected abstract TargetDataListener createTargetDataListener(String path, DataListener listener);
protected abstract void addTargetDataListener(String path, TargetDataListener listener);
protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor);
protected abstract void removeTargetDataListener(String path, TargetDataListener listener);
protected abstract void removeTargetChildListener(String path, TargetChildListener listener); protected abstract void removeTargetChildListener(String path, TargetChildListener listener);
protected abstract String doGetContent(String path); protected abstract String doGetContent(String path);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.zookeeper.zkclient;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* Zkclient wrapper class that can monitor the state of the connection automatically after the connection is out of time
* It is also consistent with the use of curator
*
* @date 2017/10/29
*/
public class ZkClientWrapper {
private Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class);
private long timeout;
private ZkClient client;
private volatile KeeperState state;
private CompletableFuture<ZkClient> completableFuture;
private volatile boolean started = false;
public ZkClientWrapper(final String serverAddr, long timeout) {
this.timeout = timeout;
completableFuture = CompletableFuture.supplyAsync(() -> new ZkClient(serverAddr, Integer.MAX_VALUE));
}
public void start() {
if (!started) {
try {
client = completableFuture.get(timeout, TimeUnit.MILLISECONDS);
// this.client.subscribeStateChanges(IZkStateListener);
} catch (Throwable t) {
logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
completableFuture.whenComplete(this::makeClientReady);
}
started = true;
} else {
logger.warn("Zkclient has already been started!");
}
}
public void addListener(IZkStateListener listener) {
completableFuture.whenComplete((value, exception) -> {
this.makeClientReady(value, exception);
if (exception == null) {
client.subscribeStateChanges(listener);
}
});
}
public boolean isConnected() {
// return client != null && state == KeeperState.SyncConnected;
return client != null;
}
public void createPersistent(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
client.createPersistent(path, true);
}
public void createEphemeral(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
client.createEphemeral(path);
}
public void createPersistent(String path, String data) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
client.createPersistent(path, data);
}
public void createEphemeral(String path, String data) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
client.createEphemeral(path, data);
}
public void delete(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
client.delete(path);
}
public List<String> getChildren(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
return client.getChildren(path);
}
public String getData(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
return client.readData(path);
}
public boolean exists(String path) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
return client.exists(path);
}
public void close() {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
client.close();
}
public List<String> subscribeChildChanges(String path, final IZkChildListener listener) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
return client.subscribeChildChanges(path, listener);
}
public void unsubscribeChildChanges(String path, IZkChildListener listener) {
Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
client.unsubscribeChildChanges(path, listener);
}
private void makeClientReady(ZkClient client, Throwable e) {
if (e != null) {
logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
} else {
this.client = client;
// this.client.subscribeStateChanges(IZkStateListener);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.zookeeper.zkclient;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.dubbo.remoting.zookeeper.StateListener;
import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import java.util.List;
public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {
private Logger logger = LoggerFactory.getLogger(ZkclientZookeeperClient.class);
private final ZkClientWrapper client;
private volatile KeeperState state = KeeperState.SyncConnected;
public ZkclientZookeeperClient(URL url) {
super(url);
long timeout = url.getParameter(Constants.TIMEOUT_KEY, 30000L);
client = new ZkClientWrapper(url.getBackupAddress(), timeout);
client.addListener(new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
@Override
public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
client.start();
}
@Override
public void createPersistent(String path) {
try {
client.createPersistent(path);
} catch (ZkNodeExistsException e) {
logger.error("zookeeper failed to create persistent node with " + path + ": ", e);
}
}
@Override
public void createEphemeral(String path) {
try {
client.createEphemeral(path);
} catch (ZkNodeExistsException e) {
logger.error("zookeeper failed to create ephemeral node with " + path + ": ", e);
}
}
@Override
protected void createPersistent(String path, String data) {
try {
client.createPersistent(path, data);
} catch (ZkNodeExistsException e) {
logger.error("zookeeper failed to create persistent node with " +
path + " and " + data + " : ", e);
}
}
@Override
protected void createEphemeral(String path, String data) {
try {
client.createEphemeral(path, data);
} catch (ZkNodeExistsException e) {
logger.error("zookeeper failed to create ephemeral node with " +
path + " and " + data + " : ", e);
}
}
@Override
public void delete(String path) {
try {
client.delete(path);
} catch (ZkNoNodeException e) {
logger.error("zookeeper failed to delete node with " + path + ": ", e);
}
}
@Override
public List<String> getChildren(String path) {
try {
return client.getChildren(path);
} catch (ZkNoNodeException e) {
logger.error("zookeeper failed to get children node with " + path + ": ", e);
return null;
}
}
@Override
public boolean checkExists(String path) {
try {
return client.exists(path);
} catch (Throwable t) {
logger.error("zookeeper failed to check node existing with " + path + ": ", t);
}
return false;
}
@Override
public boolean isConnected() {
return state == KeeperState.SyncConnected;
}
@Override
public String doGetContent(String path) {
try {
return client.getData(path);
} catch (ZkNoNodeException e) {
logger.error("zookeeper failed to get data with " + path + ": ", e);
return null;
}
}
@Override
public void doClose() {
client.close();
}
@Override
public IZkChildListener createTargetChildListener(String path, final ChildListener listener) {
return listener::childChanged;
}
@Override
public List<String> addTargetChildListener(String path, final IZkChildListener listener) {
return client.subscribeChildChanges(path, listener);
}
@Override
public void removeTargetChildListener(String path, IZkChildListener listener) {
client.unsubscribeChildChanges(path, listener);
}
}
zkclient=org.apache.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperTransporter curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
\ No newline at end of file
...@@ -20,7 +20,11 @@ import org.apache.dubbo.common.URL; ...@@ -20,7 +20,11 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.zookeeper.ChildListener; import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer; import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
...@@ -31,6 +35,7 @@ import org.junit.jupiter.api.Test; ...@@ -31,6 +35,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
...@@ -41,6 +46,7 @@ import static org.mockito.Mockito.mock; ...@@ -41,6 +46,7 @@ import static org.mockito.Mockito.mock;
public class CuratorZookeeperClientTest { public class CuratorZookeeperClientTest {
private TestingServer zkServer; private TestingServer zkServer;
private CuratorZookeeperClient curatorClient; private CuratorZookeeperClient curatorClient;
CuratorFramework client = null;
@BeforeEach @BeforeEach
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -48,6 +54,8 @@ public class CuratorZookeeperClientTest { ...@@ -48,6 +54,8 @@ public class CuratorZookeeperClientTest {
zkServer = new TestingServer(zkServerPort, true); zkServer = new TestingServer(zkServerPort, true);
curatorClient = new CuratorZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" + curatorClient = new CuratorZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" +
zkServerPort + "/org.apache.dubbo.registry.RegistryService")); zkServerPort + "/org.apache.dubbo.registry.RegistryService"));
client = CuratorFrameworkFactory.newClient(zkServer.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
} }
@Test @Test
...@@ -74,7 +82,8 @@ public class CuratorZookeeperClientTest { ...@@ -74,7 +82,8 @@ public class CuratorZookeeperClientTest {
String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
curatorClient.create(path, false); curatorClient.create(path, false);
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
curatorClient.addTargetChildListener(path, new CuratorWatcher() { curatorClient.addTargetChildListener(path, new CuratorZookeeperClient.CuratorWatcherImpl() {
@Override @Override
public void process(WatchedEvent watchedEvent) throws Exception { public void process(WatchedEvent watchedEvent) throws Exception {
countDownLatch.countDown(); countDownLatch.countDown();
...@@ -153,4 +162,35 @@ public class CuratorZookeeperClientTest { ...@@ -153,4 +162,35 @@ public class CuratorZookeeperClientTest {
curatorClient.close(); curatorClient.close();
zkServer.stop(); zkServer.stop();
} }
@Test
public void testAddTargetDataListener() throws Exception {
String listenerPath = "/dubbo/service.name/configuration";
String path = listenerPath + "/dat/data";
String value = "vav";
curatorClient.create(path + "/d.json", value, true);
String valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertEquals(value, valueFromCache);
final AtomicInteger atomicInteger = new AtomicInteger(0);
curatorClient.addTargetDataListener(listenerPath, new CuratorZookeeperClient.CuratorWatcherImpl() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("===" + event);
atomicInteger.incrementAndGet();
}
});
valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertNotNull(valueFromCache);
curatorClient.getClient().setData().forPath(path + "/d.json", "sdsdf".getBytes());
curatorClient.getClient().setData().forPath(path + "/d.json", "dfsasf".getBytes());
curatorClient.delete(path + "/d.json");
curatorClient.delete(path);
valueFromCache = curatorClient.getContent(path + "/d.json");
Assertions.assertNull(valueFromCache);
Thread.sleep(2000l);
Assertions.assertTrue(9l >= atomicInteger.get());
Assertions.assertTrue(2l <= atomicInteger.get());
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.zookeeper.zkclient;
import org.apache.dubbo.common.utils.NetUtils;
import org.I0Itec.zkclient.IZkChildListener;
import org.apache.curator.test.TestingServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
public class ZkClientWrapperTest {
private TestingServer zkServer;
private ZkClientWrapper zkClientWrapper;
@BeforeEach
public void setUp() throws Exception {
int zkServerPort = NetUtils.getAvailablePort();
zkServer = new TestingServer(zkServerPort, true);
zkClientWrapper = new ZkClientWrapper("127.0.0.1:" + zkServerPort, 10000);
}
@AfterEach
public void tearDown() throws Exception {
zkServer.stop();
}
@Test
public void testConnectedStatus() {
boolean connected = zkClientWrapper.isConnected();
assertThat(connected, is(false));
zkClientWrapper.start();
IZkChildListener listener = mock(IZkChildListener.class);
zkClientWrapper.subscribeChildChanges("/path", listener);
zkClientWrapper.unsubscribeChildChanges("/path", listener);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.zookeeper.zkclient;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.zookeeper.StateListener;
import org.I0Itec.zkclient.IZkChildListener;
import org.apache.curator.test.TestingServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.hamcrest.MatcherAssert.assertThat;
public class ZkclientZookeeperClientTest {
private TestingServer zkServer;
private ZkclientZookeeperClient zkclientZookeeperClient;
@BeforeEach
public void setUp() throws Exception {
int zkServerPort = NetUtils.getAvailablePort();
zkServer = new TestingServer(zkServerPort, true);
zkclientZookeeperClient = new ZkclientZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" +
zkServerPort + "/org.apache.dubbo.registry.RegistryService"));
}
@Test
public void testCheckExists() {
String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
zkclientZookeeperClient.create(path, false);
assertThat(zkclientZookeeperClient.checkExists(path), is(true));
assertThat(zkclientZookeeperClient.checkExists(path + "/noneexits"), is(false));
}
@Test
public void testDeletePath() {
String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
zkclientZookeeperClient.create(path, false);
assertThat(zkclientZookeeperClient.checkExists(path), is(true));
zkclientZookeeperClient.delete(path);
assertThat(zkclientZookeeperClient.checkExists(path), is(false));
}
@Test
public void testConnectState() throws Exception {
assertThat(zkclientZookeeperClient.isConnected(), is(true));
final CountDownLatch stopLatch = new CountDownLatch(1);
zkclientZookeeperClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int connected) {
stopLatch.countDown();
}
});
zkServer.stop();
stopLatch.await();
assertThat(zkclientZookeeperClient.isConnected(), is(false));
}
@Test
public void testChildrenListener() throws InterruptedException {
String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
zkclientZookeeperClient.create(path, false);
final CountDownLatch countDownLatch = new CountDownLatch(1);
zkclientZookeeperClient.addTargetChildListener(path, new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> list) throws Exception {
countDownLatch.countDown();
}
});
zkclientZookeeperClient.createPersistent(path + "/provider1");
countDownLatch.await();
}
@Test
public void testGetChildren() throws IOException {
String path = "/dubbo/org.apache.dubbo.demo.DemoService/parentProviders";
zkclientZookeeperClient.create(path, false);
for (int i = 0; i < 5; i++) {
zkclientZookeeperClient.createEphemeral(path + "/server" + i);
}
List<String> zookeeperClientChildren = zkclientZookeeperClient.getChildren(path);
assertThat(zookeeperClientChildren, hasSize(5));
}
@Test
public void testCreateContentPersistent() {
String path = "/ZkclientZookeeperClient/content.data";
String content = "createContentTest";
zkclientZookeeperClient.delete(path);
assertThat(zkclientZookeeperClient.checkExists(path), is(false));
assertNull(zkclientZookeeperClient.getContent(path));
zkclientZookeeperClient.create(path, content, false);
assertThat(zkclientZookeeperClient.checkExists(path), is(true));
assertEquals(zkclientZookeeperClient.getContent(path), content);
}
@Test
public void testCreateContentTem() {
String path = "/ZkclientZookeeperClient/content.data";
String content = "createContentTest";
zkclientZookeeperClient.delete(path);
assertThat(zkclientZookeeperClient.checkExists(path), is(false));
assertNull(zkclientZookeeperClient.getContent(path));
zkclientZookeeperClient.create(path, content, true);
assertThat(zkclientZookeeperClient.checkExists(path), is(true));
assertEquals(zkclientZookeeperClient.getContent(path), content);
}
@AfterEach
public void tearDown() throws Exception {
zkclientZookeeperClient.close();
zkServer.stop();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册