提交 76d91635 编写于 作者: A Alan Lau 提交者: wu-sheng

Support etcd configuration. (#2973)

* Support etcd configuration.
上级 860fb084
......@@ -325,6 +325,7 @@ The text of each license is the standard Apache 2.0 license.
Ctripcorp: apollo 1.4.0: https://github.com/ctripcorp/apollo Apache 2.0
etcd4j 2.17.0: https://github.com/jurmous/etcd4j Apache 2.0
javaassist 3.25.0-GA: https://github.com/jboss-javassist/javassist Apache 2.0
jackson-module-afterburner 2.9.5: https://github.com/FasterXML/jackson-modules-base, Apache 2.0
========================================================================
MIT licenses
......
......@@ -82,10 +82,14 @@
<curator-test.version>2.12.0</curator-test.version>
<etcd4j.version>2.17.0</etcd4j.version>
<etcd.version>v3.2.3</etcd.version>
<netty.version>4.1.27.Final</netty.version>
<jackson-module-afterburner.version>2.9.5</jackson-module-afterburner.version>
<antlr.version>4.7.1</antlr.version>
<freemarker.version>2.3.28</freemarker.version>
<javaassist.version>3.25.0-GA</javaassist.version>
<zookeeper.image.version>3.5</zookeeper.image.version>
</properties>
<dependencies>
......@@ -378,11 +382,75 @@
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>
<dependency>
<groupId>org.mousio</groupId>
<artifactId>etcd4j</artifactId>
<exclusions>
<exclusion>
<artifactId>netty-codec-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-codec-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-codec-http</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-handler</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-resolver-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</exclusion>
</exclusions>
<version>${etcd4j.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-dns</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
<version>${jackson-module-afterburner.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
......
......@@ -36,60 +36,34 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-dns</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.27.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
<version>4.1.27.Final</version>
</dependency>
<dependency>
<groupId>org.mousio</groupId>
<artifactId>etcd4j</artifactId>
<exclusions>
<exclusion>
<artifactId>netty-codec-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-codec-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-codec-http</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-handler</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-resolver-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
<version>2.9.5</version>
</dependency>
</dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-configuration</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>configuration-etcd</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-dns</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
</dependency>
<dependency>
<groupId>org.mousio</groupId>
<artifactId>etcd4j</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
</dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<logDate>default</logDate>
<verbose>true</verbose>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
</configuration>
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
<configuration>
<images>
<image>
<name>quayio/coreos-etcd:${etcd.version}</name>
<alias>etcd-client-integration-test</alias>
<run>
<ports>
<port>etcd.port:2379</port>
</ports>
<wait>
<time>5000</time>
</wait>
<entrypoint>
<!-- exec form -->
<exec>
<arg>/usr/local/bin/etcd</arg>
<arg>--advertise-client-urls=http://0.0.0.0:2379</arg>
<arg>--listen-client-urls=http://0.0.0.0:2379</arg>
</exec>
</entrypoint>
</run>
</image>
</images>
</configuration>
</execution>
<execution>
<id>remove-it-etcd</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>gmaven-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>add-default-properties</id>
<phase>initialize</phase>
<goals>
<goal>execute</goal>
</goals>
<configuration>
<providerSelection>2.0</providerSelection>
<source>
project.properties.setProperty('etcd.host', 'localhost')
log.info("Etcd host is " + project.properties['etcd.host'])
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<etcd.host>
${etcd.host}
</etcd.host>
<etcd.port>
${etcd.port}
</etcd.port>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
/**
* exception type throw by Etcd Configuration.
*
* @author Alan Lau
*/
public class EtcdConfigException extends RuntimeException {
public EtcdConfigException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import mousio.client.promises.ResponsePromise;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.responses.EtcdErrorCode;
import mousio.etcd4j.responses.EtcdException;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Alan Lau
*/
public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
private final static Logger logger = LoggerFactory.getLogger(EtcdConfigWatcherRegister.class);
/**
* server settings for Etcd configuration
*/
private EtcdServerSettings settings;
/**
* etcd client.
*/
private final EtcdClient client;
private final Map<String, ResponsePromise.IsSimplePromiseResponseHandler> listenersByKey;
private final Map<String, Optional<String>> configItemKeyedByName;
private final Map<String, EtcdResponsePromise<EtcdKeysResponse>> responsePromiseByKey;
public EtcdConfigWatcherRegister(EtcdServerSettings settings) {
super(settings.getPeriod());
this.settings = settings;
this.configItemKeyedByName = new ConcurrentHashMap<>();
this.client = new EtcdClient(EtcdUtils.parse(settings).toArray(new URI[] {}));
this.listenersByKey = new ConcurrentHashMap<>();
responsePromiseByKey = new ConcurrentHashMap<>();
}
@Override public ConfigTable readConfig(Set<String> keys) {
removeUninterestedKeys(keys);
registerKeyListeners(keys);
final ConfigTable table = new ConfigTable();
for (Map.Entry<String, Optional<String>> entry : configItemKeyedByName.entrySet()) {
final String key = entry.getKey();
final Optional<String> value = entry.getValue();
if (value.isPresent()) {
table.add(new ConfigTable.ConfigItem(key, value.get()));
} else {
table.add(new ConfigTable.ConfigItem(key, null));
}
}
return table;
}
private void registerKeyListeners(final Set<String> keys) {
for (final String key : keys) {
String dataId = "/" + settings.getGroup() + "/" + key;
if (listenersByKey.containsKey(dataId)) {
continue;
}
listenersByKey.putIfAbsent(dataId, p -> {
onDataValueChanged(p, dataId);
});
try {
EtcdResponsePromise<EtcdKeysResponse> responsePromise = client.get(dataId).waitForChange().send();
responsePromise.addListener(listenersByKey.get(dataId));
responsePromiseByKey.putIfAbsent(dataId, responsePromise);
// the key is newly added, read the config for the first time
EtcdResponsePromise<EtcdKeysResponse> promise = client.get(dataId).send();
onDataValueChanged(promise, dataId);
} catch (Exception e) {
throw new EtcdConfigException("wait for etcd value change fail", e);
}
}
}
private void removeUninterestedKeys(final Set<String> interestedKeys) {
final Set<String> uninterestedKeys = new HashSet<>(listenersByKey.keySet());
uninterestedKeys.removeAll(interestedKeys);
uninterestedKeys.forEach(k -> {
final ResponsePromise.IsSimplePromiseResponseHandler listener = listenersByKey.remove(k);
if (listener != null) {
responsePromiseByKey.remove(k).removeListener(listener);
}
});
}
private void onDataValueChanged(ResponsePromise<EtcdKeysResponse> promise, String dataId) {
String key = getRealKey(dataId, settings.getGroup());
try {
EtcdKeysResponse.EtcdNode node = promise.get().getNode();
String value = node.getValue();
if (logger.isInfoEnabled()) {
logger.info("Etcd config changed: {}: {}", key, node.getValue());
}
configItemKeyedByName.put(key, Optional.ofNullable(value));
} catch (Exception e) {
if (e instanceof EtcdException) {
if (EtcdErrorCode.KeyNotFound == ((EtcdException)e).errorCode) {
configItemKeyedByName.put(key, Optional.empty());
return;
}
}
throw new EtcdConfigException("wait for value changed fail", e);
}
}
/**
* get real key in etcd cluster which is removed "/${group}" from the key retrive from etcd.
*
* @param key
* @param group
* @return
*/
private String getRealKey(String key, String group) {
int index = key.indexOf(group);
if (index <= 0) {
throw new RuntimeException("the group doesn't match");
}
String realKey = key.substring(index + group.length() + 1);
return realKey;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import com.google.common.base.Strings;
import org.apache.skywalking.oap.server.configuration.api.AbstractConfigurationProvider;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Get Configuration from etcd.
*
* @author Alan Lau
*/
public class EtcdConfigurationProvider extends AbstractConfigurationProvider {
private final static Logger logger = LoggerFactory.getLogger(EtcdConfigurationProvider.class);
private EtcdServerSettings settings;
public EtcdConfigurationProvider() {
settings = new EtcdServerSettings();
}
@Override protected ConfigWatcherRegister initConfigReader() throws ModuleStartException {
logger.info("settings: {}", settings);
if (Strings.isNullOrEmpty(settings.getServerAddr())) {
throw new ModuleStartException("Etcd serverAddr cannot be null or empty.");
}
if (Strings.isNullOrEmpty(settings.getGroup())) {
throw new ModuleStartException("Etcd group cannot be null or empty.");
}
try {
return new EtcdConfigWatcherRegister(settings);
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override public String name() {
return "etcd";
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return settings;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* entity wrapps the etcd cluster configuration.
*
* @author Alan Lau
*/
@ToString
@Getter
@Setter
public class EtcdServerSettings extends ModuleConfig {
private String clusterName = "default";
/**
* etcd cluster address, like "10.10.10.1:2379, 10.10.10.2:2379,10.10.10.3.2379".
*/
private String serverAddr;
/**
* directory for configuration
*/
private String group;
/**
* sec for interval refresh config data.
*/
private int period = 60;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.util.Address;
import org.apache.skywalking.oap.server.library.util.ConnectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* a util for etcd serverAddr parse.
*
* @author Alan Lau
*/
public class EtcdUtils {
private final static Logger logger = LoggerFactory.getLogger(EtcdUtils.class);
public EtcdUtils() {
}
public static List<URI> parse(EtcdServerSettings settings) {
List<URI> uris = new ArrayList<>();
try {
logger.info("etcd settings is {}", settings);
List<Address> addressList = ConnectUtils.parse(settings.getServerAddr());
for (Address address : addressList) {
uris.add(new URI("http", null, address.getHost(), address.getPort(), null, null, null));
}
} catch (Exception e) {
throw new EtcdConfigException(e.getMessage(), e);
}
return uris;
}
public static List<URI> parseProp(Properties properties) {
List<URI> uris = new ArrayList<>();
try {
logger.info("etcd server addr is {}", properties);
List<Address> addressList = ConnectUtils.parse(properties.getProperty("serverAddr"));
for (Address address : addressList) {
uris.add(new URI("http", null, address.getHost(), address.getPort(), null, null, null));
}
} catch (Exception e) {
throw new EtcdConfigException(e.getMessage(), e);
}
return uris;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.etcd.EtcdConfigurationProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import com.google.common.collect.Sets;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import mousio.client.promises.ResponsePromise;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.requests.EtcdKeyGetRequest;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static junit.framework.TestCase.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.spy;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;
/**
* @author Alan Lau
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({EtcdKeysResponse.class, EtcdUtils.class, EtcdClient.class, URI.class})
@PowerMockIgnore({"javax.management.*"})
public class EtcdConfigWatcherRegisterTest {
@Before
@Test
public void shouldReadConfigs() throws Exception {
final String group = "skywalking";
final String testKey1 = "receiver-trace.default.slowDBAccessThreshold";
final String testVal1 = "test";
final String testKey2 = "testKey";
final String testVal2 = "testVal";
final EtcdServerSettings mockSettings = mock(EtcdServerSettings.class);
when(mockSettings.getGroup()).thenReturn(group);
mockStatic(EtcdUtils.class);
List<URI> uris = mock(List.class);
when(EtcdUtils.parse(any())).thenReturn(uris);
final EtcdClient client = PowerMockito.mock(EtcdClient.class);
whenNew(EtcdClient.class).withAnyArguments().thenReturn(client);
String port = System.getProperty("etcd.port");
URI uri = new URI("http://localhost:" + port);
List<URI> urisArray = spy(ArrayList.class);
urisArray.add(uri);
URI[] array = urisArray.toArray(new URI[] {});
when(uris.toArray(new URI[] {})).thenReturn(array);
final EtcdConfigWatcherRegister mockRegister = spy(new EtcdConfigWatcherRegister(mockSettings));
Whitebox.setInternalState(mockRegister, "client", client);
Whitebox.setInternalState(mockRegister, "settings", mockSettings);
final EtcdKeysResponse response = PowerMockito.mock(EtcdKeysResponse.class);
final EtcdKeysResponse response1 = PowerMockito.mock(EtcdKeysResponse.class);
final EtcdKeyGetRequest request = PowerMockito.mock(EtcdKeyGetRequest.class);
when(client.get("/skywalking/receiver-trace.default.slowDBAccessThreshold")).thenReturn(request);
when(request.waitForChange()).thenReturn(request);
final EtcdResponsePromise<EtcdKeysResponse> promise = mock(EtcdResponsePromise.class);
final ResponsePromise<EtcdKeysResponse> responseResponsePromise = mock(ResponsePromise.class);
when(request.send()).thenReturn(promise);
when(promise.get()).thenReturn(response);
when(responseResponsePromise.get()).thenReturn(response);
final EtcdKeysResponse.EtcdNode node = mock(EtcdKeysResponse.EtcdNode.class);
when(response.getNode()).thenReturn(node);
when(node.getKey()).thenReturn("/skywalking/receiver-trace.default.slowDBAccessThreshold");
when(node.getValue()).thenReturn("test");
final EtcdKeyGetRequest request1 = mock(EtcdKeyGetRequest.class);
when(client.get("/skywalking/testKey")).thenReturn(request1);
when(request1.waitForChange()).thenReturn(request1);
final EtcdResponsePromise<EtcdKeysResponse> promise1 = mock(EtcdResponsePromise.class);
final ResponsePromise<EtcdKeysResponse> responseResponsePromise1 = mock(ResponsePromise.class);
when(request1.send()).thenReturn(promise1);
when(promise1.get()).thenReturn(response1);
when(responseResponsePromise1.get()).thenReturn(response1);
final EtcdKeysResponse.EtcdNode node1 = mock(EtcdKeysResponse.EtcdNode.class);
when(response1.getNode()).thenReturn(node1);
when(node1.getKey()).thenReturn("/skywalking/testKey");
when(node1.getValue()).thenReturn("testVal");
final ConfigTable configTable = mockRegister.readConfig(Sets.newHashSet(testKey1, testKey2));
assertEquals(2, configTable.getItems().size());
Map<String, String> kvs = new HashMap<>();
for (ConfigTable.ConfigItem item : configTable.getItems()) {
kvs.put(item.getName(), item.getValue());
}
assertEquals(testVal1, kvs.get(testKey1));
assertEquals(testVal2, kvs.get(testKey2));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* @author Alan Lau
*/
public class EtcdConfigurationTestModule extends ModuleDefine {
public static final String NAME = "test-module";
public EtcdConfigurationTestModule() {
super(NAME);
}
@Override public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Alan Lau
*/
public class EtcdConfigurationTestProvider extends ModuleProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(EtcdConfigurationTestProvider.class);
ConfigChangeWatcher watcher;
@Override public String name() {
return "default";
}
@Override public Class<? extends ModuleDefine> module() {
return EtcdConfigurationTestModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return new ModuleConfig() {
};
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
watcher = new ConfigChangeWatcher(EtcdConfigurationTestModule.NAME, this, "testKey") {
private volatile String testValue;
@Override
public void notify(ConfigChangeWatcher.ConfigChangeEvent value) {
LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
testValue = value.getNewValue();
}
}
@Override
public String value() {
return testValue;
}
};
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(watcher);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import java.io.FileNotFoundException;
import java.io.Reader;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.skywalking.apm.util.PropertyPlaceholderHelper;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* @author Alan Lau
*/
public class ITEtcdConfigurationTest {
private static final Logger logger = LoggerFactory.getLogger(ITEtcdConfigurationTest.class);
private final Yaml yaml = new Yaml();
private EtcdServerSettings settings;
private EtcdConfigurationTestProvider provider;
private EtcdClient client;
@Before
public void setUp() throws Exception {
final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration();
loadConfig(applicationConfiguration);
final ModuleManager moduleManager = new ModuleManager();
moduleManager.init(applicationConfiguration);
final String etcdHost = System.getProperty("etcd.host");
final String etcdPort = System.getProperty("etcd.port");
logger.info("etcdHost: {}, etcdPort: {}", etcdHost, etcdPort);
Properties properties = new Properties();
properties.setProperty("serverAddr", etcdHost + ":" + etcdPort);
List<URI> uris = EtcdUtils.parseProp(properties);
client = new EtcdClient(uris.toArray(new URI[] {}));
provider =
(EtcdConfigurationTestProvider)moduleManager
.find(EtcdConfigurationTestModule.NAME)
.provider();
assertNotNull(provider);
}
@Test(timeout = 20000)
public void shouldReadUpdated() throws Exception {
assertNull(provider.watcher.value());
assertTrue(publishConfig("test-module.default.testKey", "skywalking", "500"));
for (String v = provider.watcher.value(); v == null; v = provider.watcher.value()) {
logger.info("value is : {}", provider.watcher.value());
}
assertEquals("500", provider.watcher.value());
assertTrue(removeConfig("test-module.default.testKey", "skywalking"));
for (String v = provider.watcher.value(); v != null; v = provider.watcher.value()) {
}
assertNull(provider.watcher.value());
}
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
Reader applicationReader = ResourceUtils.read("application.yml");
Map<String, Map<String, Map<String, ?>>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
if (CollectionUtils.isNotEmpty(moduleConfig)) {
moduleConfig.forEach((moduleName, providerConfig) -> {
if (providerConfig.size() > 0) {
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach((key, value) -> {
properties.put(key, value);
final Object replaceValue = yaml.load(PropertyPlaceholderHelper.INSTANCE
.replacePlaceholders(value + "", properties));
if (replaceValue != null) {
properties.replace(key, replaceValue);
}
});
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
}
});
}
}
private boolean publishConfig(String key, String group, String value) {
try {
client.putDir(group).send().get();
EtcdResponsePromise<EtcdKeysResponse> promise = client.put(generateKey(key, group), value).send();
promise.get();
return true;
} catch (Exception e) {
return false;
}
}
private boolean removeConfig(String key, String group) throws Exception {
client.delete(generateKey(key, group)).send().get();
return true;
}
private String generateKey(String key, String group) {
return new StringBuilder("/").append(group).append("/").append(key).toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.etcd;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author Alan Lau
*/
public class TestEtcdUtils {
private EtcdServerSettings settings;
private Properties properties;
@Before
public void setUp() {
settings = new EtcdServerSettings();
settings.setServerAddr("localhost:2379");
properties = new Properties();
properties.setProperty("serverAddr", "localhost:2379");
}
@Test
public void testParse() {
List<URI> list = EtcdUtils.parse(settings);
Assert.assertEquals(1, list.size());
URI uri = list.get(0);
Assert.assertEquals("http", uri.getScheme());
Assert.assertEquals("localhost", uri.getHost());
Assert.assertEquals(2379, uri.getPort());
}
@Test
public void testProp() {
List<URI> list = EtcdUtils.parseProp(properties);
Assert.assertEquals(1, list.size());
URI uri = list.get(0);
Assert.assertEquals("http", uri.getScheme());
Assert.assertEquals("localhost", uri.getHost());
Assert.assertEquals(2379, uri.getPort());
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.api.ConfigurationModule
org.apache.skywalking.oap.server.configuration.etcd.EtcdConfigurationTestModule
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.etcd.EtcdConfigurationTestProvider
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test-module:
default:
testKey: 300
configuration:
etcd:
# Etcd Server Host
serverAddr: ${etcd.host}:${etcd.port}
# Etcd Server Port
port: ${etcd.port}
# Etcd Configuration Group
group: 'skywalking'
# Unit seconds, sync period. Default fetch every 60 seconds.
period: 1
# the name of current cluster, set the name if you want to upstream system known.
clusterName: "default"
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="info">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -33,6 +33,7 @@
<module>configuration-apollo</module>
<module>configuration-nacos</module>
<module>configuration-zookeeper</module>
<module>configuration-etcd</module>
</modules>
</project>
......@@ -203,6 +203,11 @@
<artifactId>configuration-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-etcd</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<finalName>skywalking-oap</finalName>
......
......@@ -149,6 +149,11 @@ configuration:
# #Retry Policy
# baseSleepTimeMs: 1000 # initial amount of time to wait between retries
# maxRetries: 3 # max number of times to retry
# etcd:
# period : 60 # Unit seconds, sync period. Default fetch every 60 seconds.
# group : 'skywalking'
# serverAddr: localhost:2379
# clusterName: "default"
#exporter:
# grpc:
# targetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
......
......@@ -168,6 +168,11 @@ configuration:
# #Retry Policy
# baseSleepTimeMs: 1000 # initial amount of time to wait between retries
# maxRetries: 3 # max number of times to retry
# etcd:
# period : 60 # Unit seconds, sync period. Default fetch every 60 seconds.
# group : 'skywalking'
# serverAddr: localhost:2379
# clusterName: "default"
#exporter:
# grpc:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册