提交 a70a2b23 编写于 作者: A Alan Lau 提交者: wu-sheng

Add cluster-etcd-plugin (#2725)

* Add cluster-etcd-plugin
上级 71f3e861
......@@ -318,8 +318,11 @@ The text of each license is the standard Apache 2.0 license.
proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0
proto files from lyft/protoc-gen-validate: https://github.com/lyft/protoc-gen-validate Apache 2.0
proto files from gogo/googleapis: https://github.com/gogo/googleapis Apache 2.0
json-flatter 0.6.0: https://github.com/wnameless/json-flattener Apache 2.0
Apache: commons-text 1.4: https://github.com/apache/commons-text Apache 2.0
sundrio 0.9.2: https://github.com/sundrio/sundrio Apache 2.0
Ctripcorp: apollo 1.4.0: https://github.com/ctripcorp/apollo Apache 2.0
etcd4j 2.17.0: https://github.com/jurmous/etcd4j Apache 2.0
========================================================================
MIT licenses
......@@ -335,6 +338,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
jopt-simple 5.0.2: https://github.com/jopt-simple/jopt-simple , MIT
bcpkix-jdk15on 1.55: http://www.bouncycastle.org/licence.html , MIT
bcprov-jdk15on 1.55: http://www.bouncycastle.org/licence.html , MIT
minimao-json 0.9.5: https://github.com/ralfstx/minimal-json, MIT
========================================================================
BSD licenses
......
Copyright (c) 2013, 2014 EclipseSource
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
......@@ -49,7 +49,7 @@ EOT
}
generateClusterConsul() {
cat <<EOT >> ${var_application_file}
cat <<EOT >> ${var_application_file}
cluster:
consul:
serviceName: \${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
......@@ -58,6 +58,16 @@ cluster:
EOT
}
generateClusterEtcd() {
cat <<EOT >> ${var_application_file}
cluster:
etcd:
serviceName: \${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# Etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
hostPort: \${SW_CLUSTER_ETCD_HOST_PORT:localhost:2379}
EOT
}
generateStorageElastisearch() {
cat <<EOT >> ${var_application_file}
storage:
......@@ -120,7 +130,7 @@ generateApplicationYaml() {
# validate
[[ -z "$SW_CLUSTER" ]] && [[ -z "$SW_STORAGE" ]] && { echo "Error: please specify \"SW_CLUSTER\" \"SW_STORAGE\""; exit 1; }
validateVariables "SW_CLUSTER" "$SW_CLUSTER" "standalone zookeeper kubernetes consul"
validateVariables "SW_CLUSTER" "$SW_CLUSTER" "standalone zookeeper kubernetes consul etcd"
validateVariables "SW_STORAGE" "$SW_STORAGE" "elasticsearch h2 mysql"
......@@ -131,6 +141,7 @@ generateApplicationYaml() {
zookeeper) generateClusterZookeeper;;
kubernetes) generateClusterK8s;;
consul) generateClusterConsul;;
etcd) generateClusterEtcd;;
esac
#generate core
......
......@@ -79,6 +79,8 @@
<apollo.version>1.4.0</apollo.version>
<maven-docker-plugin.version>0.30.0</maven-docker-plugin.version>
<nacos.version>1.0.0</nacos.version>
<etcd4j.version>2.17.0</etcd4j.version>
<etcd.version>v3.2.3</etcd.version>
</properties>
<dependencies>
......@@ -371,6 +373,11 @@
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>
<dependency>
<groupId>org.mousio</groupId>
<artifactId>etcd4j</artifactId>
<version>${etcd4j.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-cluster-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cluster-etcd-plugin</artifactId>
<name>cluster-etcd-plugin</name>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.27.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
<version>4.1.27.Final</version>
</dependency>
<dependency>
<groupId>org.mousio</groupId>
<artifactId>etcd4j</artifactId>
<exclusions>
<exclusion>
<artifactId>netty-codec-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-codec-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-codec-http</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-handler</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-resolver-dns</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
<version>2.9.5</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<logDate>default</logDate>
<verbose>true</verbose>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
</configuration>
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
<configuration>
<images>
<image>
<name>quayio/coreos-etcd:${etcd.version}</name>
<alias>etcd-client-integration-test</alias>
<run>
<ports>
<port>+etcd.host:etcd.port:2379</port>
</ports>
<wait>
<time>20000</time>
</wait>
<entrypoint>
<!-- exec form -->
<exec>
<arg>/usr/local/bin/etcd</arg>
<arg>--advertise-client-urls=http://0.0.0.0:2379</arg>
<arg>--listen-client-urls=http://0.0.0.0:2379</arg>
</exec>
</entrypoint>
</run>
</image>
</images>
</configuration>
</execution>
<execution>
<id>remove-it-etcd</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<etcd.host>
${etcd.host}
</etcd.host>
<etcd.port>
${etcd.port}
</etcd.port>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author Alan Lau
*/
public class ClusterModuleEtcdConfig extends ModuleConfig {
@Setter @Getter private String serviceName;
@Setter @Getter private String hostPort;
@Setter @Getter private boolean isSSL;
@Setter @Getter private String internalComHost;
@Setter @Getter private int internalComPort = -1;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import java.net.URI;
import java.util.List;
import mousio.etcd4j.EtcdClient;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
/**
* etcd Provider.
*
* @author Alan Lau
*/
public class ClusterModuleEtcdProvider extends ModuleProvider {
private final ClusterModuleEtcdConfig config;
private EtcdClient client;
public ClusterModuleEtcdProvider() {
super();
this.config = new ClusterModuleEtcdConfig();
}
@Override public String name() {
return "etcd";
}
@Override public Class<? extends ModuleDefine> module() {
return ClusterModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
List<URI> uris = EtcdUtils.parse(config);
//TODO check isSSL
client = new EtcdClient(uris.toArray(new URI[] {}));
EtcdCoordinator coordinator = new EtcdCoordinator(config, client);
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
@Override public void start() throws ServiceNotProvidedException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.core.remote.client.Address;
/**
* @author Alan Lau
*/
public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
private ClusterModuleEtcdConfig config;
private EtcdClient client;
private volatile Address selfAddress;
private final String serviceName;
private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
private static final Integer KEY_TTL = 45;
public EtcdCoordinator(ClusterModuleEtcdConfig config, EtcdClient client) {
this.config = config;
this.client = client;
this.serviceName = config.getServiceName();
}
@Override public List<RemoteInstance> queryRemoteNodes() {
List<RemoteInstance> res = new ArrayList<>();
try {
EtcdKeysResponse response = client.get(serviceName + "/").send().get();
List<EtcdKeysResponse.EtcdNode> nodes = response.getNode().getNodes();
Gson gson = new Gson();
if (nodes != null) {
nodes.forEach(node -> {
EtcdEndpoint endpoint = gson.fromJson(node.getValue(), EtcdEndpoint.class);
res.add(new RemoteInstance(new Address(endpoint.getHost(), endpoint.getPort(), true)));
});
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return res;
}
@Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
if (needUsingInternalAddr()) {
remoteInstance = new RemoteInstance(new Address(config.getInternalComHost(), config.getInternalComPort(), true));
}
this.selfAddress = remoteInstance.getAddress();
EtcdEndpoint endpoint = new EtcdEndpoint.Builder().serviceName(serviceName).host(selfAddress.getHost()).port(selfAddress.getPort()).build();
try {
client.putDir(serviceName).send();
String key = buildKey(serviceName, selfAddress, remoteInstance);
String json = new Gson().toJson(endpoint);
EtcdResponsePromise<EtcdKeysResponse> promise = client.put(key, json).ttl(KEY_TTL).send();
//check register.
promise.get();
renew(client, key, json);
} catch (Exception e) {
throw new ServiceRegisterException(e.getMessage());
}
}
private void renew(EtcdClient client, String key, String json) {
service.scheduleAtFixedRate(() -> {
try {
client.refresh(key, KEY_TTL).send().get();
} catch (Exception e) {
}
}, 5 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
}
private String buildKey(String serviceName, Address address, RemoteInstance instance) {
return new StringBuilder(serviceName).append("/").append(address.getHost()).append("_").append(instance.hashCode()).toString();
}
private boolean needUsingInternalAddr() {
return !Strings.isNullOrEmpty(config.getInternalComHost()) && config.getInternalComPort() > 0;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import java.io.Serializable;
import lombok.Getter;
import lombok.Setter;
/**
* an instance json to register to etcd.
*
* @author Alan Lau
*/
public class EtcdEndpoint implements Serializable {
@Setter @Getter private String serviceName;
@Setter @Getter private String host;
@Setter @Getter private int port;
public EtcdEndpoint(Builder builder) {
setServiceName(builder.serviceName);
setHost(builder.host);
setPort(builder.port);
}
public static class Builder {
private String serviceName;
private String host;
private int port;
public Builder serviceName(String serviceName) {
this.serviceName = serviceName;
return this;
}
public Builder host(String host) {
this.host = host;
return this;
}
public Builder port(int port) {
this.port = port;
return this;
}
public EtcdEndpoint build() {
return new EtcdEndpoint(this);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.Address;
import org.apache.skywalking.oap.server.library.util.ConnectStringParseException;
import org.apache.skywalking.oap.server.library.util.ConnectUtils;
/**
* @author Alan Lau
*/
public class EtcdUtils {
public EtcdUtils() {
}
public static List<URI> parse(ClusterModuleEtcdConfig config) throws ModuleStartException {
List<URI> uris = new ArrayList<>();
try {
List<Address> addressList = ConnectUtils.parse(config.getHostPort());
for (Address address : addressList) {
uris.add(URI.create(new StringBuilder("http://").append(address.getHost()).append(":").append(address.getPort()).toString()));
}
} catch (ConnectStringParseException e) {
throw new ModuleStartException(e.getMessage(), e);
}
return uris;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.cluster.plugin.etcd.ClusterModuleEtcdProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import java.net.URI;
import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
* @author Alan Lau
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(EtcdUtils.class)
@PowerMockIgnore("javax.management.*")
public class ClusterModuleEtcdProviderTest {
private ClusterModuleEtcdProvider provider = new ClusterModuleEtcdProvider();
@Test
public void name() {
assertEquals("etcd", provider.name());
}
@Test
public void module() {
assertEquals(ClusterModule.class, provider.module());
}
@Test
public void createConfigBeanIfAbsent() {
ModuleConfig moduleConfig = provider.createConfigBeanIfAbsent();
assertTrue(moduleConfig instanceof ClusterModuleEtcdConfig);
}
@Test(expected = ModuleStartException.class)
public void prepareWithNonHost() throws Exception {
provider.prepare();
}
@Test
@SuppressWarnings("unchecked")
public void prepare() throws Exception {
PowerMockito.mockStatic(EtcdUtils.class);
ClusterModuleEtcdConfig etcdConfig = new ClusterModuleEtcdConfig();
etcdConfig.setHostPort("10.0.0.1:1000,10.0.0.2:1001");
Whitebox.setInternalState(provider, "config", etcdConfig);
provider.prepare();
List<URI> uris = mock(List.class);
PowerMockito.when(EtcdUtils.parse(etcdConfig)).thenReturn(uris);
ArgumentCaptor<ClusterModuleEtcdConfig> addressCaptor = ArgumentCaptor.forClass(ClusterModuleEtcdConfig.class);
PowerMockito.verifyStatic();
EtcdUtils.parse(addressCaptor.capture());
ClusterModuleEtcdConfig cfg = addressCaptor.getValue();
assertEquals(etcdConfig.getHostPort(), cfg.getHostPort());
}
@Test
public void prepareSingle() throws Exception {
PowerMockito.mockStatic(EtcdUtils.class);
ClusterModuleEtcdConfig etcdConfig = new ClusterModuleEtcdConfig();
etcdConfig.setHostPort("10.0.0.1:1000");
Whitebox.setInternalState(provider, "config", etcdConfig);
provider.prepare();
List<URI> uris = mock(List.class);
PowerMockito.when(EtcdUtils.parse(etcdConfig)).thenReturn(uris);
ArgumentCaptor<ClusterModuleEtcdConfig> addressCaptor = ArgumentCaptor.forClass(ClusterModuleEtcdConfig.class);
PowerMockito.verifyStatic();
EtcdUtils.parse(addressCaptor.capture());
ClusterModuleEtcdConfig cfg = addressCaptor.getValue();
assertEquals(etcdConfig.getHostPort(), cfg.getHostPort());
}
@Test
public void start() {
provider.start();
}
@Test
public void notifyAfterCompleted() {
provider.notifyAfterCompleted();
}
@Test
public void requiredModules() {
String[] modules = provider.requiredModules();
assertArrayEquals(new String[] {CoreModule.NAME}, modules);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.List;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.requests.EtcdKeyGetRequest;
import mousio.etcd4j.requests.EtcdKeyPutRequest;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author Alan Lau
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(EtcdKeysResponse.class)
@PowerMockIgnore("javax.management.*")
public class EtcdCoordinatorTest {
private ClusterModuleEtcdConfig etcdConfig = new ClusterModuleEtcdConfig();
private EtcdClient client;
private EtcdCoordinator coordinator;
private Gson gson = new Gson();
private Address remoteAddress = new Address("10.0.0.1", 1000, false);
private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
private Address internalAddress = new Address("10.0.0.3", 1002, false);
private static final String SERVICE_NAME = "my-service";
private EtcdResponsePromise<EtcdKeysResponse> getPromise, putPromise;
private EtcdKeysResponse response;
private EtcdKeyPutRequest putRequest = mock(EtcdKeyPutRequest.class);
private EtcdKeyGetRequest getRequest = mock(EtcdKeyGetRequest.class);
private EtcdKeyPutRequest putDirRequest = mock(EtcdKeyPutRequest.class);
private EtcdResponsePromise<EtcdKeysResponse> putDirPromise;
@Mock
private List<EtcdKeysResponse.EtcdNode> list = mock(List.class);
@Before
public void setUp() throws Exception {
etcdConfig.setServiceName(SERVICE_NAME);
client = mock(EtcdClient.class);
PowerMockito.whenNew(EtcdClient.class).withAnyArguments().thenReturn(client);
client = new EtcdClient("http://10.0.0.1:1000", "http://10.0.0.2:2000");
coordinator = new EtcdCoordinator(etcdConfig, client);
putPromise = (EtcdResponsePromise<EtcdKeysResponse>)mock(EtcdResponsePromise.class);
getPromise = (EtcdResponsePromise<EtcdKeysResponse>)mock(EtcdResponsePromise.class);
putDirPromise = (EtcdResponsePromise<EtcdKeysResponse>)mock(EtcdResponsePromise.class);
PowerMockito.when(client.putDir(anyString())).thenReturn(putDirRequest);
PowerMockito.when(putDirRequest.ttl(anyInt())).thenReturn(putDirRequest);
PowerMockito.when(putDirRequest.send()).thenReturn(putDirPromise);
PowerMockito.when(client.put(anyString(), anyString())).thenReturn(putRequest);
PowerMockito.when(putRequest.ttl(anyInt())).thenReturn(putRequest);
PowerMockito.when(putRequest.send()).thenReturn(putPromise);
PowerMockito.when(client.get(anyString())).thenReturn(getRequest);
PowerMockito.when(getRequest.send()).thenReturn(getPromise);
response = PowerMockito.mock(EtcdKeysResponse.class);
response = PowerMockito.mock(EtcdKeysResponse.class);
when(putPromise.get()).thenReturn(response);
when(getPromise.get()).thenReturn(response);
when(putDirPromise.get()).thenReturn(response);
}
@Test
@SuppressWarnings("unchecked")
public void queryRemoteNodesWithNonOrEmpty() {
EtcdKeysResponse.EtcdNode node = PowerMockito.mock(EtcdKeysResponse.EtcdNode.class);
when(response.getNode()).thenReturn(node);
when(node.getValue()).thenReturn("{}");
assertEquals(0, coordinator.queryRemoteNodes().size());
assertEquals(0, coordinator.queryRemoteNodes().size());
}
@Test
public void queryRemoteNodes() {
registerSelfRemote();
EtcdKeysResponse.EtcdNode node = PowerMockito.mock(EtcdKeysResponse.EtcdNode.class);
EtcdKeysResponse.EtcdNode node1 = PowerMockito.mock(EtcdKeysResponse.EtcdNode.class);
when(response.getNode()).thenReturn(node);
list = new ArrayList<>();
List list1 = Mockito.spy(list);
list1.add(node1);
when(node.getNodes()).thenReturn(list1);
when(node1.getValue()).thenReturn("{\"serviceId\":\"my-service\",\"host\":\"10.0.0.2\",\"port\":1001}");
List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
assertEquals(1, remoteInstances.size());
RemoteInstance selfInstance = remoteInstances.get(0);
velidate(selfRemoteAddress, selfInstance);
}
@Test
public void registerRemote() {
registerRemote(remoteAddress);
}
@Test
public void registerSelfRemote() {
registerRemote(selfRemoteAddress);
}
@Test
public void registerRemoteUsingInternal() {
etcdConfig.setInternalComHost(internalAddress.getHost());
etcdConfig.setInternalComPort(internalAddress.getPort());
registerRemote(internalAddress);
}
private void velidate(Address originArress, RemoteInstance instance) {
Address instanceAddress = instance.getAddress();
assertEquals(originArress.getHost(), instanceAddress.getHost());
assertEquals(originArress.getPort(), instanceAddress.getPort());
}
private void registerRemote(Address address) {
coordinator.registerRemote(new RemoteInstance(address));
EtcdEndpoint endpoint = afterRegister().get(0);
verifyRegistration(address, endpoint);
}
private List<EtcdEndpoint> afterRegister() {
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> nameCaptor = ArgumentCaptor.forClass(String.class);
verify(client).put(nameCaptor.capture(), argumentCaptor.capture());
EtcdEndpoint endpoint = gson.fromJson(argumentCaptor.getValue(), EtcdEndpoint.class);
List<EtcdEndpoint> list = new ArrayList<>();
list.add(endpoint);
return list;
}
private void verifyRegistration(Address remoteAddress, EtcdEndpoint endpoint) {
assertNotNull(endpoint);
assertEquals(SERVICE_NAME, endpoint.getServiceName());
assertEquals(remoteAddress.getHost(), endpoint.getHost());
assertEquals(remoteAddress.getPort(), endpoint.getPort());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import com.google.gson.Gson;
import java.net.URI;
import java.util.List;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* @author Alan Lau
*/
public class ITClusterEtcdPluginTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ITClusterEtcdPluginTest.class);
private ClusterModuleEtcdConfig etcdConfig;
private EtcdClient client;
private EtcdCoordinator coordinator;
private Gson gson = new Gson();
private Address remoteAddress = new Address("10.0.0.1", 1000, false);
private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
private Address internalAddress = new Address("10.0.0.3", 1002, false);
private static final String SERVICE_NAME = "my-service";
@Before
public void before() throws Exception {
String etcdHost = System.getProperty("etcd.host");
String port = System.getProperty("etcd.port");
String baseUrl = "http://" + etcdHost + ":" + port;
LOGGER.info("etcd baseURL: {}", baseUrl);
etcdConfig = new ClusterModuleEtcdConfig();
etcdConfig.setServiceName(SERVICE_NAME);
client = new EtcdClient(URI.create(baseUrl));
coordinator = new EtcdCoordinator(etcdConfig, client);
}
@After
public void after() throws Exception {
client.close();
}
@Test
public void registerRemote() throws Throwable {
registerRemote(remoteAddress);
clear();
}
@Test
public void registerSelfRemote() throws Throwable {
registerRemote(selfRemoteAddress);
clear();
}
@Test
public void registerRemoteUsingInternal() throws Throwable {
etcdConfig.setInternalComHost(internalAddress.getHost());
etcdConfig.setInternalComPort(internalAddress.getPort());
etcdConfig.setServiceName(SERVICE_NAME);
registerRemote(internalAddress);
clear();
}
@Test
public void queryRemoteNodes() throws Throwable {
registerRemote(selfRemoteAddress);
List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
assertEquals(1, remoteInstances.size());
RemoteInstance selfInstance = remoteInstances.get(0);
velidate(selfRemoteAddress, selfInstance);
clear();
}
private void velidate(Address originArress, RemoteInstance instance) {
Address instanceAddress = instance.getAddress();
assertEquals(originArress.getHost(), instanceAddress.getHost());
assertEquals(originArress.getPort(), instanceAddress.getPort());
}
private void registerRemote(Address address) throws Throwable {
coordinator.registerRemote(new RemoteInstance(address));
EtcdEndpoint endpoint = afterRegister();
verifyRegistration(address, endpoint);
}
private EtcdEndpoint afterRegister() throws Throwable {
List<RemoteInstance> list = coordinator.queryRemoteNodes();
assertEquals(list.size(), 1L);
return buildEndpoint(list.get(0));
}
private void clear() throws Throwable {
EtcdKeysResponse response = client.get(SERVICE_NAME + "/").send().get();
List<EtcdKeysResponse.EtcdNode> nodes = response.getNode().getNodes();
for (EtcdKeysResponse.EtcdNode node : nodes) {
client.delete(node.getKey()).send().get();
}
}
private void verifyRegistration(Address remoteAddress, EtcdEndpoint endpoint) {
assertNotNull(endpoint);
assertEquals(SERVICE_NAME, endpoint.getServiceName());
assertEquals(remoteAddress.getHost(), endpoint.getHost());
assertEquals(remoteAddress.getPort(), endpoint.getPort());
}
private EtcdEndpoint buildEndpoint(RemoteInstance instance) {
Address address = instance.getAddress();
EtcdEndpoint endpoint = new EtcdEndpoint.Builder().host(address.getHost()).port(address.getPort()).serviceName(SERVICE_NAME).build();
return endpoint;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="info">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -33,6 +33,7 @@
<module>cluster-kubernetes-plugin</module>
<module>cluster-consul-plugin</module>
<module>cluster-nacos-plugin</module>
<module>cluster-etcd-plugin</module>
</modules>
<dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-configuration</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>configuration-etcd</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-etcd-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<logDate>default</logDate>
<verbose>true</verbose>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
</configuration>
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
<configuration>
<images>
<image>
<name>quayio/coreos-etcd:${etcd.version}</name>
<alias>etcd-client-integration-test</alias>
<run>
<ports>
<port>2379:2379</port>
</ports>
<wait>
<time>30000</time>
</wait>
<entrypoint>
<!-- exec form -->
<exec>
<arg>/usr/local/bin/etcd</arg>
<arg>--advertise-client-urls=http://0.0.0.0:2379</arg>
<arg>--listen-client-urls=http://0.0.0.0:2379</arg>
</exec>
</entrypoint>
</run>
</image>
</images>
</configuration>
</execution>
<execution>
<id>remove-it-etcd</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.cluster.plugin.etcd.ClusterModuleEtcdProvider
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="info">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -34,6 +34,7 @@
<module>grpc-configuration-sync</module>
<module>configuration-apollo</module>
<module>configuration-nacos</module>
<module>configuration-etcd</module>
</modules>
</project>
......@@ -75,6 +75,11 @@
<artifactId>cluster-nacos-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-etcd-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- cluster module -->
<!-- receiver module -->
......
......@@ -36,6 +36,10 @@ cluster:
# nacos:
# serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# hostPort: ${SW_CLUSTER_NACOS_HOST_PORT:localhost:8848}
# etcd:
# serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
# hostPort: ${SW_CLUSTER_ETCD_HOST_PORT:localhost:2379}
core:
default:
# Mixed: Receive agent data, Level 1 aggregate, Level 2 aggregate
......
......@@ -36,6 +36,10 @@ cluster:
# nacos:
# serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# hostPort: ${SW_CLUSTER_NACOS_HOST_PORT:localhost:8848}
# etcd:
# serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
# hostPort: ${SW_CLUSTER_ETCD_HOST_PORT:localhost:2379}
core:
default:
# Mixed: Receive agent data, Level 1 aggregate, Level 2 aggregate
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册