提交 ed99079d 编写于 作者: Z zhangwei 提交者: wu-sheng

Fix service cluster plugin bug (#3074)

* fix 3069
上级 54b01223
...@@ -83,6 +83,7 @@ ...@@ -83,6 +83,7 @@
<curator-test.version>2.12.0</curator-test.version> <curator-test.version>2.12.0</curator-test.version>
<etcd4j.version>2.17.0</etcd4j.version> <etcd4j.version>2.17.0</etcd4j.version>
<etcd.version>v3.2.3</etcd.version> <etcd.version>v3.2.3</etcd.version>
<zookeeper.image.version>3.5</zookeeper.image.version>
</properties> </properties>
<dependencies> <dependencies>
......
...@@ -28,6 +28,10 @@ ...@@ -28,6 +28,10 @@
<artifactId>cluster-consul-plugin</artifactId> <artifactId>cluster-consul-plugin</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties>
<consul.image.version>0.9</consul.image.version>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.skywalking</groupId> <groupId>org.apache.skywalking</groupId>
...@@ -50,4 +54,100 @@ ...@@ -50,4 +54,100 @@
</exclusions> </exclusions>
</dependency> </dependency>
</dependencies> </dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<logDate>default</logDate>
<verbose>true</verbose>
<showLogs>true</showLogs>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
</configuration>
<executions>
<execution>
<id>prepare-consul</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
<configuration>
<images>
<image>
<name>consul:${consul.image.version}</name>
<alias>cluster-consul-plugin-integration-test-cluster</alias>
<run>
<cmd>agent -server -bootstrap-expect=1 -client=0.0.0.0</cmd>
<ports>
<port>consul.port:8500</port>
</ports>
<wait>
<log>Synced node info</log>
<time>30000</time>
</wait>
</run>
</image>
</images>
</configuration>
</execution>
<execution>
<id>prepare-consul-stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>gmaven-plugin</artifactId>
<version>${gmaven-plugin.version}</version>
<executions>
<execution>
<id>add-default-properties</id>
<phase>initialize</phase>
<goals>
<goal>execute</goal>
</goals>
<configuration>
<providerSelection>2.0</providerSelection>
<source>
project.properties.setProperty('docker.hostname', 'localhost')
log.info("Docker hostname is " + project.properties['docker.hostname'])
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<consul.address>
${docker.hostname}:${consul.port}
</consul.address>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project> </project>
...@@ -54,13 +54,11 @@ public class ConsulCoordinator implements ClusterRegister, ClusterNodesQuery { ...@@ -54,13 +54,11 @@ public class ConsulCoordinator implements ClusterRegister, ClusterNodesQuery {
if (CollectionUtils.isNotEmpty(nodes)) { if (CollectionUtils.isNotEmpty(nodes)) {
nodes.forEach(node -> { nodes.forEach(node -> {
if (!Strings.isNullOrEmpty(node.getService().getAddress())) { if (!Strings.isNullOrEmpty(node.getService().getAddress())) {
if (Objects.nonNull(selfAddress)) { Address address = new Address(node.getService().getAddress(), node.getService().getPort(), false);
if (selfAddress.getHost().equals(node.getService().getAddress()) && selfAddress.getPort() == node.getService().getPort()) { if (address.equals(selfAddress)) {
remoteInstances.add(new RemoteInstance(new Address(node.getService().getAddress(), node.getService().getPort(), true))); address.setSelf(true);
} else {
remoteInstances.add(new RemoteInstance(new Address(node.getService().getAddress(), node.getService().getPort(), false)));
}
} }
remoteInstances.add(new RemoteInstance(address));
} }
}); });
} }
......
...@@ -104,7 +104,8 @@ public class ConsulCoordinatorTest { ...@@ -104,7 +104,8 @@ public class ConsulCoordinatorTest {
List<ServiceHealth> serviceHealths = mockHealth(); List<ServiceHealth> serviceHealths = mockHealth();
when(consulResponse.getResponse()).thenReturn(serviceHealths); when(consulResponse.getResponse()).thenReturn(serviceHealths);
List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes(); List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
assertTrue(remoteInstances.isEmpty()); // filter empty address
assertEquals(2, remoteInstances.size());
} }
@Test @Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.consul;
import com.google.common.base.Strings;
import com.orbitz.consul.AgentClient;
import com.orbitz.consul.Consul;
import com.orbitz.consul.model.agent.ImmutableRegistration;
import com.orbitz.consul.model.agent.Registration;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author zhangwei
*/
public class ITClusterModuleConsulProviderFunctionalTest {
private String consulAddress;
@Before
public void before() {
consulAddress = System.getProperty("consul.address");
assertFalse(StringUtil.isEmpty(consulAddress));
}
@Test
public void registerRemote() throws Exception {
final String serviceName = "register_remote";
ModuleProvider provider = createProvider(serviceName);
Address selfAddress = new Address("127.0.0.1", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(provider).registerRemote(instance);
List<RemoteInstance> remoteInstances = queryRemoteNodes(provider, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertTrue(queryAddress.isSelf());
}
@Test
public void registerRemoteOfInternal() throws Exception {
final String serviceName = "register_remote_internal";
ModuleProvider provider =
createProvider(serviceName, "127.0.1.2", 1001);
Address selfAddress = new Address("127.0.0.2", 1002, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(provider).registerRemote(instance);
List<RemoteInstance> remoteInstances = queryRemoteNodes(provider, 1);
ClusterModuleConsulConfig config = (ClusterModuleConsulConfig) provider.createConfigBeanIfAbsent();
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(config.getInternalComHost(), queryAddress.getHost());
assertEquals(config.getInternalComPort(), queryAddress.getPort());
assertTrue(queryAddress.isSelf());
}
@Test
public void registerRemoteOfReceiver() throws Exception {
final String serviceName = "register_remote_receiver";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
// Mixed or Aggregator
Address selfAddress = new Address("127.0.0.3", 1003, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(providerA).registerRemote(instance);
// Receiver
List<RemoteInstance> remoteInstances = queryRemoteNodes(providerB, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertFalse(queryAddress.isSelf());
}
@Test
public void registerRemoteOfCluster() throws Exception {
final String serviceName = "register_remote_cluster";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
Address addressA = new Address("127.0.0.4", 1004, true);
Address addressB = new Address("127.0.0.5", 1005, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
}
@Test
public void unregisterRemoteOfCluster() throws Exception {
final String serviceName = "unregister_remote_cluster";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
Address addressA = new Address("127.0.0.6", 1006, true);
Address addressB = new Address("127.0.0.7", 1007, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
// unregister A
Consul client = Whitebox.getInternalState(providerA, "client");
AgentClient agentClient = client.agentClient();
agentClient.deregister(instanceA.getAddress().toString());
// only B
remoteInstancesOfB = queryRemoteNodes(providerB, 1, 120);
assertEquals(1, remoteInstancesOfB.size());
Address address = remoteInstancesOfB.get(0).getAddress();
assertEquals(address, addressB);
assertTrue(addressB.isSelf());
}
private ClusterModuleConsulProvider createProvider(String serviceName) throws Exception {
return createProvider(serviceName, null, 0);
}
private ClusterModuleConsulProvider createProvider(String serviceName, String internalComHost, int internalComPort) throws Exception {
ClusterModuleConsulProvider provider = new ClusterModuleConsulProvider();
ClusterModuleConsulConfig config = (ClusterModuleConsulConfig) provider.createConfigBeanIfAbsent();
config.setHostPort(consulAddress);
config.setServiceName(serviceName);
if (!StringUtil.isEmpty(internalComHost)) {
config.setInternalComHost(internalComHost);
}
if (internalComPort > 0) {
config.setInternalComPort(internalComPort);
}
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
ConsulCoordinator consulCoordinator = (ConsulCoordinator) provider.getService(ClusterRegister.class);
// ignore health check
ClusterRegister register = remoteInstance -> {
if (needUsingInternalAddr(config)) {
remoteInstance = new RemoteInstance(new Address(config.getInternalComHost(), config.getInternalComPort(), true));
}
Consul client = Whitebox.getInternalState(consulCoordinator, "client");
AgentClient agentClient = client.agentClient();
Whitebox.setInternalState(consulCoordinator, "selfAddress", remoteInstance.getAddress());
TelemetryRelatedContext.INSTANCE.setId(remoteInstance.getAddress().toString());
Registration registration = ImmutableRegistration.builder()
.id(remoteInstance.getAddress().toString())
.name(serviceName)
.address(remoteInstance.getAddress().getHost())
.port(remoteInstance.getAddress().getPort())
.build();
agentClient.register(registration);
};
provider.registerServiceImplementation(ClusterRegister.class, register);
return provider;
}
private boolean needUsingInternalAddr(ClusterModuleConsulConfig config) {
return !Strings.isNullOrEmpty(config.getInternalComHost()) && config.getInternalComPort() > 0;
}
private ClusterRegister getClusterRegister(ModuleProvider provider) {
return provider.getService(ClusterRegister.class);
}
private ClusterNodesQuery getClusterNodesQuery(ModuleProvider provider) {
return provider.getService(ClusterNodesQuery.class);
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int goals) throws InterruptedException {
return queryRemoteNodes(provider, goals, 20);
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int goals, int cyclic) throws InterruptedException {
do {
List<RemoteInstance> instances = getClusterNodesQuery(provider).queryRemoteNodes();
if (instances.size() == goals) {
return instances;
} else {
Thread.sleep(1000);
}
} while (--cyclic > 0);
return Collections.EMPTY_LIST;
}
private void validateServiceInstance(Address selfAddress, Address otherAddress, List<RemoteInstance> queryResult) {
assertEquals(2, queryResult.size());
boolean selfExist = false, otherExist = false;
for (RemoteInstance instance : queryResult) {
Address queryAddress = instance.getAddress();
if (queryAddress.equals(selfAddress) && queryAddress.isSelf()) {
selfExist = true;
} else if (queryAddress.equals(otherAddress) && !queryAddress.isSelf()) {
otherExist = true;
}
}
assertTrue(selfExist);
assertTrue(otherExist);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import mousio.etcd4j.EtcdClient;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author zhangwei
*/
public class ITClusterModuleEtcdProviderFunctionalTest {
private String etcdAddress;
@Before
public void before() {
String etcdHost = System.getProperty("etcd.host");
String port = System.getProperty("etcd.port");
assertTrue(!StringUtil.isEmpty(etcdHost) && !StringUtil.isEmpty(port));
etcdAddress = etcdHost + ":" + port;
}
@Test
public void registerRemote() throws Exception {
final String serviceName = "register_remote";
ModuleProvider provider = createProvider(serviceName);
Address selfAddress = new Address("127.0.0.1", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(provider).registerRemote(instance);
List<RemoteInstance> remoteInstances = queryRemoteNodes(provider, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertTrue(queryAddress.isSelf());
}
@Test
public void registerRemoteOfInternal() throws Exception {
final String serviceName = "register_remote_internal";
ModuleProvider provider =
createProvider(serviceName, "127.0.1.2", 1000);
Address selfAddress = new Address("127.0.0.2", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(provider).registerRemote(instance);
List<RemoteInstance> remoteInstances = queryRemoteNodes(provider, 1);
ClusterModuleEtcdConfig config = (ClusterModuleEtcdConfig) provider.createConfigBeanIfAbsent();
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(config.getInternalComHost(), queryAddress.getHost());
assertEquals(config.getInternalComPort(), queryAddress.getPort());
assertTrue(queryAddress.isSelf());
}
@Test
public void registerRemoteOfReceiver() throws Exception {
final String serviceName = "register_remote_receiver";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
// Mixed or Aggregator
Address selfAddress = new Address("127.0.0.3", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(providerA).registerRemote(instance);
// Receiver
List<RemoteInstance> remoteInstances = queryRemoteNodes(providerB, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertFalse(queryAddress.isSelf());
}
@Test
public void registerRemoteOfCluster() throws Exception {
final String serviceName = "register_remote_cluster";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
Address addressA = new Address("127.0.0.4", 1000, true);
Address addressB = new Address("127.0.0.5", 1000, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
}
@Test
public void unregisterRemoteOfCluster() throws Exception {
final String serviceName = "unregister_remote_cluster";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
Address addressA = new Address("127.0.0.4", 1000, true);
Address addressB = new Address("127.0.0.5", 1000, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
// unregister A
EtcdClient client = Whitebox.getInternalState(providerA, "client");
client.close();
// only B
remoteInstancesOfB = queryRemoteNodes(providerB, 1, 120);
assertEquals(1, remoteInstancesOfB.size());
Address address = remoteInstancesOfB.get(0).getAddress();
assertEquals(address, addressB);
assertTrue(addressB.isSelf());
}
private ClusterModuleEtcdProvider createProvider(String serviceName) throws ModuleStartException {
return createProvider(serviceName, null, 0);
}
private ClusterModuleEtcdProvider createProvider(String serviceName, String internalComHost, int internalComPort) throws ModuleStartException {
ClusterModuleEtcdProvider provider = new ClusterModuleEtcdProvider();
ClusterModuleEtcdConfig config = (ClusterModuleEtcdConfig) provider.createConfigBeanIfAbsent();
config.setHostPort(etcdAddress);
config.setServiceName(serviceName);
if (!StringUtil.isEmpty(internalComHost)) {
config.setInternalComHost(internalComHost);
}
if (internalComPort > 0) {
config.setInternalComPort(internalComPort);
}
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
return provider;
}
private ClusterRegister getClusterRegister(ModuleProvider provider) {
return provider.getService(ClusterRegister.class);
}
private ClusterNodesQuery getClusterNodesQuery(ModuleProvider provider) {
return provider.getService(ClusterNodesQuery.class);
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int goals) throws InterruptedException {
return queryRemoteNodes(provider, goals, 20);
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int goals, int cyclic) throws InterruptedException {
do {
List<RemoteInstance> instances = getClusterNodesQuery(provider).queryRemoteNodes();
if (instances.size() == goals) {
return instances;
} else {
Thread.sleep(1000);
}
} while (--cyclic > 0);
return Collections.EMPTY_LIST;
}
private void validateServiceInstance(Address selfAddress, Address otherAddress, List<RemoteInstance> queryResult) {
assertEquals(2, queryResult.size());
boolean selfExist = false, otherExist = false;
for (RemoteInstance instance : queryResult) {
Address queryAddress = instance.getAddress();
if (queryAddress.equals(selfAddress) && queryAddress.isSelf()) {
selfExist = true;
} else if (queryAddress.equals(otherAddress) && !queryAddress.isSelf()) {
otherExist = true;
}
}
assertTrue(selfExist);
assertTrue(otherExist);
}
}
...@@ -63,4 +63,100 @@ ...@@ -63,4 +63,100 @@
</exclusions> </exclusions>
</dependency> </dependency>
</dependencies> </dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<showLogs>true</showLogs>
<logDate>default</logDate>
<verbose>true</verbose>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
<images>
<image>
<name>nacos/nacos-server:${nacos.version}</name>
<alias>cluster-nacos-plugin-integration-test-nacos</alias>
<run>
<env>
<MODE>standalone</MODE>
</env>
<ports>
<port>nacos.port:8848</port>
</ports>
<wait>
<log>Nacos started successfully</log>
<time>120000</time>
</wait>
</run>
</image>
</images>
</configuration>
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>gmaven-plugin</artifactId>
<version>${gmaven-plugin.version}</version>
<executions>
<execution>
<id>add-default-properties</id>
<phase>initialize</phase>
<goals>
<goal>execute</goal>
</goals>
<configuration>
<providerSelection>2.0</providerSelection>
<source>
project.properties.setProperty('docker.hostname', 'localhost')
log.info("Docker hostname is " + project.properties['docker.hostname'])
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<nacos.address>
${docker.hostname}:${nacos.port}
</nacos.address>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project> </project>
...@@ -27,7 +27,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils; ...@@ -27,7 +27,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects;
/** /**
* @author caoyixiong * @author caoyixiong
...@@ -50,13 +49,11 @@ public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery { ...@@ -50,13 +49,11 @@ public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {
List<Instance> instances = namingService.selectInstances(config.getServiceName(), true); List<Instance> instances = namingService.selectInstances(config.getServiceName(), true);
if (CollectionUtils.isNotEmpty(instances)) { if (CollectionUtils.isNotEmpty(instances)) {
instances.forEach(instance -> { instances.forEach(instance -> {
if (Objects.nonNull(selfAddress)) { Address address = new Address(instance.getIp(), instance.getPort(), false);
if (selfAddress.getHost().equals(instance.getIp()) && selfAddress.getPort() == instance.getPort()) { if (address.equals(selfAddress)) {
result.add(new RemoteInstance(new Address(instance.getIp(), instance.getPort(), true))); address.setSelf(true);
} else {
result.add(new RemoteInstance(new Address(instance.getIp(), instance.getPort(), false)));
}
} }
result.add(new RemoteInstance(address));
}); });
} }
} catch (NacosException e) { } catch (NacosException e) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.nacos;
import com.alibaba.nacos.api.naming.NamingService;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author zhangwei
*/
public class ITClusterModuleNacosProviderFunctionalTest {
private String nacosAddress;
@Before
public void before() {
nacosAddress = System.getProperty("nacos.address");
assertFalse(StringUtil.isEmpty(nacosAddress));
}
@Test
public void registerRemote() throws Exception {
final String serviceName = "register_remote";
ModuleProvider provider = createProvider(serviceName);
Address selfAddress = new Address("127.0.0.1", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(provider).registerRemote(instance);
List<RemoteInstance> remoteInstances = queryRemoteNodes(provider, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertTrue(queryAddress.isSelf());
}
@Test
public void registerRemoteOfReceiver() throws Exception {
final String serviceName = "register_remote_receiver";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
// Mixed or Aggregator
Address selfAddress = new Address("127.0.0.3", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(providerA).registerRemote(instance);
// Receiver
List<RemoteInstance> remoteInstances = queryRemoteNodes(providerB, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertFalse(queryAddress.isSelf());
}
@Test
public void registerRemoteOfCluster() throws Exception {
final String serviceName = "register_remote_cluster";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
Address addressA = new Address("127.0.0.4", 1000, true);
Address addressB = new Address("127.0.0.5", 1000, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
}
@Test
public void deregisterRemoteOfCluster() throws Exception {
final String serviceName = "deregister_remote_cluster";
ModuleProvider providerA = createProvider(serviceName);
ModuleProvider providerB = createProvider(serviceName);
Address addressA = new Address("127.0.0.6", 1000, true);
Address addressB = new Address("127.0.0.7", 1000, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
// deregister A
ClusterRegister register = getClusterRegister(providerA);
NamingService namingServiceA = Whitebox.getInternalState(register, "namingService");
namingServiceA.deregisterInstance(serviceName, addressA.getHost(), addressA.getPort());
// only B
remoteInstancesOfB = queryRemoteNodes(providerB, 1);
assertEquals(1, remoteInstancesOfB.size());
Address address = remoteInstancesOfB.get(0).getAddress();
assertEquals(addressB, address);
assertTrue(address.isSelf());
}
private ClusterModuleNacosProvider createProvider(String servicName) throws ModuleStartException {
ClusterModuleNacosProvider provider = new ClusterModuleNacosProvider();
ClusterModuleNacosConfig config = (ClusterModuleNacosConfig) provider.createConfigBeanIfAbsent();
config.setHostPort(nacosAddress);
config.setServiceName(servicName);
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
return provider;
}
private ClusterRegister getClusterRegister(ModuleProvider provider) {
return provider.getService(ClusterRegister.class);
}
private ClusterNodesQuery getClusterNodesQuery(ModuleProvider provider) {
return provider.getService(ClusterNodesQuery.class);
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int goals) throws InterruptedException {
int i = 20;
do {
List<RemoteInstance> instances = getClusterNodesQuery(provider).queryRemoteNodes();
if (instances.size() == goals) {
return instances;
} else {
Thread.sleep(1000);
}
} while (--i > 0);
return Collections.EMPTY_LIST;
}
private void validateServiceInstance(Address selfAddress, Address otherAddress, List<RemoteInstance> queryResult) {
assertEquals(2, queryResult.size());
boolean selfExist = false, otherExist = false;
for (RemoteInstance instance : queryResult) {
Address queryAddress = instance.getAddress();
if (queryAddress.equals(selfAddress) && queryAddress.isSelf()) {
selfExist = true;
} else if (queryAddress.equals(otherAddress) && !queryAddress.isSelf()) {
otherExist = true;
}
}
assertTrue(selfExist);
assertTrue(otherExist);
}
}
...@@ -32,7 +32,6 @@ import java.util.Collections; ...@@ -32,7 +32,6 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
...@@ -85,7 +84,7 @@ public class NacosCoordinatorTest { ...@@ -85,7 +84,7 @@ public class NacosCoordinatorTest {
List<Instance> instances = mockInstance(); List<Instance> instances = mockInstance();
when(namingService.selectInstances(anyString(), anyBoolean())).thenReturn(instances); when(namingService.selectInstances(anyString(), anyBoolean())).thenReturn(instances);
List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes(); List<RemoteInstance> remoteInstances = coordinator.queryRemoteNodes();
assertTrue(remoteInstances.isEmpty()); assertEquals(remoteInstances.size(), instances.size());
} }
@Test @Test
......
...@@ -51,4 +51,99 @@ ...@@ -51,4 +51,99 @@
<artifactId>curator-test</artifactId> <artifactId>curator-test</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
<profiles>
<profile>
<id>CI-with-IT</id>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<sourceMode>all</sourceMode>
<logDate>default</logDate>
<verbose>true</verbose>
<showLogs>true</showLogs>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
</configuration>
<executions>
<execution>
<id>prepare-zookeeper</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
<configuration>
<images>
<image>
<name>zookeeper:${zookeeper.image.version}</name>
<alias>cluster-zookeeper-plugin-integration-test-zookeeper</alias>
<run>
<ports>
<port>zk-port:2181</port>
</ports>
<wait>
<log>binding to port</log>
<time>30000</time>
</wait>
</run>
</image>
</images>
</configuration>
</execution>
<execution>
<id>prepare-zookeeper-start</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>gmaven-plugin</artifactId>
<version>${gmaven-plugin.version}</version>
<executions>
<execution>
<id>add-default-properties</id>
<phase>initialize</phase>
<goals>
<goal>execute</goal>
</goals>
<configuration>
<providerSelection>2.0</providerSelection>
<source>
project.properties.setProperty('docker.hostname', 'localhost')
log.info("Docker hostname is " + project.properties['docker.hostname'])
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<zk.address>
${docker.hostname}:${zk-port}
</zk.address>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project> </project>
\ No newline at end of file
...@@ -70,16 +70,17 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider { ...@@ -70,16 +70,17 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
.watchInstances(true) .watchInstances(true)
.serializer(new SWInstanceSerializer()).build(); .serializer(new SWInstanceSerializer()).build();
ZookeeperCoordinator coordinator;
try { try {
client.start(); client.start();
client.blockUntilConnected(); client.blockUntilConnected();
serviceDiscovery.start(); serviceDiscovery.start();
coordinator = new ZookeeperCoordinator(config, serviceDiscovery);
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
ZookeeperCoordinator coordinator = new ZookeeperCoordinator(config, serviceDiscovery);
this.registerServiceImplementation(ClusterRegister.class, coordinator); this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator); this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
} }
......
...@@ -32,25 +32,28 @@ import org.slf4j.*; ...@@ -32,25 +32,28 @@ import org.slf4j.*;
public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery { public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperCoordinator.class); private static final Logger logger = LoggerFactory.getLogger(ZookeeperCoordinator.class);
private static final String REMOTE_NAME_PATH = "remote";
private final ClusterModuleZookeeperConfig config; private final ClusterModuleZookeeperConfig config;
private final ServiceDiscovery<RemoteInstance> serviceDiscovery; private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
private volatile ServiceCache<RemoteInstance> serviceCache; private final ServiceCache<RemoteInstance> serviceCache;
private volatile Address selfAddress; private volatile Address selfAddress;
ZookeeperCoordinator(ClusterModuleZookeeperConfig config, ServiceDiscovery<RemoteInstance> serviceDiscovery) { ZookeeperCoordinator(ClusterModuleZookeeperConfig config, ServiceDiscovery<RemoteInstance> serviceDiscovery) throws Exception {
this.config = config; this.config = config;
this.serviceDiscovery = serviceDiscovery; this.serviceDiscovery = serviceDiscovery;
this.serviceCache = serviceDiscovery.serviceCacheBuilder().name(REMOTE_NAME_PATH).build();
this.serviceCache.start();
} }
@Override public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException { @Override public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
try { try {
String remoteNamePath = "remote";
if (needUsingInternalAddr()) { if (needUsingInternalAddr()) {
remoteInstance = new RemoteInstance(new Address(config.getInternalComHost(), config.getInternalComPort(), true)); remoteInstance = new RemoteInstance(new Address(config.getInternalComHost(), config.getInternalComPort(), true));
} }
ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder() ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
.name(remoteNamePath) .name(REMOTE_NAME_PATH)
.id(UUID.randomUUID().toString()) .id(UUID.randomUUID().toString())
.address(remoteInstance.getAddress().getHost()) .address(remoteInstance.getAddress().getHost())
.port(remoteInstance.getAddress().getPort()) .port(remoteInstance.getAddress().getPort())
...@@ -59,12 +62,6 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery ...@@ -59,12 +62,6 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
serviceDiscovery.registerService(thisInstance); serviceDiscovery.registerService(thisInstance);
serviceCache = serviceDiscovery.serviceCacheBuilder()
.name(remoteNamePath)
.build();
serviceCache.start();
this.selfAddress = remoteInstance.getAddress(); this.selfAddress = remoteInstance.getAddress();
TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString()); TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString());
} catch (Exception e) { } catch (Exception e) {
...@@ -74,19 +71,16 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery ...@@ -74,19 +71,16 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
@Override public List<RemoteInstance> queryRemoteNodes() { @Override public List<RemoteInstance> queryRemoteNodes() {
List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(20); List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(20);
if (Objects.nonNull(serviceCache)) { List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances(); serviceInstances.forEach(serviceInstance -> {
RemoteInstance instance = serviceInstance.getPayload();
serviceInstances.forEach(serviceInstance -> { if (instance.getAddress().equals(selfAddress)) {
RemoteInstance instance = serviceInstance.getPayload(); instance.getAddress().setSelf(true);
if (instance.getAddress().equals(selfAddress)) { } else {
instance.getAddress().setSelf(true); instance.getAddress().setSelf(false);
} else { }
instance.getAddress().setSelf(false); remoteInstanceDetails.add(instance);
} });
remoteInstanceDetails.add(instance);
});
}
return remoteInstanceDetails; return remoteInstanceDetails;
} }
......
...@@ -18,60 +18,39 @@ ...@@ -18,60 +18,39 @@
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper; package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.io.IOException; import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import java.util.List; import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.curator.test.TestingServer; import org.junit.Test;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.client.Address; import static org.junit.Assert.assertEquals;
import org.apache.skywalking.oap.server.library.module.*; import static org.junit.Assert.assertTrue;
import org.junit.*;
/** /**
* @author peng-yongsheng * @author peng-yongsheng zhangwei
*/ */
public class ClusterModuleZookeeperProviderTestCase { public class ClusterModuleZookeeperProviderTest {
private TestingServer server; private ClusterModuleZookeeperProvider provider = new ClusterModuleZookeeperProvider();
@Before @Test
public void before() throws Exception { public void name() {
server = new TestingServer(12181, true); assertEquals("zookeeper", provider.name());
server.start();
} }
@Test @Test
public void testStart() throws ServiceNotProvidedException, ModuleStartException, ServiceRegisterException, InterruptedException { public void module() {
ClusterModuleZookeeperProvider provider = new ClusterModuleZookeeperProvider(); assertEquals(ClusterModule.class, provider.module());
ClusterModuleZookeeperConfig moduleConfig = (ClusterModuleZookeeperConfig)provider.createConfigBeanIfAbsent(); }
moduleConfig.setHostPort(server.getConnectString());
moduleConfig.setBaseSleepTimeMs(3000);
moduleConfig.setMaxRetries(4);
provider.prepare();
provider.start();
ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
RemoteInstance remoteInstance = new RemoteInstance(new Address("ProviderAHost", 1000, true));
moduleRegister.registerRemote(remoteInstance);
for (int i = 0; i < 20; i++) {
List<RemoteInstance> detailsList = clusterNodesQuery.queryRemoteNodes();
if (detailsList.size() == 0) {
Thread.sleep(500);
continue;
}
Assert.assertEquals(1, detailsList.size());
Assert.assertEquals("ProviderAHost", detailsList.get(0).getAddress().getHost());
Assert.assertEquals(1000, detailsList.get(0).getAddress().getPort());
}
@Test
public void createConfigBeanIfAbsent() {
ModuleConfig moduleConfig = provider.createConfigBeanIfAbsent();
assertTrue(moduleConfig instanceof ClusterModuleZookeeperConfig);
} }
@After @Test
public void after() throws IOException { public void requiredModules() {
server.stop(); String[] modules = provider.requiredModules();
assertEquals(0, modules.length);
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author zhangwei
*/
public class ITClusterModuleZookeeperProviderFunctionalTest {
private String zkAddress;
@Before
public void before() {
zkAddress = System.getProperty("zk.address");
assertFalse(StringUtil.isEmpty(zkAddress));
}
@Test
public void registerRemote() throws Exception {
final String namespace = "register_remote";
ModuleProvider provider = createProvider(namespace);
Address selfAddress = new Address("127.0.0.1", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(provider).registerRemote(instance);
List<RemoteInstance> remoteInstances = queryRemoteNodes(provider, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertTrue(queryAddress.isSelf());
}
@Test
public void registerRemoteOfInternal() throws Exception {
final String namespace = "register_remote_internal";
ModuleProvider provider =
createProvider(namespace, "127.0.1.2", 1000);
Address selfAddress = new Address("127.0.0.2", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(provider).registerRemote(instance);
List<RemoteInstance> remoteInstances = queryRemoteNodes(provider, 1);
ClusterModuleZookeeperConfig config = (ClusterModuleZookeeperConfig) provider.createConfigBeanIfAbsent();
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(config.getInternalComHost(), queryAddress.getHost());
assertEquals(config.getInternalComPort(), queryAddress.getPort());
assertTrue(queryAddress.isSelf());
}
@Test
public void registerRemoteOfReceiver() throws Exception {
final String namespace = "register_remote_receiver";
ModuleProvider providerA = createProvider(namespace);
ModuleProvider providerB = createProvider(namespace);
// Mixed or Aggregator
Address selfAddress = new Address("127.0.0.3", 1000, true);
RemoteInstance instance = new RemoteInstance(selfAddress);
getClusterRegister(providerA).registerRemote(instance);
// Receiver
List<RemoteInstance> remoteInstances = queryRemoteNodes(providerB, 1);
assertEquals(1, remoteInstances.size());
Address queryAddress = remoteInstances.get(0).getAddress();
assertEquals(selfAddress, queryAddress);
assertFalse(queryAddress.isSelf());
}
@Test
public void registerRemoteOfCluster() throws Exception {
final String namespace = "register_remote_cluster";
ModuleProvider providerA = createProvider(namespace);
ModuleProvider providerB = createProvider(namespace);
Address addressA = new Address("127.0.0.4", 1000, true);
Address addressB = new Address("127.0.0.5", 1000, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
}
@Test
public void unregisterRemoteOfCluster() throws Exception {
final String namespace = "unregister_remote_cluster";
ModuleProvider providerA = createProvider(namespace);
ModuleProvider providerB = createProvider(namespace);
Address addressA = new Address("127.0.0.4", 1000, true);
Address addressB = new Address("127.0.0.5", 1000, true);
RemoteInstance instanceA = new RemoteInstance(addressA);
RemoteInstance instanceB = new RemoteInstance(addressB);
getClusterRegister(providerA).registerRemote(instanceA);
getClusterRegister(providerB).registerRemote(instanceB);
List<RemoteInstance> remoteInstancesOfA = queryRemoteNodes(providerA, 2);
validateServiceInstance(addressA, addressB, remoteInstancesOfA);
List<RemoteInstance> remoteInstancesOfB = queryRemoteNodes(providerB, 2);
validateServiceInstance(addressB, addressA, remoteInstancesOfB);
// unregister A
ClusterRegister register = getClusterRegister(providerA);
ServiceDiscovery<RemoteInstance> discoveryA = Whitebox.getInternalState(register, "serviceDiscovery");
discoveryA.close();
// only B
remoteInstancesOfB = queryRemoteNodes(providerB, 1, 120);
assertEquals(1, remoteInstancesOfB.size());
Address address = remoteInstancesOfB.get(0).getAddress();
assertEquals(address, addressB);
assertTrue(addressB.isSelf());
}
private ClusterModuleZookeeperProvider createProvider(String namespace) throws Exception {
return createProvider(namespace, null, 0);
}
private ClusterModuleZookeeperProvider createProvider(String namespace, String internalComHost, int internalComPort) throws Exception {
ClusterModuleZookeeperProvider provider = new ClusterModuleZookeeperProvider();
ClusterModuleZookeeperConfig moduleConfig = (ClusterModuleZookeeperConfig) provider.createConfigBeanIfAbsent();
moduleConfig.setHostPort(zkAddress);
moduleConfig.setBaseSleepTimeMs(3000);
moduleConfig.setMaxRetries(3);
if (!StringUtil.isEmpty(namespace)) {
moduleConfig.setNameSpace(namespace);
}
if (!StringUtil.isEmpty(internalComHost)) {
moduleConfig.setInternalComHost(internalComHost);
}
if (internalComPort > 0) {
moduleConfig.setInternalComPort(internalComPort);
}
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
return provider;
}
private ClusterRegister getClusterRegister(ModuleProvider provider) {
return provider.getService(ClusterRegister.class);
}
private ClusterNodesQuery getClusterNodesQuery(ModuleProvider provider) {
return provider.getService(ClusterNodesQuery.class);
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int goals) throws InterruptedException {
return queryRemoteNodes(provider, goals, 20);
}
private List<RemoteInstance> queryRemoteNodes(ModuleProvider provider, int goals, int cyclic) throws InterruptedException {
do {
List<RemoteInstance> instances = getClusterNodesQuery(provider).queryRemoteNodes();
if (instances.size() == goals) {
return instances;
} else {
Thread.sleep(1000);
}
} while (--cyclic > 0);
return Collections.EMPTY_LIST;
}
private void validateServiceInstance(Address selfAddress, Address otherAddress, List<RemoteInstance> queryResult) {
assertEquals(2, queryResult.size());
boolean selfExist = false, otherExist = false;
for (RemoteInstance instance : queryResult) {
Address queryAddress = instance.getAddress();
if (queryAddress.equals(selfAddress) && queryAddress.isSelf()) {
selfExist = true;
} else if (queryAddress.equals(otherAddress) && !queryAddress.isSelf()) {
otherExist = true;
}
}
assertTrue(selfExist);
assertTrue(otherExist);
}
}
...@@ -52,14 +52,13 @@ public class ZookeeperCoordinatorTest { ...@@ -52,14 +52,13 @@ public class ZookeeperCoordinatorTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
config.setHostPort(address.getHost() + ":" + address.getPort());
coordinator = new ZookeeperCoordinator(config, serviceDiscovery);
when(serviceDiscovery.serviceCacheBuilder()).thenReturn(cacheBuilder);
when(cacheBuilder.name("remote")).thenReturn(cacheBuilder); when(cacheBuilder.name("remote")).thenReturn(cacheBuilder);
when(cacheBuilder.build()).thenReturn(serviceCache); when(cacheBuilder.build()).thenReturn(serviceCache);
doNothing().when(serviceCache).start(); doNothing().when(serviceCache).start();
doNothing().when(serviceDiscovery).registerService(any()); doNothing().when(serviceDiscovery).registerService(any());
when(serviceDiscovery.serviceCacheBuilder()).thenReturn(cacheBuilder);
config.setHostPort(address.getHost() + ":" + address.getPort());
coordinator = new ZookeeperCoordinator(config, serviceDiscovery);
} }
@Test @Test
......
...@@ -26,10 +26,6 @@ ...@@ -26,10 +26,6 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>configuration-zookeeper</artifactId> <artifactId>configuration-zookeeper</artifactId>
<properties>
<zookeeper.image.version>3.5</zookeeper.image.version>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册