未验证 提交 b363b0ea 编写于 作者: S songzhendong 提交者: GitHub

Add Nacos back into SkyWalking as an option(nacos-1.3.1) (#5137)

上级 20b1b517
......@@ -308,6 +308,7 @@ The text of each license is the standard Apache 2.0 license.
kubernetes-client 8.0.0: https://github.com/kubernetes-client/java, Apache 2.0
proto files from istio/istio: https://github.com/istio/istio Apache 2.0
proto files from istio/api: https://github.com/istio/api Apache 2.0
nacos 1.3.1: https://github.com/alibaba/nacos, Apache 2.0
consul-client 1.2.6: https://github.com/rickfast/consul-client, Apache 2.0
okhttp 3.9.0: https://github.com/square/okhttp, Apache 2.0
prometheus client_java 0.6.0: https://github.com/prometheus/client_java, Apache 2.0
......
......@@ -10,7 +10,7 @@ with each other.
by using k8s native APIs to manage cluster.
- [Consul](#consul). Use Consul as backend cluster management implementor, to coordinate backend instances.
- [Etcd](#etcd). Use Etcd to coordinate backend instances.
- [Nacos](#nacos). Use Nacos to coordinate backend instances.
In the `application.yml`, there're default configurations for the aforementioned coordinators under the section `cluster`,
you can specify one of them in the `selector` property to enable it.
......@@ -91,3 +91,12 @@ cluster:
selector: ${SW_CLUSTER:etcd}
# other configurations
```
## Nacos
Set the **cluster/selector** to **nacos** in the yml to enable.
```yaml
cluster:
selector: ${SW_CLUSTER:nacos}
# other configurations
```
\ No newline at end of file
......@@ -119,4 +119,25 @@ configuration:
namespace: ${SW_CLUSTER_K8S_NAMESPACE:default}
# Labelselector is used to locate specific configmap
labelSelector: ${SW_CLUSTER_K8S_LABEL:app=collector,release=skywalking}
```
## Dynamic Configuration Nacos Implementation
[Nacos](https://github.com/alibaba/nacos) is also supported as DCC(Dynamic Configuration Center), to use it, please configure as follows:
```yaml
configuration:
selector: ${SW_CONFIGURATION:nacos}
nacos:
# Nacos Server Host
serverAddr: ${SW_CONFIG_NACOS_SERVER_ADDR:127.0.0.1}
# Nacos Server Port
port: ${SW_CONFIG_NACOS_SERVER_PORT:8848}
# Nacos Configuration Group
group: ${SW_CONFIG_NACOS_SERVER_GROUP:skywalking}
# Nacos Configuration namespace
namespace: ${SW_CONFIG_NACOS_SERVER_NAMESPACE:}
# Unit seconds, sync period. Default fetch every 60 seconds.
period: ${SW_CONFIG_NACOS_PERIOD:60}
# the name of current cluster, set the name if you want to upstream system known.
clusterName: ${SW_CONFIG_NACOS_CLUSTER_NAME:default}
```
\ No newline at end of file
......@@ -79,6 +79,7 @@
<commons-text.version>1.4</commons-text.version>
<simpleclient.version>0.6.0</simpleclient.version>
<apollo.version>1.4.0</apollo.version>
<nacos.version>1.3.1</nacos.version>
<maven-docker-plugin.version>0.30.0</maven-docker-plugin.version>
<curator.version>4.0.1</curator.version>
<curator-test.version>2.12.0</curator-test.version>
......@@ -379,13 +380,16 @@
<artifactId>simpleclient</artifactId>
<version>${simpleclient.version}</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>${apollo.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>
<dependency>
<groupId>org.mousio</groupId>
<artifactId>etcd4j</artifactId>
......
......@@ -68,6 +68,11 @@
<artifactId>cluster-etcd-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-nacos-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- cluster module -->
<!-- receiver module -->
......@@ -200,6 +205,11 @@
<artifactId>configuration-k8s-configmap</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-nacos</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
......
......@@ -41,6 +41,11 @@ cluster:
serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
# etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379
hostPort: ${SW_CLUSTER_ETCD_HOST_PORT:localhost:2379}
nacos:
serviceName: ${SW_SERVICE_NAME:"SkyWalking_OAP_Cluster"}
hostPort: ${SW_CLUSTER_NACOS_HOST_PORT:localhost:8848}
# Nacos Configuration namespace
namespace: ${SW_CLUSTER_NACOS_NAMESPACE:"public"}
core:
selector: ${SW_CORE:default}
......@@ -268,6 +273,19 @@ configuration:
period: ${SW_CONFIG_CONFIGMAP_PERIOD:60}
namespace: ${SW_CLUSTER_K8S_NAMESPACE:default}
labelSelector: ${SW_CLUSTER_K8S_LABEL:app=collector,release=skywalking}
nacos:
# Nacos Server Host
serverAddr: ${SW_CONFIG_NACOS_SERVER_ADDR:127.0.0.1}
# Nacos Server Port
port: ${SW_CONFIG_NACOS_SERVER_PORT:8848}
# Nacos Configuration Group
group: ${SW_CONFIG_NACOS_SERVER_GROUP:skywalking}
# Nacos Configuration namespace
namespace: ${SW_CONFIG_NACOS_SERVER_NAMESPACE:}
# Unit seconds, sync period. Default fetch every 60 seconds.
period: ${SW_CONFIG_NACOS_PERIOD:60}
# the name of current cluster, set the name if you want to upstream system known.
clusterName: ${SW_CONFIG_NACOS_CLUSTER_NAME:default}
exporter:
selector: ${SW_EXPORTER:-}
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-cluster-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cluster-nacos-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<showLogs>true</showLogs>
<logDate>default</logDate>
<verbose>true</verbose>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
<images>
<image>
<name>nacos/nacos-server:${nacos.version}</name>
<alias>cluster-nacos-plugin-integration-test-nacos</alias>
<run>
<env>
<MODE>standalone</MODE>
</env>
<ports>
<port>nacos.port:8848</port>
</ports>
<wait>
<log>Nacos started successfully</log>
<time>120000</time>
</wait>
</run>
</image>
</images>
</configuration>
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>gmaven-plugin</artifactId>
<version>${gmaven-plugin.version}</version>
<executions>
<execution>
<id>add-default-properties</id>
<phase>initialize</phase>
<goals>
<goal>execute</goal>
</goals>
<configuration>
<providerSelection>2.0</providerSelection>
<source>
project.properties.setProperty('docker.hostname', 'localhost')
log.info("Docker hostname is " + project.properties['docker.hostname'])
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<nacos.address>
${docker.hostname}:${nacos.port}
</nacos.address>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.nacos;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class ClusterModuleNacosConfig extends ModuleConfig {
@Setter
@Getter
private String serviceName;
@Setter
@Getter
private String hostPort;
@Setter
@Getter
private String namespace = "public";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.nacos;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
public class ClusterModuleNacosProvider extends ModuleProvider {
private final ClusterModuleNacosConfig config;
private NamingService namingService;
public ClusterModuleNacosProvider() {
super();
this.config = new ClusterModuleNacosConfig();
}
@Override
public String name() {
return "nacos";
}
@Override
public Class<? extends ModuleDefine> module() {
return ClusterModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
try {
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, config.getHostPort());
properties.put(PropertyKeyConst.NAMESPACE, config.getNamespace());
namingService = NamingFactory.createNamingService(properties);
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}
NacosCoordinator coordinator = new NacosCoordinator(namingService, config);
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
@Override
public void start() throws ServiceNotProvidedException {
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.nacos;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceQueryException;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {
private final NamingService namingService;
private final ClusterModuleNacosConfig config;
private volatile Address selfAddress;
public NacosCoordinator(NamingService namingService, ClusterModuleNacosConfig config) {
this.namingService = namingService;
this.config = config;
}
@Override
public List<RemoteInstance> queryRemoteNodes() {
List<RemoteInstance> result = new ArrayList<>();
try {
List<Instance> instances = namingService.selectInstances(config.getServiceName(), true);
if (CollectionUtils.isNotEmpty(instances)) {
instances.forEach(instance -> {
Address address = new Address(instance.getIp(), instance.getPort(), false);
if (address.equals(selfAddress)) {
address.setSelf(true);
}
result.add(new RemoteInstance(address));
});
}
} catch (NacosException e) {
throw new ServiceQueryException(e.getErrMsg());
}
return result;
}
@Override
public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
String host = remoteInstance.getAddress().getHost();
int port = remoteInstance.getAddress().getPort();
try {
namingService.registerInstance(config.getServiceName(), host, port);
} catch (Exception e) {
throw new ServiceRegisterException(e.getMessage());
}
this.selfAddress = remoteInstance.getAddress();
TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString());
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.cluster.plugin.nacos.ClusterModuleNacosProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.nacos;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@RunWith(PowerMockRunner.class)
@PrepareForTest(NamingFactory.class)
@PowerMockIgnore("javax.management.*")
public class ClusterModuleNacosProviderTest {
private static final String SERVICE_NAME = "test-service_name";
private ClusterModuleNacosProvider provider = new ClusterModuleNacosProvider();
@Test
public void name() {
assertEquals("nacos", provider.name());
}
@Test
public void module() {
assertEquals(ClusterModule.class, provider.module());
}
@Test
public void createConfigBeanIfAbsent() {
ModuleConfig moduleConfig = provider.createConfigBeanIfAbsent();
assertTrue(moduleConfig instanceof ClusterModuleNacosConfig);
}
@Test(expected = ModuleStartException.class)
public void prepareWithNonHost() throws Exception {
provider.prepare();
}
@Test
public void prepare() throws Exception {
PowerMockito.mockStatic(NamingFactory.class);
ClusterModuleNacosConfig nacosConfig = new ClusterModuleNacosConfig();
nacosConfig.setHostPort("10.0.0.1:1000,10.0.0.2:1001");
nacosConfig.setServiceName(SERVICE_NAME);
Whitebox.setInternalState(provider, "config", nacosConfig);
NamingService namingService = mock(NamingService.class);
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, "10.0.0.1:1000,10.0.0.2:1001");
PowerMockito.when(NamingFactory.createNamingService(properties)).thenReturn(namingService);
provider.prepare();
ArgumentCaptor<Properties> addressCaptor = ArgumentCaptor.forClass(Properties.class);
PowerMockito.verifyStatic();
NamingFactory.createNamingService(addressCaptor.capture());
Properties data = addressCaptor.getValue();
assertEquals("10.0.0.1:1000,10.0.0.2:1001", data.getProperty(PropertyKeyConst.SERVER_ADDR));
}
@Test
public void start() {
provider.start();
}
@Test
public void notifyAfterCompleted() {
provider.notifyAfterCompleted();
}
@Test
public void requiredModules() {
String[] modules = provider.requiredModules();
assertArrayEquals(new String[] {CoreModule.NAME}, modules);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.nacos;
import com.alibaba.nacos.api.naming.NamingService;
import java.util.Collections;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ITClusterModuleNacosProviderFunctionalTest {
private String nacosAddress;
@Before
public void before() {
nacosAddress = System.getProperty("nacos.address");
assertFalse(StringUtil.isEmpty(nacosAddress));
}
@Test
public void registerRemote() throws Exception {
final String serviceName = "register_remote";
ModuleProvider provider = createProvider(serviceName);
Address selfAddress = new Address("127.0.0.1", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(provider).registerRemote(instance);
List<RemoteInstance> remoteInstances = queryRemoteNodes(provider, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertTrue(queryAddress.isSelf());
}
@Test
public void registerRemoteOfReceiver() throws Exception {
final String serviceName = "register_remote_receiver";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
// Mixed or Aggregator
Address selfAddress = new Address("127.0.0.3", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(providerA).registerRemote(instance);
// Receiver
List<RemoteInstance> remoteInstances = queryRemoteNodes(providerB, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertFalse(queryAddress.isSelf());
}
@Test
public void registerRemoteOfCluster() throws Exception {
final String serviceName = "register_remote_cluster";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
Address addressA = new Address("127.0.0.4", 1000, true);
Address addressB = new Address("127.0.0.5", 1000, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
}
@Test
public void deregisterRemoteOfCluster() throws Exception {
final String serviceName = "deregister_remote_cluster";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
Address addressA = new Address("127.0.0.6", 1000, true);
Address addressB = new Address("127.0.0.7", 1000, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
// deregister A
ClusterRegister register = getClusterRegister(providerA);
NamingService namingServiceA = Whitebox.getInternalState(register, "namingService");
namingServiceA.deregisterInstance(serviceName, addressA.getHost(), addressA.getPort());
// only B
remoteInstancesOfB = queryRemoteNodes(providerB, 1);
assertEquals(1, remoteInstancesOfB.size());
Address address = remoteInstancesOfB.get(0).getAddress();
assertEquals(addressB, address);
assertTrue(address.isSelf());
}
private ClusterModuleNacosProvider createProvider(String servicName) throws ModuleStartException {
ClusterModuleNacosProvider provider = new ClusterModuleNacosProvider();
ClusterModuleNacosConfig config = (ClusterModuleNacosConfig) provider.createConfigBeanIfAbsent();
config.setHostPort(nacosAddress);
config.setServiceName(servicName);
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
return provider;
}
private ClusterRegister getClusterRegister(ModuleProvider provider) {
return provider.getService(ClusterRegister.class);
}
private ClusterNodesQuery getClusterNodesQuery(ModuleProvider provider) {
return provider.getService(ClusterNodesQuery.class);
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int goals) throws InterruptedException {
int i = 20;
do {
List<RemoteInstance> instances = getClusterNodesQuery(provider).queryRemoteNodes();
if (instances.size() == goals) {
return instances;
} else {
Thread.sleep(1000);
}
}
while (--i > 0);
return Collections.EMPTY_LIST;
}
private void validateServiceInstance(Address selfAddress, Address otherAddress, List<RemoteInstance> queryResult) {
assertEquals(2, queryResult.size());
boolean selfExist = false, otherExist = false;
for (RemoteInstance instance : queryResult) {
Address queryAddress = instance.getAddress();
if (queryAddress.equals(selfAddress) && queryAddress.isSelf()) {
selfExist = true;
} else if (queryAddress.equals(otherAddress) && !queryAddress.isSelf()) {
otherExist = true;
}
}
assertTrue(selfExist);
assertTrue(otherExist);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.nacos;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class NacosCoordinatorTest {
private NamingService namingService = mock(NamingService.class);
private ClusterModuleNacosConfig nacosConfig = new ClusterModuleNacosConfig();
private NacosCoordinator coordinator;
private Address remoteAddress = new Address("10.0.0.1", 1000, false);
private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
private static final String SERVICE_NAME = "test-service";
@Before
public void setUp() throws NacosException {
nacosConfig.setServiceName(SERVICE_NAME);
coordinator = new NacosCoordinator(namingService, nacosConfig);
}
@Test
@SuppressWarnings("unchecked")
public void queryRemoteNodesWithNonOrEmpty() throws NacosException {
when(namingService.selectInstances(anyString(), anyBoolean())).thenReturn(null, Collections.emptyList());
assertEquals(0, coordinator.queryRemoteNodes().size());
}
@Test
public void queryRemoteNodes() throws NacosException {
registerSelfRemote();
List<Instance> instances = mockInstance();
when(namingService.selectInstances(anyString(), anyBoolean())).thenReturn(instances);
List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
assertEquals(2, remoteInstances.size());
RemoteInstance selfInstance = remoteInstances.get(0);
validate(selfRemoteAddress, selfInstance);
RemoteInstance notSelfInstance = remoteInstances.get(1);
validate(remoteAddress, notSelfInstance);
}
@Test
public void queryRemoteNodesWithNullSelf() throws NacosException {
List<Instance> instances = mockInstance();
when(namingService.selectInstances(anyString(), anyBoolean())).thenReturn(instances);
List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
assertEquals(remoteInstances.size(), instances.size());
}
@Test
public void registerRemote() throws NacosException {
registerRemote(remoteAddress);
}
@Test
public void registerSelfRemote() throws NacosException {
registerRemote(selfRemoteAddress);
}
private void validate(Address originArress, RemoteInstance instance) {
Address instanceAddress = instance.getAddress();
assertEquals(originArress.getHost(), instanceAddress.getHost());
assertEquals(originArress.getPort(), instanceAddress.getPort());
}
private void registerRemote(Address address) throws NacosException {
coordinator.registerRemote(new RemoteInstance(address));
ArgumentCaptor<String> serviceNameArgumentCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> hostArgumentCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Integer> portArgumentCaptor = ArgumentCaptor.forClass(Integer.class);
verify(namingService).registerInstance(serviceNameArgumentCaptor.capture(), hostArgumentCaptor.capture(), portArgumentCaptor
.capture());
assertEquals(SERVICE_NAME, serviceNameArgumentCaptor.getValue());
assertEquals(address.getHost(), hostArgumentCaptor.getValue());
assertEquals(Long.valueOf(address.getPort()), Long.valueOf(portArgumentCaptor.getValue()));
}
private List<Instance> mockInstance() {
Instance remoteInstance = new Instance();
Instance selfInstance = new Instance();
selfInstance.setIp(selfRemoteAddress.getHost());
selfInstance.setPort(selfRemoteAddress.getPort());
remoteInstance.setIp(remoteAddress.getHost());
remoteInstance.setPort(remoteAddress.getPort());
List<Instance> instances = new ArrayList<>();
instances.add(selfInstance);
instances.add(remoteInstance);
return instances;
}
}
......@@ -33,6 +33,7 @@
<module>cluster-kubernetes-plugin</module>
<module>cluster-consul-plugin</module>
<module>cluster-etcd-plugin</module>
<module>cluster-nacos-plugin</module>
</modules>
<dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-configuration</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>configuration-nacos</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
</dependency>
</dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<showLogs>true</showLogs>
<logDate>default</logDate>
<verbose>true</verbose>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
<images>
<image>
<name>nacos/nacos-server:${nacos.version}</name>
<alias>nacos-dynamic-configuration-integration-test-nacos</alias>
<run>
<env>
<MODE>standalone</MODE>
</env>
<ports>
<port>nacos.port:8848</port>
</ports>
<wait>
<log>Nacos started successfully</log>
<time>120000</time>
</wait>
</run>
</image>
</images>
</configuration>
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>gmaven-plugin</artifactId>
<version>${gmaven-plugin.version}</version>
<executions>
<execution>
<id>add-default-properties</id>
<phase>initialize</phase>
<goals>
<goal>execute</goal>
</goals>
<configuration>
<providerSelection>2.0</providerSelection>
<source>
project.properties.setProperty('docker.hostname', 'localhost')
log.info("Docker hostname is " + project.properties['docker.hostname'])
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<nacos.host>
${docker.hostname}
</nacos.host>
<nacos.port>
${nacos.port}
</nacos.port>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.nacos;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NacosConfigWatcherRegister extends ConfigWatcherRegister {
private static final Logger LOGGER = LoggerFactory.getLogger(NacosConfigWatcherRegister.class);
private final NacosServerSettings settings;
private final ConfigService configService;
private final Map<String, Optional<String>> configItemKeyedByName;
private final Map<String, Listener> listenersByKey;
public NacosConfigWatcherRegister(NacosServerSettings settings) throws NacosException {
super(settings.getPeriod());
this.settings = settings;
this.configItemKeyedByName = new ConcurrentHashMap<>();
this.listenersByKey = new ConcurrentHashMap<>();
final int port = this.settings.getPort();
final String serverAddr = this.settings.getServerAddr();
final Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr + ":" + port);
properties.put(PropertyKeyConst.NAMESPACE, settings.getNamespace());
this.configService = NacosFactory.createConfigService(properties);
}
@Override
public Optional<ConfigTable> readConfig(Set<String> keys) {
removeUninterestedKeys(keys);
registerKeyListeners(keys);
final ConfigTable table = new ConfigTable();
for (Map.Entry<String, Optional<String>> entry : configItemKeyedByName.entrySet()) {
final String key = entry.getKey();
final Optional<String> value = entry.getValue();
if (value.isPresent()) {
table.add(new ConfigTable.ConfigItem(key, value.get()));
} else {
table.add(new ConfigTable.ConfigItem(key, null));
}
}
return Optional.of(table);
}
private void registerKeyListeners(final Set<String> keys) {
final String group = settings.getGroup();
for (final String dataId : keys) {
if (listenersByKey.containsKey(dataId)) {
continue;
}
try {
listenersByKey.putIfAbsent(dataId, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
onDataIdValueChanged(dataId, configInfo);
}
});
configService.addListener(dataId, group, listenersByKey.get(dataId));
// the key is newly added, read the config for the first time
final String config = configService.getConfig(dataId, group, 1000);
onDataIdValueChanged(dataId, config);
} catch (NacosException e) {
LOGGER.warn("Failed to register Nacos listener for dataId: {}", dataId);
}
}
}
private void removeUninterestedKeys(final Set<String> interestedKeys) {
final String group = settings.getGroup();
final Set<String> uninterestedKeys = new HashSet<>(listenersByKey.keySet());
uninterestedKeys.removeAll(interestedKeys);
uninterestedKeys.forEach(k -> {
final Listener listener = listenersByKey.remove(k);
if (listener != null) {
configService.removeListener(k, group, listener);
}
});
}
void onDataIdValueChanged(String dataId, String configInfo) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Nacos config changed: {}: {}", dataId, configInfo);
}
configItemKeyedByName.put(dataId, Optional.ofNullable(configInfo));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.nacos;
import com.alibaba.nacos.api.exception.NacosException;
import com.google.common.base.Strings;
import org.apache.skywalking.oap.server.configuration.api.AbstractConfigurationProvider;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Get configuration from Nacos.
*/
public class NacosConfigurationProvider extends AbstractConfigurationProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(NacosConfigurationProvider.class);
private NacosServerSettings settings;
public NacosConfigurationProvider() {
settings = new NacosServerSettings();
}
@Override
public String name() {
return "nacos";
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return settings;
}
@Override
protected ConfigWatcherRegister initConfigReader() throws ModuleStartException {
LOGGER.info("settings: {}", settings);
if (Strings.isNullOrEmpty(settings.getServerAddr())) {
throw new ModuleStartException("Nacos serverAddr cannot be null or empty.");
}
if (settings.getPort() <= 0) {
throw new ModuleStartException("Nacos port must be positive integer.");
}
if (Strings.isNullOrEmpty(settings.getGroup())) {
throw new ModuleStartException("Nacos group cannot be null or empty.");
}
try {
return new NacosConfigWatcherRegister(settings);
} catch (NacosException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.nacos;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Getter
@Setter
@ToString
public class NacosServerSettings extends ModuleConfig {
private String clusterName = "default";
private String namespace = "";
private String serverAddr;
private int port = 8848;
private String group;
private int period = 60;
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.nacos.NacosConfigurationProvider
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.nacos;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import java.io.FileNotFoundException;
import java.io.Reader;
import java.util.Map;
import java.util.Properties;
import org.apache.skywalking.apm.util.PropertyPlaceholderHelper;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ITNacosConfigurationTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ITNacosConfigurationTest.class);
private final Yaml yaml = new Yaml();
private NacosConfigurationTestProvider provider;
@Before
public void setUp() throws Exception {
final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration();
loadConfig(applicationConfiguration);
final ModuleManager moduleManager = new ModuleManager();
moduleManager.init(applicationConfiguration);
provider = (NacosConfigurationTestProvider) moduleManager.find(NacosConfigurationTestModule.NAME).provider();
assertNotNull(provider);
}
@SuppressWarnings("StatementWithEmptyBody")
@Test(timeout = 20000)
public void shouldReadUpdated() throws NacosException {
assertNull(provider.watcher.value());
final Properties properties = new Properties();
final String nacosHost = System.getProperty("nacos.host");
final String nacosPort = System.getProperty("nacos.port");
LOGGER.info("nacosHost: {}, nacosPort: {}", nacosHost, nacosPort);
properties.put("serverAddr", nacosHost + ":" + nacosPort);
final ConfigService configService = NacosFactory.createConfigService(properties);
assertTrue(configService.publishConfig("test-module.default.testKey", "skywalking", "500"));
for (String v = provider.watcher.value(); v == null; v = provider.watcher.value()) {
}
assertEquals("500", provider.watcher.value());
assertTrue(configService.removeConfig("test-module.default.testKey", "skywalking"));
for (String v = provider.watcher.value(); v != null; v = provider.watcher.value()) {
}
assertNull(provider.watcher.value());
}
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
Reader applicationReader = ResourceUtils.read("application.yml");
Map<String, Map<String, Map<String, ?>>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
if (CollectionUtils.isNotEmpty(moduleConfig)) {
moduleConfig.forEach((moduleName, providerConfig) -> {
if (providerConfig.size() > 0) {
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach((key, value) -> {
properties.put(key, value);
final Object replaceValue = yaml.load(PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(value + "", properties));
if (replaceValue != null) {
properties.replace(key, replaceValue);
}
});
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
}
});
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.nacos;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class NacosConfigWatcherRegisterTest {
@Test
public void shouldReadConfigs() throws NacosException {
final String group = "skywalking";
final String testKey1 = "receiver-trace.default.slowDBAccessThreshold";
final String testVal1 = "test";
final String testKey2 = "testKey";
final String testVal2 = "testVal";
final NacosServerSettings mockSettings = mock(NacosServerSettings.class);
when(mockSettings.getGroup()).thenReturn(group);
when(mockSettings.getNamespace()).thenReturn("");
final NacosConfigWatcherRegister mockRegister = spy(new NacosConfigWatcherRegister(mockSettings));
final ConfigService mockConfigService = mock(ConfigService.class);
when(mockConfigService.getConfig(testKey1, group, 1000)).thenReturn(testVal1);
when(mockConfigService.getConfig(testKey2, group, 1000)).thenReturn(testVal2);
Whitebox.setInternalState(mockRegister, "configService", mockConfigService);
final ConfigTable configTable = mockRegister.readConfig(Sets.newHashSet(testKey1, testKey2)).get();
assertEquals(2, configTable.getItems().size());
Map<String, String> kvs = new HashMap<>();
for (ConfigTable.ConfigItem item : configTable.getItems()) {
kvs.put(item.getName(), item.getValue());
}
assertEquals(testVal1, kvs.get(testKey1));
assertEquals(testVal2, kvs.get(testKey2));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.nacos;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
public class NacosConfigurationTestModule extends ModuleDefine {
public static final String NAME = "test-module";
public NacosConfigurationTestModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.nacos;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NacosConfigurationTestProvider extends ModuleProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(NacosConfigurationTestProvider.class);
ConfigChangeWatcher watcher;
@Override
public String name() {
return "default";
}
@Override
public Class<? extends ModuleDefine> module() {
return NacosConfigurationTestModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return new ModuleConfig() {
};
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
watcher = new ConfigChangeWatcher(NacosConfigurationTestModule.NAME, this, "testKey") {
private volatile String testValue;
@Override
public void notify(ConfigChangeEvent value) {
LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
testValue = value.getNewValue();
}
}
@Override
public String value() {
return testValue;
}
};
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(watcher);
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public String[] requiredModules() {
return new String[] {
ConfigurationModule.NAME
};
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.api.ConfigurationModule
org.apache.skywalking.oap.server.configuration.nacos.NacosConfigurationTestModule
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.configuration.nacos.NacosConfigurationTestProvider
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test-module:
default:
testKey: 300
configuration:
nacos:
# Nacos Server Host
serverAddr: ${nacos.host}
# Nacos Server Port
port: ${nacos.port}
# Nacos Configuration Group
group: 'skywalking'
# Nacos Configuration namespace
namespace: ''
# Unit seconds, sync period. Default fetch every 60 seconds.
period: 1
# the name of current cluster, set the name if you want to upstream system known.
clusterName: "default"
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
CREATE DATABASE test DEFAULT CHARACTER SET = 'utf8';
USE test;
/******************************************/
/* database name = nacos_config */
/* table_name = config_info */
/******************************************/
CREATE TABLE `config_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(255) DEFAULT NULL,
`content` longtext NOT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`src_user` text,
`src_ip` varchar(20) DEFAULT NULL,
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '',
`c_desc` varchar(256) DEFAULT NULL,
`c_use` varchar(64) DEFAULT NULL,
`effect` varchar(64) DEFAULT NULL,
`type` varchar(64) DEFAULT NULL,
`c_schema` text,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfo_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/******************************************/
/* database name = nacos_config */
/* table name = config_info_aggr */
/******************************************/
CREATE TABLE `config_info_aggr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(255) NOT NULL,
`datum_id` varchar(255) NOT NULL,
`content` longtext NOT NULL,
`gmt_modified` datetime NOT NULL,
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfoaggr_datagrouptenantdatum` (`data_id`,`group_id`,`tenant_id`,`datum_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/******************************************/
/* database name = nacos_config */
/* table name = config_info_beta */
/******************************************/
CREATE TABLE `config_info_beta` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`app_name` varchar(128) DEFAULT NULL,
`content` longtext NOT NULL,
`beta_ips` varchar(1024) DEFAULT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`src_user` text,
`src_ip` varchar(20) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfobeta_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/******************************************/
/* database name = nacos_config */
/* table name = config_info_tag */
/******************************************/
CREATE TABLE `config_info_tag` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`tenant_id` varchar(128) DEFAULT '',
`tag_id` varchar(128) NOT NULL,
`app_name` varchar(128) DEFAULT NULL,
`content` longtext NOT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`src_user` text,
`src_ip` varchar(20) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfotag_datagrouptenanttag` (`data_id`,`group_id`,`tenant_id`,`tag_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/******************************************/
/* database name = nacos_config */
/* table name = config_tags_relation */
/******************************************/
CREATE TABLE `config_tags_relation` (
`id` bigint(20) NOT NULL,
`tag_name` varchar(128) NOT NULL,
`tag_type` varchar(64) DEFAULT NULL,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`tenant_id` varchar(128) DEFAULT '',
`nid` bigint(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`nid`),
UNIQUE KEY `uk_configtagrelation_configidtag` (`id`,`tag_name`,`tag_type`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/******************************************/
/* database name = nacos_config */
/* table name = group_capacity */
/******************************************/
CREATE TABLE `group_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`group_id` varchar(128) NOT NULL DEFAULT '',
`quota` int(10) unsigned NOT NULL DEFAULT '0',
`usage` int(10) unsigned NOT NULL DEFAULT '0',
`max_size` int(10) unsigned NOT NULL DEFAULT '0',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_id` (`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/******************************************/
/* database name = nacos_config */
/* table name = his_config_info */
/******************************************/
CREATE TABLE `his_config_info` (
`id` bigint(64) unsigned NOT NULL,
`nid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`app_name` varchar(128) DEFAULT NULL,
`content` longtext NOT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`src_user` text,
`src_ip` varchar(20) DEFAULT NULL,
`op_type` char(10) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '',
PRIMARY KEY (`nid`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_did` (`data_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/******************************************/
/* database name = nacos_config */
/* table name = tenant_capacity */
/******************************************/
CREATE TABLE `tenant_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`tenant_id` varchar(128) NOT NULL DEFAULT '',
`quota` int(10) unsigned NOT NULL DEFAULT '0',
`usage` int(10) unsigned NOT NULL DEFAULT '0',
`max_size` int(10) unsigned NOT NULL DEFAULT '0',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
CREATE TABLE `tenant_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`kp` varchar(128) NOT NULL,
`tenant_id` varchar(128) default '',
`tenant_name` varchar(128) default '',
`tenant_desc` varchar(256) DEFAULT NULL,
`create_source` varchar(32) DEFAULT NULL,
`gmt_create` bigint(20) NOT NULL,
`gmt_modified` bigint(20) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_info_kptenantid` (`kp`,`tenant_id`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
CREATE TABLE users (
username varchar(50) NOT NULL PRIMARY KEY,
password varchar(500) NOT NULL,
enabled boolean NOT NULL
);
CREATE TABLE roles (
username varchar(50) NOT NULL,
role varchar(50) NOT NULL
);
INSERT INTO users (username, password, enabled) VALUES ('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);
INSERT INTO roles (username, role) VALUES ('nacos', 'ROLE_ADMIN');
......@@ -36,6 +36,7 @@
<module>configuration-etcd</module>
<module>configuration-consul</module>
<module>configuration-k8s-configmap</module>
<module>configuration-nacos</module>
</modules>
</project>
......@@ -122,6 +122,9 @@ lucene-suggest-8.0.0.jar
minimal-json-0.9.5.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
nacos-api-1.3.1.jar
nacos-client-1.3.1.jar
nacos-common-1.3.1.jar
netty-3.10.5.Final.jar
netty-buffer-4.1.42.Final.jar
netty-codec-4.1.42.Final.jar
......
......@@ -120,6 +120,9 @@ lucene-suggest-7.3.1.jar
minimal-json-0.9.5.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
nacos-api-1.3.1.jar
nacos-client-1.3.1.jar
nacos-common-1.3.1.jar
netty-3.10.5.Final.jar
netty-buffer-4.1.42.Final.jar
netty-codec-4.1.42.Final.jar
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册