提交 096d1dae 编写于 作者: I Ian Luo 提交者: ken.lj

Merge pull request #3593, Consul support for Registry and Metadata.

上级 10ec77fb
......@@ -234,6 +234,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-consul</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-monitor-api</artifactId>
......@@ -346,6 +353,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-consul</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-compatible</artifactId>
......@@ -381,6 +395,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-metadata-report-consul</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<!-- Transitive dependencies -->
<dependency>
......@@ -468,6 +489,7 @@
<include>org.apache.dubbo:dubbo-registry-multicast</include>
<include>org.apache.dubbo:dubbo-registry-zookeeper</include>
<include>org.apache.dubbo:dubbo-registry-redis</include>
<include>org.apache.dubbo:dubbo-registry-consul</include>
<include>org.apache.dubbo:dubbo-monitor-api</include>
<include>org.apache.dubbo:dubbo-monitor-default</include>
<include>org.apache.dubbo:dubbo-config-api</include>
......@@ -488,10 +510,12 @@
<include>org.apache.dubbo:dubbo-configcenter-definition</include>
<include>org.apache.dubbo:dubbo-configcenter-apollo</include>
<include>org.apache.dubbo:dubbo-configcenter-zookeeper</include>
<include>org.apache.dubbo:dubbo-configcenter-consul</include>
<include>org.apache.dubbo:dubbo-metadata-report-api</include>
<include>org.apache.dubbo:dubbo-metadata-definition</include>
<include>org.apache.dubbo:dubbo-metadata-report-redis</include>
<include>org.apache.dubbo:dubbo-metadata-report-zookeeper</include>
<include>org.apache.dubbo:dubbo-metadata-report-consul</include>
</includes>
</artifactSet>
<transformers>
......
......@@ -218,6 +218,11 @@
<artifactId>dubbo-registry-redis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-consul</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-monitor-api</artifactId>
......@@ -303,6 +308,11 @@
<artifactId>dubbo-metadata-report-redis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-metadata-report-consul</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-api</artifactId>
......@@ -318,6 +328,11 @@
<artifactId>dubbo-configcenter-apollo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-consul</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-metadata-definition</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dubbo-configcenter</artifactId>
<groupId>org.apache.dubbo</groupId>
<version>2.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-configcenter-consul</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.configcenter.consul;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
import static org.apache.dubbo.common.Constants.PATH_SEPARATOR;
/**
* config center implementation for consul
*/
public class ConsulDynamicConfiguration implements DynamicConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ConsulDynamicConfiguration.class);
private static final int DEFAULT_PORT = 8500;
private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000;
private static final String WATCH_TIMEOUT = "consul-watch-timeout";
private URL url;
private String rootPath;
private ConsulClient client;
private int watchTimeout = -1;
private ConcurrentMap<String, ConsulKVWatcher> watchers = new ConcurrentHashMap<>();
private ConcurrentMap<String, Long> consulIndexes = new ConcurrentHashMap<>();
private ExecutorService watcherService = newCachedThreadPool(
new NamedThreadFactory("dubbo-consul-configuration-watcher", true));
public ConsulDynamicConfiguration(URL url) {
this.url = url;
this.rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + "config";
this.watchTimeout = buildWatchTimeout(url);
String host = url.getHost();
int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
client = new ConsulClient(host, port);
}
@Override
public void addListener(String key, String group, ConfigurationListener listener) {
logger.info("register listener " + listener.getClass() + " for config with key: " + key + ", group: " + group);
ConsulKVWatcher watcher = watchers.putIfAbsent(key, new ConsulKVWatcher(key));
if (watcher == null) {
watcher = watchers.get(key);
watcherService.submit(watcher);
}
watcher.addListener(listener);
}
@Override
public void removeListener(String key, String group, ConfigurationListener listener) {
logger.info("unregister listener " + listener.getClass() + " for config with key: " + key + ", group: " + group);
ConsulKVWatcher watcher = watchers.get(key);
if (watcher != null) {
watcher.removeListener(listener);
}
}
@Override
public String getConfig(String key, String group, long timeout) throws IllegalStateException {
if (StringUtils.isNotEmpty(group)) {
key = group + PATH_SEPARATOR + key;
} else {
int i = key.lastIndexOf(".");
key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1);
}
return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key);
}
@Override
public Object getInternalProperty(String key) {
logger.info("get config from: " + key);
Long currentIndex = consulIndexes.computeIfAbsent(key, k -> -1L);
Response<GetValue> response = client.getKVValue(key, new QueryParams(watchTimeout, currentIndex));
GetValue value = response.getValue();
consulIndexes.put(key, response.getConsulIndex());
return value != null ? value.getDecodedValue() : null;
}
private int buildWatchTimeout(URL url) {
return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000;
}
private class ConsulKVWatcher implements Runnable {
private String key;
private Set<ConfigurationListener> listeners;
private boolean running = true;
public ConsulKVWatcher(String key) {
this.key = convertKey(key);
this.listeners = new HashSet<>();
}
@Override
public void run() {
while (running) {
Long lastIndex = consulIndexes.computeIfAbsent(key, k -> -1L);
Response<GetValue> response = client.getKVValue(key, new QueryParams(watchTimeout, lastIndex));
Long currentIndex = response.getConsulIndex();
if (currentIndex == null || currentIndex <= lastIndex) {
continue;
}
consulIndexes.put(key, currentIndex);
String value = response.getValue().getDecodedValue();
logger.info("notify change for key: " + key + ", the value is: " + value);
ConfigChangeEvent event = new ConfigChangeEvent(key, value);
for (ConfigurationListener listener : listeners) {
listener.process(event);
}
}
}
private void addListener(ConfigurationListener listener) {
this.listeners.add(listener);
}
private void removeListener(ConfigurationListener listener) {
this.listeners.remove(listener);
}
private String convertKey(String key) {
int index = key.lastIndexOf('.');
return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1);
}
private void stop() {
running = false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.configcenter.consul;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
import org.apache.dubbo.configcenter.DynamicConfiguration;
/**
* Config center factory for consul
*/
public class ConsulDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
@Override
protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new ConsulDynamicConfiguration(url);
}
}
......@@ -33,5 +33,6 @@
<module>dubbo-configcenter-api</module>
<module>dubbo-configcenter-zookeeper</module>
<module>dubbo-configcenter-apollo</module>
<module>dubbo-configcenter-consul</module>
</modules>
</project>
\ No newline at end of file
</project>
......@@ -105,6 +105,7 @@
<curator_version>4.0.1</curator_version>
<curator_test_version>2.12.0</curator_test_version>
<jedis_version>2.9.0</jedis_version>
<consul_version>1.4.2</consul_version>
<xmemcached_version>1.3.6</xmemcached_version>
<cxf_version>3.1.15</cxf_version>
<thrift_version>0.8.0</thrift_version>
......@@ -232,6 +233,11 @@
<artifactId>jedis</artifactId>
<version>${jedis_version}</version>
</dependency>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
<version>${consul_version}</version>
</dependency>
<dependency>
<groupId>com.googlecode.xmemcached</groupId>
<artifactId>xmemcached</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dubbo-metadata-report</artifactId>
<groupId>org.apache.dubbo</groupId>
<version>2.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-metadata-report-consul</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-metadata-report-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.metadata.store.consul;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.metadata.identifier.MetadataIdentifier;
import org.apache.dubbo.metadata.support.AbstractMetadataReport;
import org.apache.dubbo.rpc.RpcException;
import com.ecwid.consul.v1.ConsulClient;
/**
* metadata report impl for consul
*/
public class ConsulMetadataReport extends AbstractMetadataReport {
private static final Logger logger = LoggerFactory.getLogger(ConsulMetadataReport.class);
private static final int DEFAULT_PORT = 8500;
private ConsulClient client;
public ConsulMetadataReport(URL url) {
super(url);
String host = url.getHost();
int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
client = new ConsulClient(host, port);
}
@Override
protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) {
this.storeMetadata(providerMetadataIdentifier, serviceDefinitions);
}
@Override
protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) {
this.storeMetadata(consumerMetadataIdentifier, value);
}
private void storeMetadata(MetadataIdentifier identifier, String v) {
try {
client.setKVValue(identifier.getIdentifierKey() + META_DATA_SOTRE_TAG, v);
} catch (Throwable t) {
logger.error("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t);
throw new RpcException("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.metadata.store.consul;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.store.MetadataReport;
import org.apache.dubbo.metadata.support.AbstractMetadataReportFactory;
/**
* metadata report factory impl for consul
*/
public class ConsulMetadataReportFactory extends AbstractMetadataReportFactory {
@Override
protected MetadataReport createMetadataReport(URL url) {
return new ConsulMetadataReport(url);
}
}
......@@ -29,6 +29,7 @@
<module>dubbo-metadata-report-zookeeper</module>
<module>dubbo-metadata-report-redis</module>
<module>dubbo-metadata-definition</module>
<module>dubbo-metadata-report-consul</module>
</modules>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dubbo-registry</artifactId>
<groupId>org.apache.dubbo</groupId>
<version>2.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-registry-consul</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.consul;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.catalog.CatalogServicesRequest;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.apache.dubbo.common.Constants.ANY_VALUE;
/**
* registry center implementation for consul
*/
public class ConsulRegistry extends FailbackRegistry {
private static final Logger logger = LoggerFactory.getLogger(ConsulRegistry.class);
private static final String SERVICE_TAG = "dubbo";
private static final String URL_META_KEY = "url";
private static final String WATCH_TIMEOUT = "consul-watch-timeout";
private static final String CHECK_INTERVAL = "consul-check-interval";
private static final String CHECK_TIMEOUT = "consul-check-timeout";
private static final String DEREGISTER_AFTER = "consul-deregister-critical-service-after";
private static final int DEFAULT_PORT = 8500;
// default watch timeout in millisecond
private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000;
// default tcp check interval
private static final String DEFAULT_CHECK_INTERVAL = "10s";
// default tcp check timeout
private static final String DEFAULT_CHECK_TIMEOUT = "1s";
// default deregister critical server after
private static final String DEFAULT_DEREGISTER_TIME = "20s";
private ConsulClient client;
private ExecutorService notifierExecutor = newCachedThreadPool(
new NamedThreadFactory("dubbo-consul-notifier", true));
private ConcurrentMap<URL, ConsulNotifier> notifiers = new ConcurrentHashMap<>();
public ConsulRegistry(URL url) {
super(url);
String host = url.getHost();
int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
client = new ConsulClient(host, port);
}
@Override
public void register(URL url) {
if (isConsumerSide(url)) {
return;
}
super.register(url);
}
@Override
public void doRegister(URL url) {
client.agentServiceRegister(buildService(url));
}
@Override
public void unregister(URL url) {
if (isConsumerSide(url)) {
return;
}
super.unregister(url);
}
@Override
public void doUnregister(URL url) {
client.agentServiceDeregister(buildId(url));
}
@Override
public void subscribe(URL url, NotifyListener listener) {
if (isProviderSide(url)) {
return;
}
super.subscribe(url, listener);
}
@Override
public void doSubscribe(URL url, NotifyListener listener) {
Long index;
List<URL> urls;
if (ANY_VALUE.equals(url.getServiceInterface())) {
Response<Map<String, List<String>>> response = getAllServices(-1, buildWatchTimeout(url));
index = response.getConsulIndex();
List<HealthService> services = getHealthServices(response.getValue());
urls = convert(services);
} else {
String service = url.getServiceKey();
Response<List<HealthService>> response = getHealthServices(service, -1, buildWatchTimeout(url));
index = response.getConsulIndex();
urls = convert(response.getValue());
}
notify(url, listener, urls);
ConsulNotifier notifier = notifiers.computeIfAbsent(url, k -> new ConsulNotifier(url, index));
notifierExecutor.submit(notifier);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (isProviderSide(url)) {
return;
}
super.unsubscribe(url, listener);
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
ConsulNotifier notifier = notifiers.remove(url);
notifier.stop();
}
@Override
public boolean isAvailable() {
return client.getAgentSelf() != null;
}
@Override
public void destroy() {
super.destroy();
notifierExecutor.shutdown();
}
private Response<List<HealthService>> getHealthServices(String service, long index, int watchTimeout) {
HealthServicesRequest request = HealthServicesRequest.newBuilder()
.setTag(SERVICE_TAG)
.setQueryParams(new QueryParams(watchTimeout, index))
.setPassing(true)
.build();
return client.getHealthServices(service, request);
}
private Response<Map<String, List<String>>> getAllServices(long index, int watchTimeout) {
CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
.setQueryParams(new QueryParams(watchTimeout, index))
.build();
return client.getCatalogServices(request);
}
private List<HealthService> getHealthServices(Map<String, List<String>> services) {
return services.keySet().stream()
.filter(s -> services.get(s).contains(SERVICE_TAG))
.map(s -> getHealthServices(s, -1, -1).getValue())
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
private boolean isConsumerSide(URL url) {
return url.getProtocol().equals(Constants.CONSUMER_PROTOCOL);
}
private boolean isProviderSide(URL url) {
return url.getProtocol().equals(Constants.PROVIDER_PROTOCOL);
}
private List<URL> convert(List<HealthService> services) {
return services.stream()
.map(s -> s.getService().getMeta().get(URL_META_KEY))
.map(URL::valueOf)
.collect(Collectors.toList());
}
private NewService buildService(URL url) {
NewService service = new NewService();
service.setAddress(url.getHost());
service.setPort(url.getPort());
service.setId(buildId(url));
service.setName(url.getServiceInterface());
service.setCheck(buildCheck(url));
service.setTags(buildTags(url));
service.setMeta(Collections.singletonMap(URL_META_KEY, url.toFullString()));
return service;
}
private List<String> buildTags(URL url) {
Map<String, String> params = url.getParameters();
List<String> tags = params.keySet().stream()
.map(k -> k + "=" + params.get(k))
.collect(Collectors.toList());
tags.add(SERVICE_TAG);
return tags;
}
private String buildId(URL url) {
// let's simply use url's hashcode to generate unique service id for now
return Integer.toHexString(url.hashCode());
}
private NewService.Check buildCheck(URL url) {
NewService.Check check = new NewService.Check();
check.setTcp(url.getAddress());
check.setInterval(url.getParameter(CHECK_INTERVAL, DEFAULT_CHECK_INTERVAL));
check.setTimeout(url.getParameter(CHECK_TIMEOUT, DEFAULT_CHECK_TIMEOUT));
check.setDeregisterCriticalServiceAfter(url.getParameter(DEREGISTER_AFTER, DEFAULT_DEREGISTER_TIME));
return check;
}
private int buildWatchTimeout(URL url) {
return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000;
}
private class ConsulNotifier implements Runnable {
private URL url;
private long consulIndex;
private boolean running;
ConsulNotifier(URL url, long consulIndex) {
this.url = url;
this.consulIndex = consulIndex;
this.running = true;
}
@Override
public void run() {
while (this.running) {
if (ANY_VALUE.equals(url.getServiceInterface())) {
processServices();
} else {
processService();
}
}
}
private void processService() {
String service = url.getServiceKey();
Response<List<HealthService>> response = getHealthServices(service, consulIndex, buildWatchTimeout(url));
Long currentIndex = response.getConsulIndex();
if (currentIndex != null && currentIndex > consulIndex) {
consulIndex = currentIndex;
List<HealthService> services = response.getValue();
List<URL> urls = convert(services);
for (NotifyListener listener : getSubscribed().get(url)) {
doNotify(url, listener, urls);
}
}
}
private void processServices() {
Response<Map<String, List<String>>> response = getAllServices(consulIndex, buildWatchTimeout(url));
Long currentIndex = response.getConsulIndex();
if (currentIndex != null && currentIndex > consulIndex) {
consulIndex = currentIndex;
List<HealthService> services = getHealthServices(response.getValue());
List<URL> urls = convert(services);
for (NotifyListener listener : getSubscribed().get(url)) {
doNotify(url, listener, urls);
}
}
}
void stop() {
this.running = false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.consul;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.support.AbstractRegistryFactory;
/**
* registry center factory implementation for consul
*/
public class ConsulRegistryFactory extends AbstractRegistryFactory {
@Override
protected Registry createRegistry(URL url) {
return new ConsulRegistry(url);
}
}
......@@ -34,5 +34,6 @@
<module>dubbo-registry-multicast</module>
<module>dubbo-registry-zookeeper</module>
<module>dubbo-registry-redis</module>
<module>dubbo-registry-consul</module>
</modules>
</project>
......@@ -152,8 +152,8 @@ public class DubboProtocol extends AbstractProtocol {
@Override
public void disconnected(Channel channel) throws RemotingException {
if (logger.isInfoEnabled()) {
logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
if (logger.isDebugEnabled()) {
logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, Constants.ON_DISCONNECT_KEY);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册