diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/NotifyListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/NotifyListener.java index e410a8dffe905c69707b6d4f72d6c6f4c59d3eb0..758059327a4848c8e037624eb29096dde2d9791d 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/NotifyListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/NotifyListener.java @@ -31,8 +31,9 @@ public interface NotifyListener { * 当收到服务变更通知时触发。 * * 通知需处理契约:
- * 1. 总是以服务接口为维度全量通知,即不会通知一个服务的部分数据,用户不需要对比上一次通知结果。
- * 2. 允许不同类型的数据分开通知,比如:providers, consumers, routes, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。
+ * 1. 总是以服务接口和数据类型为维度全量通知,即不会通知一个服务的同类型的部分数据,用户不需要对比上一次通知结果。
+ * 2. 订阅时的第一次通知,必须是一个服务的所有类型数据的全量通知。
+ * 2. 中途变更时,允许不同类型的数据分开通知,比如:providers, consumers, routes, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。
* 3. 如果一种类型的数据为空,需通知一个empty协议并带category参数的标识性URL数据。
* 4. 通知者(即注册中心实现)需保证通知的顺序,比如:单线程推送,队列串行化,带版本对比。
* diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/RegistryService.java b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/RegistryService.java index 0191cfaaf7624457fb09f063cea916854ce4b86e..e5719074ba4e4b6534ecc70bea935ac1775eb077 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/RegistryService.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/RegistryService.java @@ -63,6 +63,7 @@ public interface RegistryService { * 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*
* 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。
* 6. 允许URI相同但参数不同的URL并存,不能覆盖。
+ * 7. 必须阻塞订阅过程,等第一次通知完后再返回。
* * @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @param listener 变更事件监听器,不允许为空 diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/AbstractRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/AbstractRegistry.java index f5446a9d7965c37ab2c8808fa42f25ffb7486536..434105e6bdface340923d5f4d0c02f1748d0da9a 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/AbstractRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/AbstractRegistry.java @@ -30,10 +30,13 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.dubbo.common.Constants; @@ -260,36 +263,28 @@ public abstract class AbstractRegistry implements Registry { } return null; } - + public List lookup(URL url) { - List urls= new ArrayList(); Map> notifiedUrls = getNotified().get(url); if (notifiedUrls != null && notifiedUrls.size() > 0) { - for (List values : notifiedUrls.values()) { - urls.addAll(values); + List result = new ArrayList(); + for (List urls : notifiedUrls.values()) { + result.addAll(urls); } - } - if (urls == null || urls.size() == 0) { - List cacheUrls = getCacheUrls(url); - if (cacheUrls != null && cacheUrls.size() > 0) { - urls.addAll(cacheUrls); - } - } - if (urls == null || urls.size() == 0) { - for (URL u: getRegistered()) { - if (UrlUtils.isMatch(url, u)) { - urls.add(u); - } - } - } - if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { - for (URL u: getSubscribed().keySet()) { - if (UrlUtils.isMatch(url, u)) { - urls.add(u); + return result; + } else { + final BlockingQueue> queue = new LinkedBlockingQueue>(1); + subscribe(url, new NotifyListener() { + public void notify(List urls) { + queue.offer(urls); } + }); + try { + return queue.poll(getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return null; } } - return urls; } public void register(URL url) {