diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index 730b73cf5d317795d011eed5dd863ccfbca6818f..f38201f8e309e5c6ee0f34555bece9be84d45d89 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -234,6 +234,13 @@ compile true + + org.apache.dubbo + dubbo-registry-consul + ${project.version} + compile + true + org.apache.dubbo dubbo-monitor-api @@ -346,6 +353,13 @@ compile true + + org.apache.dubbo + dubbo-configcenter-consul + ${project.version} + compile + true + org.apache.dubbo dubbo-compatible @@ -381,6 +395,13 @@ compile true + + org.apache.dubbo + dubbo-metadata-report-consul + ${project.version} + compile + true + @@ -468,6 +489,7 @@ org.apache.dubbo:dubbo-registry-multicast org.apache.dubbo:dubbo-registry-zookeeper org.apache.dubbo:dubbo-registry-redis + org.apache.dubbo:dubbo-registry-consul org.apache.dubbo:dubbo-monitor-api org.apache.dubbo:dubbo-monitor-default org.apache.dubbo:dubbo-config-api @@ -488,10 +510,12 @@ org.apache.dubbo:dubbo-configcenter-definition org.apache.dubbo:dubbo-configcenter-apollo org.apache.dubbo:dubbo-configcenter-zookeeper + org.apache.dubbo:dubbo-configcenter-consul org.apache.dubbo:dubbo-metadata-report-api org.apache.dubbo:dubbo-metadata-definition org.apache.dubbo:dubbo-metadata-report-redis org.apache.dubbo:dubbo-metadata-report-zookeeper + org.apache.dubbo:dubbo-metadata-report-consul diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index c099731324730be694044021b9e578f45ea8fcc8..73e73dfb681898e23c7c3d2fbddf6e2d90c5ec28 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -218,6 +218,11 @@ dubbo-registry-redis ${project.version} + + org.apache.dubbo + dubbo-registry-consul + ${project.version} + org.apache.dubbo dubbo-monitor-api @@ -303,6 +308,11 @@ dubbo-metadata-report-redis ${project.version} + + org.apache.dubbo + dubbo-metadata-report-consul + ${project.version} + org.apache.dubbo dubbo-configcenter-api @@ -318,6 +328,11 @@ dubbo-configcenter-apollo ${project.version} + + org.apache.dubbo + dubbo-configcenter-consul + ${project.version} + org.apache.dubbo dubbo-metadata-definition diff --git a/dubbo-configcenter/dubbo-configcenter-consul/pom.xml b/dubbo-configcenter/dubbo-configcenter-consul/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..2a7580e837eaeff7cee793e5ebb3b308a9fa2c6b --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-consul/pom.xml @@ -0,0 +1,44 @@ + + + + + + dubbo-configcenter + org.apache.dubbo + 2.7.1-SNAPSHOT + + 4.0.0 + + dubbo-configcenter-consul + + + + org.apache.dubbo + dubbo-configcenter-api + ${project.parent.version} + + + com.ecwid.consul + consul-api + + + + + diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..5ef1985e96f33847119ff6910489f2697b87f322 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.configcenter.ConfigChangeEvent; +import org.apache.dubbo.configcenter.ConfigurationListener; +import org.apache.dubbo.configcenter.DynamicConfiguration; + +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.QueryParams; +import com.ecwid.consul.v1.Response; +import com.ecwid.consul.v1.kv.model.GetValue; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; + +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; +import static org.apache.dubbo.common.Constants.PATH_SEPARATOR; + +/** + * config center implementation for consul + */ +public class ConsulDynamicConfiguration implements DynamicConfiguration { + private static final Logger logger = LoggerFactory.getLogger(ConsulDynamicConfiguration.class); + + private static final int DEFAULT_PORT = 8500; + private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000; + private static final String WATCH_TIMEOUT = "consul-watch-timeout"; + + private URL url; + private String rootPath; + private ConsulClient client; + private int watchTimeout = -1; + private ConcurrentMap watchers = new ConcurrentHashMap<>(); + private ConcurrentMap consulIndexes = new ConcurrentHashMap<>(); + private ExecutorService watcherService = newCachedThreadPool( + new NamedThreadFactory("dubbo-consul-configuration-watcher", true)); + + public ConsulDynamicConfiguration(URL url) { + this.url = url; + this.rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + "config"; + this.watchTimeout = buildWatchTimeout(url); + String host = url.getHost(); + int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT; + client = new ConsulClient(host, port); + } + + @Override + public void addListener(String key, String group, ConfigurationListener listener) { + logger.info("register listener " + listener.getClass() + " for config with key: " + key + ", group: " + group); + ConsulKVWatcher watcher = watchers.putIfAbsent(key, new ConsulKVWatcher(key)); + if (watcher == null) { + watcher = watchers.get(key); + watcherService.submit(watcher); + } + watcher.addListener(listener); + } + + @Override + public void removeListener(String key, String group, ConfigurationListener listener) { + logger.info("unregister listener " + listener.getClass() + " for config with key: " + key + ", group: " + group); + ConsulKVWatcher watcher = watchers.get(key); + if (watcher != null) { + watcher.removeListener(listener); + } + } + + @Override + public String getConfig(String key, String group, long timeout) throws IllegalStateException { + if (StringUtils.isNotEmpty(group)) { + key = group + PATH_SEPARATOR + key; + } else { + int i = key.lastIndexOf("."); + key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1); + } + + return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key); + } + + @Override + public Object getInternalProperty(String key) { + logger.info("get config from: " + key); + Long currentIndex = consulIndexes.computeIfAbsent(key, k -> -1L); + Response response = client.getKVValue(key, new QueryParams(watchTimeout, currentIndex)); + GetValue value = response.getValue(); + consulIndexes.put(key, response.getConsulIndex()); + return value != null ? value.getDecodedValue() : null; + } + + private int buildWatchTimeout(URL url) { + return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000; + } + + private class ConsulKVWatcher implements Runnable { + private String key; + private Set listeners; + private boolean running = true; + + public ConsulKVWatcher(String key) { + this.key = convertKey(key); + this.listeners = new HashSet<>(); + } + + @Override + public void run() { + while (running) { + Long lastIndex = consulIndexes.computeIfAbsent(key, k -> -1L); + Response response = client.getKVValue(key, new QueryParams(watchTimeout, lastIndex)); + + Long currentIndex = response.getConsulIndex(); + if (currentIndex == null || currentIndex <= lastIndex) { + continue; + } + + consulIndexes.put(key, currentIndex); + String value = response.getValue().getDecodedValue(); + logger.info("notify change for key: " + key + ", the value is: " + value); + ConfigChangeEvent event = new ConfigChangeEvent(key, value); + for (ConfigurationListener listener : listeners) { + listener.process(event); + } + } + } + + private void addListener(ConfigurationListener listener) { + this.listeners.add(listener); + } + + private void removeListener(ConfigurationListener listener) { + this.listeners.remove(listener); + } + + private String convertKey(String key) { + int index = key.lastIndexOf('.'); + return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1); + } + + private void stop() { + running = false; + } + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..813b61746626a78713a85a26fb4cd3b881ccf828 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory; +import org.apache.dubbo.configcenter.DynamicConfiguration; + +/** + * Config center factory for consul + */ +public class ConsulDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory { + @Override + protected DynamicConfiguration createDynamicConfiguration(URL url) { + return new ConsulDynamicConfiguration(url); + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory b/dubbo-configcenter/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory new file mode 100644 index 0000000000000000000000000000000000000000..b7a5091efa32c5fc264d149be76ae0a98934b81a --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory @@ -0,0 +1 @@ +consul=org.apache.dubbo.configcenter.consul.ConsulDynamicConfigurationFactory diff --git a/dubbo-configcenter/pom.xml b/dubbo-configcenter/pom.xml index 6512dc4fd5706704a9cb52caedcc569ce39bbdc6..fa703be6864bb3f55e5b06220412987f6e91816a 100644 --- a/dubbo-configcenter/pom.xml +++ b/dubbo-configcenter/pom.xml @@ -33,5 +33,6 @@ dubbo-configcenter-api dubbo-configcenter-zookeeper dubbo-configcenter-apollo + dubbo-configcenter-consul - \ No newline at end of file + diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 2b1666b80d6362b5b7e51a1014231ece8a4cdc28..c9fa9636592c555cb49f12d7976902c1f4285360 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -105,6 +105,7 @@ 4.0.1 2.12.0 2.9.0 + 1.4.2 1.3.6 3.1.15 0.8.0 @@ -232,6 +233,11 @@ jedis ${jedis_version} + + com.ecwid.consul + consul-api + ${consul_version} + com.googlecode.xmemcached xmemcached diff --git a/dubbo-metadata-report/dubbo-metadata-report-consul/pom.xml b/dubbo-metadata-report/dubbo-metadata-report-consul/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..ad02eaedda63843c1ee91b68480bd1483cb6094e --- /dev/null +++ b/dubbo-metadata-report/dubbo-metadata-report-consul/pom.xml @@ -0,0 +1,43 @@ + + + + + + dubbo-metadata-report + org.apache.dubbo + 2.7.1-SNAPSHOT + + 4.0.0 + + dubbo-metadata-report-consul + + + + org.apache.dubbo + dubbo-metadata-report-api + ${project.parent.version} + + + com.ecwid.consul + consul-api + + + + diff --git a/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java new file mode 100644 index 0000000000000000000000000000000000000000..6e24fd9dab3f93af6828441c073dc6407c5d2bc8 --- /dev/null +++ b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.metadata.store.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.metadata.identifier.MetadataIdentifier; +import org.apache.dubbo.metadata.support.AbstractMetadataReport; +import org.apache.dubbo.rpc.RpcException; + +import com.ecwid.consul.v1.ConsulClient; + +/** + * metadata report impl for consul + */ +public class ConsulMetadataReport extends AbstractMetadataReport { + private static final Logger logger = LoggerFactory.getLogger(ConsulMetadataReport.class); + private static final int DEFAULT_PORT = 8500; + + private ConsulClient client; + + public ConsulMetadataReport(URL url) { + super(url); + + String host = url.getHost(); + int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT; + client = new ConsulClient(host, port); + } + + @Override + protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) { + this.storeMetadata(providerMetadataIdentifier, serviceDefinitions); + } + + @Override + protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) { + this.storeMetadata(consumerMetadataIdentifier, value); + } + + private void storeMetadata(MetadataIdentifier identifier, String v) { + try { + client.setKVValue(identifier.getIdentifierKey() + META_DATA_SOTRE_TAG, v); + } catch (Throwable t) { + logger.error("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t); + throw new RpcException("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t); + } + } +} diff --git a/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..66d7b5e5e4527a675eb73c4dada4f2bcdac98f06 --- /dev/null +++ b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.metadata.store.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.metadata.store.MetadataReport; +import org.apache.dubbo.metadata.support.AbstractMetadataReportFactory; + +/** + * metadata report factory impl for consul + */ +public class ConsulMetadataReportFactory extends AbstractMetadataReportFactory { + @Override + protected MetadataReport createMetadataReport(URL url) { + return new ConsulMetadataReport(url); + } +} diff --git a/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.store.MetadataReportFactory b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.store.MetadataReportFactory new file mode 100644 index 0000000000000000000000000000000000000000..1f27535d442d1562cd0c6f1042f1b10349af42e7 --- /dev/null +++ b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.store.MetadataReportFactory @@ -0,0 +1 @@ +consul=org.apache.dubbo.metadata.store.consul.ConsulMetadataReportFactory diff --git a/dubbo-metadata-report/pom.xml b/dubbo-metadata-report/pom.xml index 195ae5e9299f6ed32642ecdf94372fde69135fb1..aa14d56122ee55cb252bc6044b4fe8703c37c8d3 100644 --- a/dubbo-metadata-report/pom.xml +++ b/dubbo-metadata-report/pom.xml @@ -29,6 +29,7 @@ dubbo-metadata-report-zookeeper dubbo-metadata-report-redis dubbo-metadata-definition + dubbo-metadata-report-consul diff --git a/dubbo-registry/dubbo-registry-consul/pom.xml b/dubbo-registry/dubbo-registry-consul/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..38647f259acbfa0127dae298c549705576add790 --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/pom.xml @@ -0,0 +1,43 @@ + + + + + + dubbo-registry + org.apache.dubbo + 2.7.1-SNAPSHOT + + 4.0.0 + + dubbo-registry-consul + + + + org.apache.dubbo + dubbo-registry-api + ${project.parent.version} + + + com.ecwid.consul + consul-api + + + + diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java new file mode 100644 index 0000000000000000000000000000000000000000..72f7ff43b8683cad247cda05445fb21adb989775 --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.consul; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; + +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.QueryParams; +import com.ecwid.consul.v1.Response; +import com.ecwid.consul.v1.agent.model.NewService; +import com.ecwid.consul.v1.catalog.CatalogServicesRequest; +import com.ecwid.consul.v1.health.HealthServicesRequest; +import com.ecwid.consul.v1.health.model.HealthService; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.apache.dubbo.common.Constants.ANY_VALUE; + +/** + * registry center implementation for consul + */ +public class ConsulRegistry extends FailbackRegistry { + private static final Logger logger = LoggerFactory.getLogger(ConsulRegistry.class); + + private static final String SERVICE_TAG = "dubbo"; + private static final String URL_META_KEY = "url"; + private static final String WATCH_TIMEOUT = "consul-watch-timeout"; + private static final String CHECK_INTERVAL = "consul-check-interval"; + private static final String CHECK_TIMEOUT = "consul-check-timeout"; + private static final String DEREGISTER_AFTER = "consul-deregister-critical-service-after"; + + private static final int DEFAULT_PORT = 8500; + // default watch timeout in millisecond + private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000; + // default tcp check interval + private static final String DEFAULT_CHECK_INTERVAL = "10s"; + // default tcp check timeout + private static final String DEFAULT_CHECK_TIMEOUT = "1s"; + // default deregister critical server after + private static final String DEFAULT_DEREGISTER_TIME = "20s"; + + private ConsulClient client; + + private ExecutorService notifierExecutor = newCachedThreadPool( + new NamedThreadFactory("dubbo-consul-notifier", true)); + private ConcurrentMap notifiers = new ConcurrentHashMap<>(); + + public ConsulRegistry(URL url) { + super(url); + String host = url.getHost(); + int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT; + client = new ConsulClient(host, port); + } + + @Override + public void register(URL url) { + if (isConsumerSide(url)) { + return; + } + + super.register(url); + } + + @Override + public void doRegister(URL url) { + client.agentServiceRegister(buildService(url)); + } + + @Override + public void unregister(URL url) { + if (isConsumerSide(url)) { + return; + } + + super.unregister(url); + } + + @Override + public void doUnregister(URL url) { + client.agentServiceDeregister(buildId(url)); + } + + @Override + public void subscribe(URL url, NotifyListener listener) { + if (isProviderSide(url)) { + return; + } + + super.subscribe(url, listener); + } + + @Override + public void doSubscribe(URL url, NotifyListener listener) { + Long index; + List urls; + if (ANY_VALUE.equals(url.getServiceInterface())) { + Response>> response = getAllServices(-1, buildWatchTimeout(url)); + index = response.getConsulIndex(); + List services = getHealthServices(response.getValue()); + urls = convert(services); + } else { + String service = url.getServiceKey(); + Response> response = getHealthServices(service, -1, buildWatchTimeout(url)); + index = response.getConsulIndex(); + urls = convert(response.getValue()); + } + + notify(url, listener, urls); + ConsulNotifier notifier = notifiers.computeIfAbsent(url, k -> new ConsulNotifier(url, index)); + notifierExecutor.submit(notifier); + } + + @Override + public void unsubscribe(URL url, NotifyListener listener) { + if (isProviderSide(url)) { + return; + } + + super.unsubscribe(url, listener); + } + + @Override + public void doUnsubscribe(URL url, NotifyListener listener) { + ConsulNotifier notifier = notifiers.remove(url); + notifier.stop(); + } + + @Override + public boolean isAvailable() { + return client.getAgentSelf() != null; + } + + @Override + public void destroy() { + super.destroy(); + notifierExecutor.shutdown(); + } + + private Response> getHealthServices(String service, long index, int watchTimeout) { + HealthServicesRequest request = HealthServicesRequest.newBuilder() + .setTag(SERVICE_TAG) + .setQueryParams(new QueryParams(watchTimeout, index)) + .setPassing(true) + .build(); + return client.getHealthServices(service, request); + } + + private Response>> getAllServices(long index, int watchTimeout) { + CatalogServicesRequest request = CatalogServicesRequest.newBuilder() + .setQueryParams(new QueryParams(watchTimeout, index)) + .build(); + return client.getCatalogServices(request); + } + + private List getHealthServices(Map> services) { + return services.keySet().stream() + .filter(s -> services.get(s).contains(SERVICE_TAG)) + .map(s -> getHealthServices(s, -1, -1).getValue()) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + + private boolean isConsumerSide(URL url) { + return url.getProtocol().equals(Constants.CONSUMER_PROTOCOL); + } + + private boolean isProviderSide(URL url) { + return url.getProtocol().equals(Constants.PROVIDER_PROTOCOL); + } + + private List convert(List services) { + return services.stream() + .map(s -> s.getService().getMeta().get(URL_META_KEY)) + .map(URL::valueOf) + .collect(Collectors.toList()); + } + + private NewService buildService(URL url) { + NewService service = new NewService(); + service.setAddress(url.getHost()); + service.setPort(url.getPort()); + service.setId(buildId(url)); + service.setName(url.getServiceInterface()); + service.setCheck(buildCheck(url)); + service.setTags(buildTags(url)); + service.setMeta(Collections.singletonMap(URL_META_KEY, url.toFullString())); + return service; + } + + private List buildTags(URL url) { + Map params = url.getParameters(); + List tags = params.keySet().stream() + .map(k -> k + "=" + params.get(k)) + .collect(Collectors.toList()); + tags.add(SERVICE_TAG); + return tags; + } + + private String buildId(URL url) { + // let's simply use url's hashcode to generate unique service id for now + return Integer.toHexString(url.hashCode()); + } + + private NewService.Check buildCheck(URL url) { + NewService.Check check = new NewService.Check(); + check.setTcp(url.getAddress()); + check.setInterval(url.getParameter(CHECK_INTERVAL, DEFAULT_CHECK_INTERVAL)); + check.setTimeout(url.getParameter(CHECK_TIMEOUT, DEFAULT_CHECK_TIMEOUT)); + check.setDeregisterCriticalServiceAfter(url.getParameter(DEREGISTER_AFTER, DEFAULT_DEREGISTER_TIME)); + return check; + } + + private int buildWatchTimeout(URL url) { + return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000; + } + + private class ConsulNotifier implements Runnable { + private URL url; + private long consulIndex; + private boolean running; + + ConsulNotifier(URL url, long consulIndex) { + this.url = url; + this.consulIndex = consulIndex; + this.running = true; + } + + @Override + public void run() { + while (this.running) { + if (ANY_VALUE.equals(url.getServiceInterface())) { + processServices(); + } else { + processService(); + } + } + } + + private void processService() { + String service = url.getServiceKey(); + Response> response = getHealthServices(service, consulIndex, buildWatchTimeout(url)); + Long currentIndex = response.getConsulIndex(); + if (currentIndex != null && currentIndex > consulIndex) { + consulIndex = currentIndex; + List services = response.getValue(); + List urls = convert(services); + for (NotifyListener listener : getSubscribed().get(url)) { + doNotify(url, listener, urls); + } + } + } + + private void processServices() { + Response>> response = getAllServices(consulIndex, buildWatchTimeout(url)); + Long currentIndex = response.getConsulIndex(); + if (currentIndex != null && currentIndex > consulIndex) { + consulIndex = currentIndex; + List services = getHealthServices(response.getValue()); + List urls = convert(services); + for (NotifyListener listener : getSubscribed().get(url)) { + doNotify(url, listener, urls); + } + } + } + + void stop() { + this.running = false; + } + } +} diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..c36f009c0d07812c8300c2d584afb01290aef34a --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.Registry; +import org.apache.dubbo.registry.support.AbstractRegistryFactory; + +/** + * registry center factory implementation for consul + */ +public class ConsulRegistryFactory extends AbstractRegistryFactory { + @Override + protected Registry createRegistry(URL url) { + return new ConsulRegistry(url); + } +} diff --git a/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory new file mode 100644 index 0000000000000000000000000000000000000000..7aea18f4d8f472d43ac597f2bb4b98200e177854 --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory @@ -0,0 +1 @@ +consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml index 93f64bfe8a497264cb4058dec444814d00a69569..f74cca0efdda47ba9fbd698cb5feee489f06aa53 100644 --- a/dubbo-registry/pom.xml +++ b/dubbo-registry/pom.xml @@ -34,5 +34,6 @@ dubbo-registry-multicast dubbo-registry-zookeeper dubbo-registry-redis + dubbo-registry-consul diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 7678977a47aac1af9375bdd3449127e2b75c80bc..77443016cfb36266924effcd4194aef5ac63282a 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -152,8 +152,8 @@ public class DubboProtocol extends AbstractProtocol { @Override public void disconnected(Channel channel) throws RemotingException { - if (logger.isInfoEnabled()) { - logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); + if (logger.isDebugEnabled()) { + logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); }