From 9598cd09607b2dbe660ae16f4a95c79e107907a8 Mon Sep 17 00:00:00 2001 From: cvictory Date: Thu, 14 Mar 2019 19:14:03 +0800 Subject: [PATCH] Merge pull request #3603, configcenter share zookeeper connection with registry. Fixes #3288 --- .../dubbo/configcenter/ConfigChangeEvent.java | 10 +- .../dubbo-configcenter-zookeeper/pom.xml | 15 +- .../support/zookeeper/CacheListener.java | 94 +++++----- .../ZookeeperDynamicConfiguration.java | 71 ++------ .../ZookeeperDynamicConfigurationFactory.java | 11 +- .../ZookeeperDynamicConfigurationTest.java | 1 + dubbo-dependencies-bom/pom.xml | 14 +- .../dubbo-dependencies-zookeeper/pom.xml | 4 - .../dubbo-remoting-zookeeper/pom.xml | 16 +- ...eperTransporter.java => DataListener.java} | 16 +- .../dubbo/remoting/zookeeper/EventType.java | 65 +++++++ .../remoting/zookeeper/ZookeeperClient.java | 16 ++ .../curator/CuratorZookeeperClient.java | 158 +++++++++++++--- .../support/AbstractZookeeperClient.java | 45 ++++- .../zookeeper/zkclient/ZkClientWrapper.java | 144 --------------- .../zkclient/ZkclientZookeeperClient.java | 168 ------------------ ...bo.remoting.zookeeper.ZookeeperTransporter | 3 +- .../curator/CuratorZookeeperClientTest.java | 44 ++++- .../zkclient/ZkClientWrapperTest.java | 56 ------ .../zkclient/ZkclientZookeeperClientTest.java | 140 --------------- .../ZkclientZookeeperTransporterTest.java | 53 ------ 21 files changed, 396 insertions(+), 748 deletions(-) rename dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/{zkclient/ZkclientZookeeperTransporter.java => DataListener.java} (64%) create mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java diff --git a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java b/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java index 4a2190a47..cdedd15e3 100644 --- a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java +++ b/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java @@ -49,4 +49,12 @@ public class ConfigChangeEvent { return changeType; } -} \ No newline at end of file + @Override + public String toString() { + return "ConfigChangeEvent{" + + "key='" + key + '\'' + + ", value='" + value + '\'' + + ", changeType=" + changeType + + '}'; + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml index bb9e1ad5d..5c84f6515 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml @@ -33,16 +33,9 @@ ${project.parent.version} - org.apache.curator - curator-framework - - - org.apache.curator - curator-recipes - - - org.apache.zookeeper - zookeeper + org.apache.dubbo + dubbo-remoting-zookeeper + ${project.parent.version} org.apache.curator @@ -50,4 +43,4 @@ test - \ No newline at end of file + diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java index 1851a22b2..4f6c63829 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java @@ -21,13 +21,9 @@ import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.configcenter.ConfigChangeEvent; import org.apache.dubbo.configcenter.ConfigChangeType; import org.apache.dubbo.configcenter.ConfigurationListener; +import org.apache.dubbo.remoting.zookeeper.DataListener; +import org.apache.dubbo.remoting.zookeeper.EventType; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; - -import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -37,9 +33,8 @@ import java.util.concurrent.CountDownLatch; /** * */ -public class CacheListener implements TreeCacheListener { - private static final byte[] EMPTY_BYTES = new byte[0]; +public class CacheListener implements DataListener { private Map> keyListeners = new ConcurrentHashMap<>(); private CountDownLatch initializedLatch; private String rootPath; @@ -49,76 +44,73 @@ public class CacheListener implements TreeCacheListener { this.initializedLatch = initializedLatch; } + public void addListener(String key, ConfigurationListener configurationListener) { + Set listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()); + listeners.add(configurationListener); + } + + public void removeListener(String key, ConfigurationListener configurationListener) { + Set listeners = this.keyListeners.get(key); + if (listeners != null) { + listeners.remove(configurationListener); + } + } + + /** + * This is used to convert a configuration nodePath into a key + * TODO doc + * + * @param path + * @return key (nodePath less the config root path) + */ + private String pathToKey(String path) { + if (StringUtils.isEmpty(path)) { + return path; + } + return path.replace(rootPath + "/", "").replaceAll("/", "."); + } + + @Override - public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception { + public void dataChanged(String path, Object value, EventType eventType) { + if (eventType == null) { + return; + } - TreeCacheEvent.Type type = event.getType(); - ChildData data = event.getData(); - if (type == TreeCacheEvent.Type.INITIALIZED) { + if (eventType == EventType.INITIALIZED) { initializedLatch.countDown(); return; } - // TODO, ignore other event types - if (data == null) { + if (path == null || (value == null && eventType != EventType.NodeDeleted)) { return; } // TODO We limit the notification of config changes to a specific path level, for example // /dubbo/config/service/configurators, other config changes not in this level will not get notified, // say /dubbo/config/dubbo.properties - if (data.getPath().split("/").length >= 5) { - byte[] value = data.getData(); - String key = pathToKey(data.getPath()); + if (path.split("/").length >= 5) { + String key = pathToKey(path); ConfigChangeType changeType; - switch (type) { - case NODE_ADDED: + switch (eventType) { + case NodeCreated: changeType = ConfigChangeType.ADDED; break; - case NODE_REMOVED: + case NodeDeleted: changeType = ConfigChangeType.DELETED; break; - case NODE_UPDATED: + case NodeDataChanged: changeType = ConfigChangeType.MODIFIED; break; default: return; } - if (value == null) { - value = EMPTY_BYTES; - } - ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType); + ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType); Set listeners = keyListeners.get(key); if (CollectionUtils.isNotEmpty(listeners)) { listeners.forEach(listener -> listener.process(configChangeEvent)); } } } - - public void addListener(String key, ConfigurationListener configurationListener) { - Set listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()); - listeners.add(configurationListener); - } - - public void removeListener(String key, ConfigurationListener configurationListener) { - Set listeners = this.keyListeners.get(key); - if (listeners != null) { - listeners.remove(configurationListener); - } - } - - /** - * This is used to convert a configuration nodePath into a key - * TODO doc - * - * @param path - * @return key (nodePath less the config root path) - */ - private String pathToKey(String path) { - if (StringUtils.isEmpty(path)) { - return path; - } - return path.replace(rootPath + "/", "").replaceAll("/", "."); - } } diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java index 7a106f86d..dac49ccb0 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java @@ -16,28 +16,21 @@ */ package org.apache.dubbo.configcenter.support.zookeeper; -import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.configcenter.ConfigurationListener; import org.apache.dubbo.configcenter.DynamicConfiguration; +import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; +import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.apache.curator.framework.CuratorFrameworkFactory.newClient; import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; /** @@ -45,52 +38,32 @@ import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; */ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class); - private Executor executor; - private CuratorFramework client; + private Executor executor; // The final root path would be: /configRootPath/"config" private String rootPath; - private TreeCache treeCache; + private final ZookeeperClient zkClient; private CountDownLatch initializedLatch; private CacheListener cacheListener; private URL url; - ZookeeperDynamicConfiguration(URL url) { + ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) { this.url = url; rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config"; - RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); - int sessionTimeout = url.getParameter("config.session.timeout", 60 * 1000); - int connectTimeout = url.getParameter("config.connect.timeout", 10 * 1000); - String connectString = url.getBackupAddress(); - client = newClient(connectString, sessionTimeout, connectTimeout, policy); - client.start(); - - try { - boolean connected = client.blockUntilConnected(3 * connectTimeout, TimeUnit.MILLISECONDS); - if (!connected) { - if (url.getParameter(Constants.CONFIG_CHECK_KEY, true)) { - throw new IllegalStateException("Failed to connect to config center (zookeeper): " - + connectString + " in " + 3 * connectTimeout + "ms."); - } else { - logger.warn("The config center (zookeeper) is not fully initialized in " + 3 * connectTimeout + "ms, address is: " + connectString); - } - } - } catch (InterruptedException e) { - throw new IllegalStateException("The thread was interrupted unexpectedly when trying connecting to zookeeper " - + connectString + " config center, ", e); - } - initializedLatch = new CountDownLatch(1); this.cacheListener = new CacheListener(rootPath, initializedLatch); this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true)); - // build local cache + + zkClient = zookeeperTransporter.connect(url); + zkClient.addDataListener(rootPath, cacheListener, executor); try { - this.buildCache(); - } catch (Exception e) { - logger.warn("Failed to build local cache for config center (zookeeper), address is ." + connectString); + // Wait for connection + this.initializedLatch.await(); + } catch (InterruptedException e) { + logger.warn("Failed to build local cache for config center (zookeeper)." + url); } } @@ -100,11 +73,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { */ @Override public Object getInternalProperty(String key) { - ChildData childData = treeCache.getCurrentData(key); - if (childData != null) { - return new String(childData.getData(), StandardCharsets.UTF_8); - } - return null; + return zkClient.getContent(key); } /** @@ -141,18 +110,4 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { return (String) getInternalProperty(rootPath + "/" + key); } - - /** - * Adds a listener to the pathChildrenCache, initializes the cache, then starts the cache-management background - * thread - */ - private void buildCache() throws Exception { - this.treeCache = new TreeCache(client, rootPath); - // create the watcher for future configuration updates - treeCache.getListenable().addListener(cacheListener, executor); - - // it's not blocking, so we use an extra latch 'initializedLatch' to make sure cache fully initialized before use. - treeCache.start(); - initializedLatch.await(); - } } diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java index 7994e0461..4d78133db 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java @@ -19,13 +19,22 @@ package org.apache.dubbo.configcenter.support.zookeeper; import org.apache.dubbo.common.URL; import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory; import org.apache.dubbo.configcenter.DynamicConfiguration; +import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter; /** * */ public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory { + + private ZookeeperTransporter zookeeperTransporter; + + public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { + this.zookeeperTransporter = zookeeperTransporter; + } + + @Override protected DynamicConfiguration createDynamicConfiguration(URL url) { - return new ZookeeperDynamicConfiguration(url); + return new ZookeeperDynamicConfiguration(url, zookeeperTransporter); } } diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java index e1ca40f80..40f9f04a9 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java @@ -133,6 +133,7 @@ public class ZookeeperDynamicConfigurationTest { @Override public void process(ConfigChangeEvent event) { + System.out.println(this + ": " + event); Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0)); countMap.put(event.getKey(), ++count); diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 1b5d108ca..245b6ff61 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -101,7 +101,6 @@ 4.4.6 1.2.46 3.4.13 - 0.2 4.0.1 2.12.0 2.9.0 @@ -151,7 +150,7 @@ - org.springframework +org.springframework spring-framework-bom ${spring_version} pom @@ -208,17 +207,6 @@ - - com.101tec - zkclient - ${zkclient_version} - - - org.apache.zookeeper - zookeeper - - - org.apache.curator curator-framework diff --git a/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml b/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml index 7305c2bc8..18a3f2abc 100644 --- a/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml +++ b/dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml @@ -50,10 +50,6 @@ org.apache.curator curator-recipes - - com.101tec - zkclient - org.apache.zookeeper zookeeper diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml index 97ce12adc..24b14e9bd 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml +++ b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml @@ -35,16 +35,10 @@ ${project.parent.version} - org.apache.zookeeper - zookeeper - - - com.101tec - zkclient - - - org.apache.curator - curator-framework + org.apache.dubbo + dubbo-dependencies-zookeeper + ${project.parent.version} + pom org.apache.curator @@ -52,4 +46,4 @@ test - \ No newline at end of file + diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java similarity index 64% rename from dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java rename to dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java index 0ad86ff78..95b948ada 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java @@ -14,16 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.remoting.zookeeper.zkclient; +package org.apache.dubbo.remoting.zookeeper; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; -import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperTransporter; - -public class ZkclientZookeeperTransporter extends AbstractZookeeperTransporter { - @Override - public ZookeeperClient createZookeeperClient(URL url) { - return new ZkclientZookeeperClient(url); - } +/** + * 2019-02-26 + */ +public interface DataListener { + void dataChanged(String path, Object value, EventType eventType); } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java new file mode 100644 index 000000000..a1de03736 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.zookeeper; + +import org.apache.zookeeper.Watcher; + +/** + * 2019-02-26 + */ +public enum EventType { + None(-1), + NodeCreated(1), + NodeDeleted(2), + NodeDataChanged(3), + NodeChildrenChanged(4), + CONNECTION_SUSPENDED(11), + CONNECTION_RECONNECTED(12), + CONNECTION_LOST(12), + INITIALIZED(10); + + + + private final int intValue; // Integer representation of value + // for sending over wire + + EventType(int intValue) { + this.intValue = intValue; + } + + public int getIntValue() { + return intValue; + } + + public static Watcher.Event.EventType fromInt(int intValue) { + switch (intValue) { + case -1: + return Watcher.Event.EventType.None; + case 1: + return Watcher.Event.EventType.NodeCreated; + case 2: + return Watcher.Event.EventType.NodeDeleted; + case 3: + return Watcher.Event.EventType.NodeDataChanged; + case 4: + return Watcher.Event.EventType.NodeChildrenChanged; + + default: + throw new RuntimeException("Invalid integer value for conversion to EventType"); + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java index b6875ee8e..cbb37479c 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java @@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.zookeeper; import org.apache.dubbo.common.URL; import java.util.List; +import java.util.concurrent.Executor; public interface ZookeeperClient { @@ -30,6 +31,21 @@ public interface ZookeeperClient { List addChildListener(String path, ChildListener listener); + /** + * @param path: directory. All of child of path will be listened. + * @param listener + */ + void addDataListener(String path, DataListener listener); + + /** + * @param path: directory. All of child of path will be listened. + * @param listener + * @param executor another thread + */ + void addDataListener(String path, DataListener listener, Executor executor); + + void removeDataListener(String path, DataListener listener); + void removeChildListener(String path, ChildListener listener); void addStateListener(StateListener listener); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java index a78edda76..4bf7b6d3b 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java @@ -16,18 +16,24 @@ */ package org.apache.dubbo.remoting.zookeeper.curator; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.retry.RetryNTimes; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.zookeeper.ChildListener; +import org.apache.dubbo.remoting.zookeeper.DataListener; +import org.apache.dubbo.remoting.zookeeper.EventType; import org.apache.dubbo.remoting.zookeeper.StateListener; import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -36,11 +42,15 @@ import org.apache.zookeeper.WatchedEvent; import java.nio.charset.Charset; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; -public class CuratorZookeeperClient extends AbstractZookeeperClient { +public class CuratorZookeeperClient extends AbstractZookeeperClient { - private final Charset charset = Charset.forName("UTF-8"); + static final Charset charset = Charset.forName("UTF-8"); private final CuratorFramework client; + private Map treeCacheMap = new ConcurrentHashMap<>(); public CuratorZookeeperClient(URL url) { @@ -96,10 +106,15 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient addTargetChildListener(String path, CuratorWatcher listener) { + public List addTargetChildListener(String path, CuratorWatcherImpl listener) { try { return client.getChildren().usingWatcher(listener).forPath(path); } catch (NoNodeException e) { @@ -188,27 +208,73 @@ public class CuratorZookeeperClient extends AbstractZookeeperClientemptyList()); } } + + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + if (dataListener != null) { + TreeCacheEvent.Type type = event.getType(); + EventType eventType = null; + String content = null; + String path = null; + switch (type) { + case NODE_ADDED: + eventType = EventType.NodeCreated; + path = event.getData().getPath(); + content = new String(event.getData().getData(), charset); + break; + case NODE_UPDATED: + eventType = EventType.NodeDataChanged; + path = event.getData().getPath(); + content = new String(event.getData().getData(), charset); + break; + case NODE_REMOVED: + path = event.getData().getPath(); + eventType = EventType.NodeDeleted; + break; + case INITIALIZED: + eventType = EventType.INITIALIZED; + break; + case CONNECTION_LOST: + eventType = EventType.CONNECTION_LOST; + break; + case CONNECTION_RECONNECTED: + eventType = EventType.CONNECTION_RECONNECTED; + break; + case CONNECTION_SUSPENDED: + eventType = EventType.CONNECTION_SUSPENDED; + break; + + } + dataListener.dataChanged(path, content, eventType); + } + } } + /** + * just for unit test + * + * @return + */ + CuratorFramework getClient() { + return client; + } } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java index e90f7fb9c..9697cea01 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.zookeeper.ChildListener; +import org.apache.dubbo.remoting.zookeeper.DataListener; import org.apache.dubbo.remoting.zookeeper.StateListener; import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; @@ -28,8 +29,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; -public abstract class AbstractZookeeperClient implements ZookeeperClient { +public abstract class AbstractZookeeperClient implements ZookeeperClient { protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class); @@ -39,6 +41,8 @@ public abstract class AbstractZookeeperClient implements Zo private final ConcurrentMap> childListeners = new ConcurrentHashMap>(); + private final ConcurrentMap> listeners = new ConcurrentHashMap>(); + private volatile boolean closed = false; public AbstractZookeeperClient(URL url) { @@ -97,6 +101,37 @@ public abstract class AbstractZookeeperClient implements Zo return addTargetChildListener(path, targetListener); } + @Override + public void addDataListener(String path, DataListener listener) { + this.addDataListener(path, listener, null); + } + + @Override + public void addDataListener(String path, DataListener listener, Executor executor) { + ConcurrentMap dataListenerMap = listeners.get(path); + if (dataListenerMap == null) { + listeners.putIfAbsent(path, new ConcurrentHashMap()); + dataListenerMap = listeners.get(path); + } + TargetDataListener targetListener = dataListenerMap.get(listener); + if (targetListener == null) { + dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener)); + targetListener = dataListenerMap.get(listener); + } + addTargetDataListener(path, targetListener, executor); + } + + @Override + public void removeDataListener(String path, DataListener listener ){ + ConcurrentMap dataListenerMap = listeners.get(path); + if (dataListenerMap != null) { + TargetDataListener targetListener = dataListenerMap.remove(listener); + if(targetListener != null){ + removeTargetDataListener(path, targetListener); + } + } + } + @Override public void removeChildListener(String path, ChildListener listener) { ConcurrentMap listeners = childListeners.get(path); @@ -167,6 +202,14 @@ public abstract class AbstractZookeeperClient implements Zo protected abstract List addTargetChildListener(String path, TargetChildListener listener); + protected abstract TargetDataListener createTargetDataListener(String path, DataListener listener); + + protected abstract void addTargetDataListener(String path, TargetDataListener listener); + + protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor); + + protected abstract void removeTargetDataListener(String path, TargetDataListener listener); + protected abstract void removeTargetChildListener(String path, TargetChildListener listener); protected abstract String doGetContent(String path); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java deleted file mode 100644 index ae8a3ef87..000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.I0Itec.zkclient.IZkChildListener; -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.ZkClient; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.Assert; -import org.apache.zookeeper.Watcher.Event.KeeperState; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -/** - * Zkclient wrapper class that can monitor the state of the connection automatically after the connection is out of time - * It is also consistent with the use of curator - * - * @date 2017/10/29 - */ -public class ZkClientWrapper { - private Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class); - private long timeout; - private ZkClient client; - private volatile KeeperState state; - private CompletableFuture completableFuture; - private volatile boolean started = false; - - public ZkClientWrapper(final String serverAddr, long timeout) { - this.timeout = timeout; - completableFuture = CompletableFuture.supplyAsync(() -> new ZkClient(serverAddr, Integer.MAX_VALUE)); - } - - public void start() { - if (!started) { - try { - client = completableFuture.get(timeout, TimeUnit.MILLISECONDS); -// this.client.subscribeStateChanges(IZkStateListener); - } catch (Throwable t) { - logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t); - completableFuture.whenComplete(this::makeClientReady); - } - started = true; - } else { - logger.warn("Zkclient has already been started!"); - } - } - - public void addListener(IZkStateListener listener) { - completableFuture.whenComplete((value, exception) -> { - this.makeClientReady(value, exception); - if (exception == null) { - client.subscribeStateChanges(listener); - } - }); - } - - public boolean isConnected() { -// return client != null && state == KeeperState.SyncConnected; - return client != null; - } - - public void createPersistent(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.createPersistent(path, true); - } - - public void createEphemeral(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.createEphemeral(path); - } - - public void createPersistent(String path, String data) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.createPersistent(path, data); - } - - public void createEphemeral(String path, String data) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.createEphemeral(path, data); - } - - public void delete(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.delete(path); - } - - public List getChildren(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - return client.getChildren(path); - } - - public String getData(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - return client.readData(path); - } - - public boolean exists(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - return client.exists(path); - } - - public void close() { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.close(); - } - - public List subscribeChildChanges(String path, final IZkChildListener listener) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - return client.subscribeChildChanges(path, listener); - } - - public void unsubscribeChildChanges(String path, IZkChildListener listener) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.unsubscribeChildChanges(path, listener); - } - - private void makeClientReady(ZkClient client, Throwable e) { - if (e != null) { - logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e); - } else { - this.client = client; -// this.client.subscribeStateChanges(IZkStateListener); - } - } - - -} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java deleted file mode 100644 index c36640025..000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - - -import org.I0Itec.zkclient.IZkChildListener; -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.exception.ZkNoNodeException; -import org.I0Itec.zkclient.exception.ZkNodeExistsException; -import org.apache.dubbo.common.Constants; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.remoting.zookeeper.ChildListener; -import org.apache.dubbo.remoting.zookeeper.StateListener; -import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; -import org.apache.zookeeper.Watcher.Event.KeeperState; - -import java.util.List; - -public class ZkclientZookeeperClient extends AbstractZookeeperClient { - - private Logger logger = LoggerFactory.getLogger(ZkclientZookeeperClient.class); - - private final ZkClientWrapper client; - - private volatile KeeperState state = KeeperState.SyncConnected; - - public ZkclientZookeeperClient(URL url) { - super(url); - long timeout = url.getParameter(Constants.TIMEOUT_KEY, 30000L); - client = new ZkClientWrapper(url.getBackupAddress(), timeout); - client.addListener(new IZkStateListener() { - @Override - public void handleStateChanged(KeeperState state) throws Exception { - ZkclientZookeeperClient.this.state = state; - if (state == KeeperState.Disconnected) { - stateChanged(StateListener.DISCONNECTED); - } else if (state == KeeperState.SyncConnected) { - stateChanged(StateListener.CONNECTED); - } - } - - @Override - public void handleNewSession() throws Exception { - stateChanged(StateListener.RECONNECTED); - } - }); - client.start(); - } - - @Override - public void createPersistent(String path) { - try { - client.createPersistent(path); - } catch (ZkNodeExistsException e) { - logger.error("zookeeper failed to create persistent node with " + path + ": ", e); - } - } - - @Override - public void createEphemeral(String path) { - try { - client.createEphemeral(path); - } catch (ZkNodeExistsException e) { - logger.error("zookeeper failed to create ephemeral node with " + path + ": ", e); - } - } - - @Override - protected void createPersistent(String path, String data) { - try { - client.createPersistent(path, data); - } catch (ZkNodeExistsException e) { - logger.error("zookeeper failed to create persistent node with " + - path + " and " + data + " : ", e); - } - } - - @Override - protected void createEphemeral(String path, String data) { - try { - client.createEphemeral(path, data); - } catch (ZkNodeExistsException e) { - logger.error("zookeeper failed to create ephemeral node with " + - path + " and " + data + " : ", e); - } - } - - @Override - public void delete(String path) { - try { - client.delete(path); - } catch (ZkNoNodeException e) { - logger.error("zookeeper failed to delete node with " + path + ": ", e); - } - } - - @Override - public List getChildren(String path) { - try { - return client.getChildren(path); - } catch (ZkNoNodeException e) { - logger.error("zookeeper failed to get children node with " + path + ": ", e); - return null; - } - } - - @Override - public boolean checkExists(String path) { - try { - return client.exists(path); - } catch (Throwable t) { - logger.error("zookeeper failed to check node existing with " + path + ": ", t); - } - return false; - } - - @Override - public boolean isConnected() { - return state == KeeperState.SyncConnected; - } - - @Override - public String doGetContent(String path) { - try { - return client.getData(path); - } catch (ZkNoNodeException e) { - logger.error("zookeeper failed to get data with " + path + ": ", e); - return null; - } - } - - @Override - public void doClose() { - client.close(); - } - - @Override - public IZkChildListener createTargetChildListener(String path, final ChildListener listener) { - return listener::childChanged; - } - - @Override - public List addTargetChildListener(String path, final IZkChildListener listener) { - return client.subscribeChildChanges(path, listener); - } - - @Override - public void removeTargetChildListener(String path, IZkChildListener listener) { - client.unsubscribeChildChanges(path, listener); - } - -} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter index e9b9349f3..f8cbd5b41 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter @@ -1,2 +1 @@ -zkclient=org.apache.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperTransporter -curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter \ No newline at end of file +curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java index cb89b166c..f1882e1c7 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java @@ -20,7 +20,11 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.zookeeper.ChildListener; -import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.zookeeper.WatchedEvent; import org.junit.jupiter.api.AfterEach; @@ -31,6 +35,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -41,6 +46,7 @@ import static org.mockito.Mockito.mock; public class CuratorZookeeperClientTest { private TestingServer zkServer; private CuratorZookeeperClient curatorClient; + CuratorFramework client = null; @BeforeEach public void setUp() throws Exception { @@ -48,6 +54,8 @@ public class CuratorZookeeperClientTest { zkServer = new TestingServer(zkServerPort, true); curatorClient = new CuratorZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService")); + client = CuratorFrameworkFactory.newClient(zkServer.getConnectString(), new ExponentialBackoffRetry(1000, 3)); + client.start(); } @Test @@ -74,7 +82,8 @@ public class CuratorZookeeperClientTest { String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; curatorClient.create(path, false); final CountDownLatch countDownLatch = new CountDownLatch(1); - curatorClient.addTargetChildListener(path, new CuratorWatcher() { + curatorClient.addTargetChildListener(path, new CuratorZookeeperClient.CuratorWatcherImpl() { + @Override public void process(WatchedEvent watchedEvent) throws Exception { countDownLatch.countDown(); @@ -153,4 +162,35 @@ public class CuratorZookeeperClientTest { curatorClient.close(); zkServer.stop(); } + + @Test + public void testAddTargetDataListener() throws Exception { + String listenerPath = "/dubbo/service.name/configuration"; + String path = listenerPath + "/dat/data"; + String value = "vav"; + + curatorClient.create(path + "/d.json", value, true); + String valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertEquals(value, valueFromCache); + final AtomicInteger atomicInteger = new AtomicInteger(0); + curatorClient.addTargetDataListener(listenerPath, new CuratorZookeeperClient.CuratorWatcherImpl() { + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + System.out.println("===" + event); + atomicInteger.incrementAndGet(); + } + }); + + valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertNotNull(valueFromCache); + curatorClient.getClient().setData().forPath(path + "/d.json", "sdsdf".getBytes()); + curatorClient.getClient().setData().forPath(path + "/d.json", "dfsasf".getBytes()); + curatorClient.delete(path + "/d.json"); + curatorClient.delete(path); + valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertNull(valueFromCache); + Thread.sleep(2000l); + Assertions.assertTrue(9l >= atomicInteger.get()); + Assertions.assertTrue(2l <= atomicInteger.get()); + } } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java deleted file mode 100644 index 629c0e9f7..000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.apache.dubbo.common.utils.NetUtils; -import org.I0Itec.zkclient.IZkChildListener; -import org.apache.curator.test.TestingServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; - -public class ZkClientWrapperTest { - private TestingServer zkServer; - private ZkClientWrapper zkClientWrapper; - - @BeforeEach - public void setUp() throws Exception { - int zkServerPort = NetUtils.getAvailablePort(); - zkServer = new TestingServer(zkServerPort, true); - zkClientWrapper = new ZkClientWrapper("127.0.0.1:" + zkServerPort, 10000); - } - - @AfterEach - public void tearDown() throws Exception { - zkServer.stop(); - } - - @Test - public void testConnectedStatus() { - boolean connected = zkClientWrapper.isConnected(); - assertThat(connected, is(false)); - zkClientWrapper.start(); - - IZkChildListener listener = mock(IZkChildListener.class); - zkClientWrapper.subscribeChildChanges("/path", listener); - zkClientWrapper.unsubscribeChildChanges("/path", listener); - } -} \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java deleted file mode 100644 index 73c402a4b..000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.remoting.zookeeper.StateListener; -import org.I0Itec.zkclient.IZkChildListener; -import org.apache.curator.test.TestingServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.core.Is.is; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.hamcrest.MatcherAssert.assertThat; - -public class ZkclientZookeeperClientTest { - private TestingServer zkServer; - private ZkclientZookeeperClient zkclientZookeeperClient; - - @BeforeEach - public void setUp() throws Exception { - int zkServerPort = NetUtils.getAvailablePort(); - zkServer = new TestingServer(zkServerPort, true); - zkclientZookeeperClient = new ZkclientZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" + - zkServerPort + "/org.apache.dubbo.registry.RegistryService")); - } - - @Test - public void testCheckExists() { - String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; - zkclientZookeeperClient.create(path, false); - assertThat(zkclientZookeeperClient.checkExists(path), is(true)); - assertThat(zkclientZookeeperClient.checkExists(path + "/noneexits"), is(false)); - } - - @Test - public void testDeletePath() { - String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; - zkclientZookeeperClient.create(path, false); - assertThat(zkclientZookeeperClient.checkExists(path), is(true)); - - zkclientZookeeperClient.delete(path); - assertThat(zkclientZookeeperClient.checkExists(path), is(false)); - } - - @Test - public void testConnectState() throws Exception { - assertThat(zkclientZookeeperClient.isConnected(), is(true)); - final CountDownLatch stopLatch = new CountDownLatch(1); - zkclientZookeeperClient.addStateListener(new StateListener() { - @Override - public void stateChanged(int connected) { - stopLatch.countDown(); - } - }); - zkServer.stop(); - stopLatch.await(); - assertThat(zkclientZookeeperClient.isConnected(), is(false)); - } - - @Test - public void testChildrenListener() throws InterruptedException { - String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; - zkclientZookeeperClient.create(path, false); - final CountDownLatch countDownLatch = new CountDownLatch(1); - zkclientZookeeperClient.addTargetChildListener(path, new IZkChildListener() { - @Override - public void handleChildChange(String s, List list) throws Exception { - countDownLatch.countDown(); - } - }); - zkclientZookeeperClient.createPersistent(path + "/provider1"); - countDownLatch.await(); - } - - @Test - public void testGetChildren() throws IOException { - String path = "/dubbo/org.apache.dubbo.demo.DemoService/parentProviders"; - zkclientZookeeperClient.create(path, false); - for (int i = 0; i < 5; i++) { - zkclientZookeeperClient.createEphemeral(path + "/server" + i); - } - List zookeeperClientChildren = zkclientZookeeperClient.getChildren(path); - assertThat(zookeeperClientChildren, hasSize(5)); - } - - @Test - public void testCreateContentPersistent() { - String path = "/ZkclientZookeeperClient/content.data"; - String content = "createContentTest"; - zkclientZookeeperClient.delete(path); - assertThat(zkclientZookeeperClient.checkExists(path), is(false)); - assertNull(zkclientZookeeperClient.getContent(path)); - - zkclientZookeeperClient.create(path, content, false); - assertThat(zkclientZookeeperClient.checkExists(path), is(true)); - assertEquals(zkclientZookeeperClient.getContent(path), content); - } - - @Test - public void testCreateContentTem() { - String path = "/ZkclientZookeeperClient/content.data"; - String content = "createContentTest"; - zkclientZookeeperClient.delete(path); - assertThat(zkclientZookeeperClient.checkExists(path), is(false)); - assertNull(zkclientZookeeperClient.getContent(path)); - - zkclientZookeeperClient.create(path, content, true); - assertThat(zkclientZookeeperClient.checkExists(path), is(true)); - assertEquals(zkclientZookeeperClient.getContent(path), content); - } - - @AfterEach - public void tearDown() throws Exception { - zkclientZookeeperClient.close(); - zkServer.stop(); - } -} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java deleted file mode 100644 index cbadda97a..000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; -import org.apache.curator.test.TestingServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsNot.not; -import static org.hamcrest.core.IsNull.nullValue; - -public class ZkclientZookeeperTransporterTest { - private TestingServer zkServer; - private ZookeeperClient zookeeperClient; - - @BeforeEach - public void setUp() throws Exception { - int zkServerPort = NetUtils.getAvailablePort(); - zkServer = new TestingServer(zkServerPort, true); - zookeeperClient = new ZkclientZookeeperTransporter().connect(URL.valueOf("zookeeper://127.0.0.1:" + - zkServerPort + "/service")); - } - - @Test - public void testZookeeperClient() { - assertThat(zookeeperClient, not(nullValue())); - zookeeperClient.close(); - } - - @AfterEach - public void tearDown() throws Exception { - zkServer.stop(); - } -} \ No newline at end of file -- GitLab