未验证 提交 81f4c087 编写于 作者: K kezhenxu94 提交者: GitHub

Provide Consul dynamic configuration center implementation (#3560)

* Provide Consul dynamic configuration center implementation

* Add more unit tests

* Fix unit test

* Add more unit tests and remove unused file

* Remove redundant doc
上级 46318e02
......@@ -188,6 +188,17 @@ configuration:
EOT
}
generateConfigurationConsul() {
cat <<EOT >> ${var_application_file}
configuration:
consul:
# Consul host and ports, separated by comma, e.g. 1.2.3.4:8500,2.3.4.5:8500
hostAndPorts: \${SW_CONFIGURATION_CONSUL_ADDRESS:127.0.0.1:8500}
# Sync period in seconds. Defaults to 60 seconds.
period: \${SW_CONFIGURATION_CONSUL_PERIOD:60}
EOT
}
generateTelemetryNone() {
cat <<EOT >> ${var_application_file}
telemetry:
......@@ -346,6 +357,7 @@ EOT
apollo) generateConfigurationApollo;;
nacos) generateConfigurationNacos;;
zookeeper) generateConfigurationZookeeper;;
consul) generateConfigurationConsul;;
esac
cat <<EOT >> ${var_application_file}
......
......@@ -94,8 +94,18 @@ configuration:
clusterName: "default"
```
## 3rd party Configuration Center
We are welcome contributions to implement this module provider to support popular configuration center,
such as Consul. Submit issue to discuss.
## Dynamic Configuration Consul Implementation
[Consul](https://github.com/rickfast/consul-client) is also supported as DCC(Dynamic Configuration Center), to use it, please configure as follows:
```yaml
configuration:
consul:
# Consul host and ports, separated by comma, e.g. 1.2.3.4:8500,2.3.4.5:8500
hostAndPorts: 127.0.0.1:8500
# Sync period in seconds. Defaults to 60 seconds.
period: 60
```
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-configuration</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>configuration-consul</artifactId>
<properties>
<consul.client.version>1.2.6</consul.client.version>
<consul.image.version>0.9</consul.image.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
<version>${consul.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<showLogs>true</showLogs>
<logDate>default</logDate>
<verbose>true</verbose>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
<images>
<image>
<name>consul:${consul.image.version}</name>
<alias>cluster-consul-plugin-integration-test-cluster</alias>
<run>
<cmd>agent -server -bootstrap-expect=1 -client=0.0.0.0</cmd>
<ports>
<port>consul.port:8500</port>
</ports>
<wait>
<log>Synced node info</log>
<time>30000</time>
</wait>
</run>
</image>
</images>
</configuration>
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>gmaven-plugin</artifactId>
<version>${gmaven-plugin.version}</version>
<executions>
<execution>
<id>add-default-properties</id>
<phase>initialize</phase>
<goals>
<goal>execute</goal>
</goals>
<configuration>
<providerSelection>2.0</providerSelection>
<source>
project.properties.setProperty('docker.hostname', 'localhost')
log.info("Docker hostname is " + project.properties['docker.hostname'])
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<consul.address>
${docker.hostname}:${consul.port}
</consul.address>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.consul;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author kezhenxu94
*/
public class ConsulConfigurationCenterSettings extends ModuleConfig {
@Getter
@Setter
private long period;
@Getter
@Setter
private String hostAndPorts;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.consul;
import com.google.common.base.Strings;
import org.apache.skywalking.oap.server.configuration.api.AbstractConfigurationProvider;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Get configuration from Consul.
*
* @author kezhenxu94
*/
public class ConsulConfigurationProvider extends AbstractConfigurationProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationProvider.class);
private final ConsulConfigurationCenterSettings settings;
public ConsulConfigurationProvider() {
this.settings = new ConsulConfigurationCenterSettings();
}
@Override
public String name() {
return "consul";
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return settings;
}
@Override
protected ConfigWatcherRegister initConfigReader() throws ModuleStartException {
LOGGER.info("consul settings: {}", settings);
if (Strings.isNullOrEmpty(settings.getHostAndPorts())) {
throw new ModuleStartException("Consul hostAndPorts cannot be null or empty");
}
return new ConsulConfigurationWatcherRegister(settings);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.consul;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.google.common.base.Splitter;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author kezhenxu94
*/
@SuppressWarnings("UnstableApiUsage")
public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationWatcherRegister.class);
private static final int DEFAULT_PORT = 8500;
private final KeyValueClient consul;
private final Map<String, Optional<String>> configItemKeyedByName;
private final Map<String, KVCache> cachesByKey;
public ConsulConfigurationWatcherRegister(ConsulConfigurationCenterSettings settings) {
super(settings.getPeriod());
this.configItemKeyedByName = new ConcurrentHashMap<>();
this.cachesByKey = new ConcurrentHashMap<>();
List<HostAndPort> hostAndPorts = Splitter.on(",")
.splitToList(settings.getHostAndPorts())
.parallelStream()
.map(hostAndPort -> HostAndPort.fromString(hostAndPort).withDefaultPort(DEFAULT_PORT))
.collect(Collectors.toList());
Consul.Builder builder = Consul.builder().withConnectTimeoutMillis(3000);
if (hostAndPorts.size() == 1) {
builder.withHostAndPort(hostAndPorts.get(0));
} else {
builder.withMultipleHostAndPort(hostAndPorts, 5000);
}
consul = builder.build().keyValueClient();
}
@Override
public ConfigTable readConfig(Set<String> keys) {
removeUninterestedKeys(keys);
registerKeyListeners(keys);
final ConfigTable table = new ConfigTable();
configItemKeyedByName.forEach((key, value) -> {
if (value.isPresent()) {
table.add(new ConfigTable.ConfigItem(key, value.get()));
} else {
table.add(new ConfigTable.ConfigItem(key, null));
}
});
return table;
}
private void registerKeyListeners(final Set<String> keys) {
keys.forEach(key -> {
KVCache cache = KVCache.newCache(consul, key);
cache.addListener(newValues -> {
Optional<Value> value = newValues.values().stream().filter(it -> key.equals(it.getKey())).findAny();
if (value.isPresent()) {
onKeyValueChanged(key, value.get().getValueAsString().orElse(null));
} else {
onKeyValueChanged(key, null);
}
});
cache.start();
cachesByKey.put(key, cache);
});
}
private void removeUninterestedKeys(final Set<String> interestedKeys) {
final Set<String> uninterestedKeys = new HashSet<>(cachesByKey.keySet());
uninterestedKeys.removeAll(interestedKeys);
uninterestedKeys.forEach(k -> {
KVCache cache = cachesByKey.remove(k);
if (cache != null) {
cache.stop();
}
});
}
private void onKeyValueChanged(String key, String value) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Consul config changed: {}: {}", key, value);
}
configItemKeyedByName.put(key, Optional.ofNullable(value));
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.consul.ConsulConfigurationProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.consul;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.junit.Test;
/**
* @author kezhenxu94
*/
public class ConsulConfigurationProviderTest {
@Test(expected = ModuleStartException.class)
public void shouldThrowWhenSettingsInvalid() throws ModuleStartException {
ConsulConfigurationProvider provider = new ConsulConfigurationProvider();
provider.initConfigReader();
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.consul;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* @author kezhenxu94
*/
public class ConsulConfigurationTestModule extends ModuleDefine {
public static final String NAME = "test-module";
public ConsulConfigurationTestModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.consul;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author kezhenxu94
*/
public class ConsulConfigurationTestProvider extends ModuleProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationTestProvider.class);
ConfigChangeWatcher watcher;
@Override
public String name() {
return "default";
}
@Override
public Class<? extends ModuleDefine> module() {
return ConsulConfigurationTestModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return new ModuleConfig() {
};
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
watcher = new ConfigChangeWatcher(ConsulConfigurationTestModule.NAME, this, "testKey") {
private volatile String testValue;
@Override
public void notify(ConfigChangeWatcher.ConfigChangeEvent value) {
LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
testValue = value.getNewValue();
}
}
@Override
public String value() {
return testValue;
}
};
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(watcher);
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public String[] requiredModules() {
return new String[]{
ConfigurationModule.NAME
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.consul;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.cache.ConsulCache;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.ImmutableValue;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author kezhenxu94
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(KVCache.class)
@SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"})
public class ConsulConfigurationWatcherRegisterTest {
@Mock
private ConsulConfigurationWatcherRegister register;
private ConcurrentHashMap<String, KVCache> cacheByKey;
private ConcurrentHashMap<String, Optional<String>> configItemKeyedByName;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void shouldUpdateCachesWhenNotified() {
cacheByKey = new ConcurrentHashMap<>();
configItemKeyedByName = new ConcurrentHashMap<>();
Whitebox.setInternalState(register, "cachesByKey", cacheByKey);
Whitebox.setInternalState(register, "configItemKeyedByName", configItemKeyedByName);
KVCache cache1 = mock(KVCache.class);
KVCache cache2 = mock(KVCache.class);
ArgumentCaptor<ConsulCache.Listener> listener1 = ArgumentCaptor.forClass(ConsulCache.Listener.class);
ArgumentCaptor<ConsulCache.Listener> listener2 = ArgumentCaptor.forClass(ConsulCache.Listener.class);
PowerMockito.mockStatic(KVCache.class);
PowerMockito.when(KVCache.newCache(any(KeyValueClient.class), eq("key1"))).thenReturn(cache1);
PowerMockito.when(KVCache.newCache(any(KeyValueClient.class), eq("key2"))).thenReturn(cache2);
when(register.readConfig(any(Set.class))).thenCallRealMethod();
register.readConfig(Sets.newHashSet("key1", "key2"));
verify(cache1).addListener(listener1.capture());
verify(cache2).addListener(listener2.capture());
listener1.getValue().notify(
ImmutableMap.of(
"key1",
ImmutableValue
.builder()
.createIndex(0)
.modifyIndex(0)
.lockIndex(0)
.key("key1")
.flags(0)
.value(BaseEncoding.base64().encode("val1".getBytes()))
.build())
);
listener2.getValue().notify(
ImmutableMap.of(
"key2",
ImmutableValue
.builder()
.createIndex(0)
.modifyIndex(0)
.lockIndex(0)
.key("key2")
.flags(0)
.value(BaseEncoding.base64().encode("val2".getBytes()))
.build())
);
assertEquals(2, configItemKeyedByName.size());
assertEquals("val1", configItemKeyedByName.get("key1").get());
assertEquals("val2", configItemKeyedByName.get("key2").get());
}
@Test
public void shouldUnsubscribeWhenKeyRemoved() {
cacheByKey = new ConcurrentHashMap<>();
KVCache existedCache = mock(KVCache.class);
cacheByKey.put("existedKey", existedCache);
configItemKeyedByName = new ConcurrentHashMap<>();
Whitebox.setInternalState(register, "cachesByKey", cacheByKey);
Whitebox.setInternalState(register, "configItemKeyedByName", configItemKeyedByName);
KVCache cache1 = mock(KVCache.class);
KVCache cache2 = mock(KVCache.class);
ArgumentCaptor<ConsulCache.Listener> listener1 = ArgumentCaptor.forClass(ConsulCache.Listener.class);
ArgumentCaptor<ConsulCache.Listener> listener2 = ArgumentCaptor.forClass(ConsulCache.Listener.class);
PowerMockito.mockStatic(KVCache.class);
PowerMockito.when(KVCache.newCache(any(KeyValueClient.class), eq("key1"))).thenReturn(cache1);
PowerMockito.when(KVCache.newCache(any(KeyValueClient.class), eq("key2"))).thenReturn(cache2);
when(register.readConfig(any(Set.class))).thenCallRealMethod();
register.readConfig(Sets.newHashSet("key1", "key2"));
verify(cache1).addListener(listener1.capture());
verify(cache2).addListener(listener2.capture());
verify(existedCache).stop();
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.consul;
import java.io.FileNotFoundException;
import java.io.Reader;
import java.util.Map;
import java.util.Properties;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;
import org.apache.skywalking.apm.util.PropertyPlaceholderHelper;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import org.yaml.snakeyaml.Yaml;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* @author kezhenxu94
*/
public class ITConsulConfigurationTest {
private final Yaml yaml = new Yaml();
private ConsulConfigurationTestProvider provider;
@Before
public void setUp() throws Exception {
final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration();
loadConfig(applicationConfiguration);
final ModuleManager moduleManager = new ModuleManager();
moduleManager.init(applicationConfiguration);
provider =
(ConsulConfigurationTestProvider) moduleManager
.find(ConsulConfigurationTestModule.NAME)
.provider();
assertNotNull(provider);
}
@Test(timeout = 60000)
public void shouldReadUpdated() {
assertNull(provider.watcher.value());
String hostAndPort = System.getProperty("consul.address", "127.0.0.1:8500");
Consul consul = Consul.builder().withHostAndPort(HostAndPort.fromString(hostAndPort)).withConnectTimeoutMillis(5000).build();
KeyValueClient client = consul.keyValueClient();
assertTrue(client.putValue("test-module.default.testKey", "1000"));
for (String v = provider.watcher.value(); v == null; v = provider.watcher.value()) {
}
assertEquals("1000", provider.watcher.value());
client.deleteKey("test-module.default.testKey");
for (String v = provider.watcher.value(); v != null; v = provider.watcher.value()) {
}
assertNull(provider.watcher.value());
}
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
Reader applicationReader = ResourceUtils.read("application.yml");
Map<String, Map<String, Map<String, ?>>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
if (CollectionUtils.isNotEmpty(moduleConfig)) {
moduleConfig.forEach((moduleName, providerConfig) -> {
if (providerConfig.size() > 0) {
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach((key, value) -> {
properties.put(key, value);
final Object replaceValue = yaml.load(
PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(value + "", properties)
);
if (replaceValue != null) {
properties.replace(key, replaceValue);
}
});
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
}
});
}
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.api.ConfigurationModule
org.apache.skywalking.oap.server.configuration.consul.ConsulConfigurationTestModule
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.consul.ConsulConfigurationTestProvider
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test-module:
default:
testKey: 300
configuration:
consul:
# Consul host and ports, separated by comma, e.g. 1.2.3.4:8500,2.3.4.5:8500
hostAndPorts: ${consul.address}
# Sync period in seconds. Defaults to 60 seconds.
period: 1
......@@ -27,6 +27,7 @@
<artifactId>server-configuration</artifactId>
<packaging>pom</packaging>
<modules>
<module>configuration-api</module>
<module>grpc-configuration-sync</module>
......@@ -34,6 +35,7 @@
<module>configuration-nacos</module>
<module>configuration-zookeeper</module>
<module>configuration-etcd</module>
<module>configuration-consul</module>
</modules>
</project>
......@@ -13,72 +13,33 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.util;
import java.io.*;
import org.slf4j.*;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public enum FileUtils {
INSTANCE;
public class BooleanUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
@Test
public void testValueToBoolean() {
assertEquals(1, BooleanUtils.booleanToValue(true));
assertEquals(0, BooleanUtils.booleanToValue(false));
}
public String readLastLine(File file) {
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(file, "r");
long length = randomAccessFile.length();
if (length == 0) {
return "";
} else {
long position = length - 1;
randomAccessFile.seek(position);
while (position >= 0) {
if (randomAccessFile.read() == '\n') {
return randomAccessFile.readLine();
}
randomAccessFile.seek(position);
if (position == 0) {
return randomAccessFile.readLine();
}
position--;
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (randomAccessFile != null) {
try {
randomAccessFile.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return "";
@Test
public void testBooleanToValue() {
assertTrue(BooleanUtils.valueToBoolean(1));
assertFalse(BooleanUtils.valueToBoolean(0));
}
public void writeAppendToLast(File file, RandomAccessFile randomAccessFile, String value) {
if (randomAccessFile == null) {
try {
randomAccessFile = new RandomAccessFile(file, "rwd");
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
try {
long length = randomAccessFile.length();
randomAccessFile.seek(length);
randomAccessFile.writeBytes(System.lineSeparator());
randomAccessFile.writeBytes(value);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
@Test(expected = RuntimeException.class)
public void shouldThrowIfValueIsNotZeroOrOne() {
boolean ignored = BooleanUtils.valueToBoolean(123);
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.library.util;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author kezhenxu94
*/
public class CollectionUtilsTest {
@Test
public void test() {
assertTrue(CollectionUtils.isEmpty((Map) null));
assertTrue(CollectionUtils.isEmpty(Collections.emptyMap()));
assertFalse(CollectionUtils.isEmpty(ImmutableMap.of(1, 2)));
assertFalse(CollectionUtils.isNotEmpty((Map) null));
assertFalse(CollectionUtils.isNotEmpty(Collections.emptyMap()));
assertTrue(CollectionUtils.isNotEmpty(ImmutableMap.of(1, 2)));
assertTrue(CollectionUtils.isEmpty((List) null));
assertTrue(CollectionUtils.isEmpty(Collections.emptyList()));
assertFalse(CollectionUtils.isEmpty(Arrays.asList(1, 2)));
assertFalse(CollectionUtils.isNotEmpty((List) null));
assertFalse(CollectionUtils.isNotEmpty(Collections.emptyList()));
assertTrue(CollectionUtils.isNotEmpty(Arrays.asList(1, 2)));
assertTrue(CollectionUtils.isEmpty((Set) null));
assertTrue(CollectionUtils.isEmpty(Collections.emptySet()));
assertFalse(CollectionUtils.isEmpty(new HashSet<>(Arrays.asList(1, 2))));
assertFalse(CollectionUtils.isNotEmpty((List) null));
assertFalse(CollectionUtils.isNotEmpty(Collections.emptySet()));
assertTrue(CollectionUtils.isNotEmpty(new HashSet<>(Arrays.asList(1, 2))));
assertFalse(CollectionUtils.isNotEmpty((Object[]) null));
assertTrue(CollectionUtils.isEmpty(new byte[0]));
assertTrue(CollectionUtils.isEmpty((byte[]) null));
assertTrue(CollectionUtils.isNotEmpty(new byte[1]));
}
}
\ No newline at end of file
......@@ -65,6 +65,21 @@ public class ConnectUtilTestCase {
List<Address> list = ConnectUtils.parse("");
}
@Test(expected = ConnectStringParseException.class)
public void shouldThrowIfOnlyComma() throws ConnectStringParseException {
List<Address> list = ConnectUtils.parse(",,");
}
@Test(expected = ConnectStringParseException.class)
public void shouldThrowIfHostWithoutPort() throws ConnectStringParseException {
List<Address> list = ConnectUtils.parse("localhost");
}
@Test(expected = ConnectStringParseException.class)
public void shouldThrowIfPortIsNotNumber() throws ConnectStringParseException {
List<Address> list = ConnectUtils.parse("localhost:what");
}
@Test(expected = ConnectStringParseException.class)
public void invalidPattern1() throws ConnectStringParseException {
List<Address> list = ConnectUtils.parse("10.0.0.1:");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.library.util;
import org.junit.Test;
import java.io.FileNotFoundException;
/**
* @author kezhenxu94
*/
public class ResourceUtilsTest {
@Test(expected = FileNotFoundException.class)
public void shouldThrowWhenResourceNotFound() throws FileNotFoundException {
ResourceUtils.read("/not-existed");
}
}
\ No newline at end of file
......@@ -208,6 +208,11 @@
<artifactId>configuration-etcd</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-consul</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<finalName>skywalking-oap</finalName>
......
......@@ -139,6 +139,12 @@ telemetry:
none:
configuration:
none:
# apollo:
# apolloMeta: http://106.12.25.204:8080
# apolloCluster: default
# # apolloEnv: # defaults to null
# appId: skywalking
# period: 5
# nacos:
# # Nacos Server Host
# serverAddr: 127.0.0.1
......@@ -162,6 +168,12 @@ configuration:
# group : 'skywalking'
# serverAddr: localhost:2379
# clusterName: "default"
# consul:
# # Consul host and ports, separated by comma, e.g. 1.2.3.4:8500,2.3.4.5:8500
# hostAndPorts: ${consul.address}
# # Sync period in seconds. Defaults to 60 seconds.
# period: 1
#exporter:
# grpc:
# targetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
......
......@@ -180,6 +180,11 @@ configuration:
# group : 'skywalking'
# serverAddr: localhost:2379
# clusterName: "default"
# consul:
# # Consul host and ports, separated by comma, e.g. 1.2.3.4:8500,2.3.4.5:8500
# hostAndPorts: ${consul.address}
# # Sync period in seconds. Defaults to 60 seconds.
# period: 1
#exporter:
# grpc:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册