未验证 提交 56fe8a4b 编写于 作者: X xbkaishui 提交者: GitHub

Cluster manager health v2 (#5784)

上级 a9d756cd
......@@ -22,6 +22,7 @@ Release Notes.
* Support continuing to collect slow segments in the sampling mechanism.
* Support choosing files to activate the meter analyzer.
* Improve Kubernetes service registry for ALS analysis.
* Add health checker for cluster management.
* Improve the queryable tags generation. Remove the duplicated tags to reduce the storage payload.
* Fix the excessive timeout period set by the kubernetes-client.
* Fix deadlock problem when using elasticsearch-client-7.0.0.
......
......@@ -90,8 +90,7 @@ public class ClusterModuleConsulProvider extends ModuleProvider {
} catch (ConnectStringParseException | ConsulException e) {
throw new ModuleStartException(e.getMessage(), e);
}
ConsulCoordinator coordinator = new ConsulCoordinator(config, client);
ConsulCoordinator coordinator = new ConsulCoordinator(getManager(), config, client);
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
......
......@@ -27,21 +27,33 @@ import com.orbitz.consul.model.agent.Registration;
import com.orbitz.consul.model.health.ServiceHealth;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.cluster.ClusterHealthStatus;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceQueryException;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
public class ConsulCoordinator implements ClusterRegister, ClusterNodesQuery {
private final ModuleDefineHolder manager;
private final Consul client;
private final String serviceName;
private final ClusterModuleConsulConfig config;
private volatile Address selfAddress;
private HealthCheckMetrics healthChecker;
public ConsulCoordinator(ClusterModuleConsulConfig config, Consul client) {
public ConsulCoordinator(final ModuleDefineHolder manager, final ClusterModuleConsulConfig config, final Consul client) {
this.manager = manager;
this.config = config;
this.client = client;
this.serviceName = config.getServiceName();
......@@ -49,22 +61,32 @@ public class ConsulCoordinator implements ClusterRegister, ClusterNodesQuery {
@Override
public List<RemoteInstance> queryRemoteNodes() {
HealthClient healthClient = client.healthClient();
// Discover only "passing" nodes
List<ServiceHealth> nodes = healthClient.getHealthyServiceInstances(serviceName).getResponse();
List<RemoteInstance> remoteInstances = new ArrayList<>();
if (CollectionUtils.isNotEmpty(nodes)) {
nodes.forEach(node -> {
if (!Strings.isNullOrEmpty(node.getService().getAddress())) {
Address address = new Address(node.getService().getAddress(), node.getService().getPort(), false);
if (address.equals(selfAddress)) {
address.setSelf(true);
try {
initHealthChecker();
HealthClient healthClient = client.healthClient();
// Discover only "passing" nodes
List<ServiceHealth> nodes = healthClient.getHealthyServiceInstances(serviceName).getResponse();
if (CollectionUtils.isNotEmpty(nodes)) {
nodes.forEach(node -> {
if (!Strings.isNullOrEmpty(node.getService().getAddress())) {
Address address = new Address(node.getService().getAddress(), node.getService().getPort(), false);
if (address.equals(selfAddress)) {
address.setSelf(true);
}
remoteInstances.add(new RemoteInstance(address));
}
remoteInstances.add(new RemoteInstance(address));
}
});
});
}
ClusterHealthStatus healthStatus = OAPNodeChecker.isHealth(remoteInstances);
if (healthStatus.isHealth()) {
this.healthChecker.health();
} else {
this.healthChecker.unHealth(healthStatus.getReason());
}
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceQueryException(e.getMessage());
}
return remoteInstances;
}
......@@ -74,23 +96,36 @@ public class ConsulCoordinator implements ClusterRegister, ClusterNodesQuery {
if (needUsingInternalAddr()) {
remoteInstance = new RemoteInstance(new Address(config.getInternalComHost(), config.getInternalComPort(), true));
}
try {
initHealthChecker();
AgentClient agentClient = client.agentClient();
AgentClient agentClient = client.agentClient();
this.selfAddress = remoteInstance.getAddress();
this.selfAddress = remoteInstance.getAddress();
Registration registration = ImmutableRegistration.builder()
.id(remoteInstance.getAddress().toString())
.name(serviceName)
.address(remoteInstance.getAddress().getHost())
.port(remoteInstance.getAddress().getPort())
.check(Registration.RegCheck.grpc(remoteInstance.getAddress()
.getHost() + ":" + remoteInstance
.getAddress()
.getPort(), 5)) // registers with a TTL of 5 seconds
.build();
Registration registration = ImmutableRegistration.builder()
.id(remoteInstance.getAddress().toString())
.name(serviceName)
.address(remoteInstance.getAddress().getHost())
.port(remoteInstance.getAddress().getPort())
.check(Registration.RegCheck.grpc(remoteInstance.getAddress()
.getHost() + ":" + remoteInstance
.getAddress()
.getPort(), 5)) // registers with a TTL of 5 seconds
.build();
agentClient.register(registration);
healthChecker.health();
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceRegisterException(e.getMessage());
}
}
agentClient.register(registration);
/**
 * Creates the "cluster_consul" health-check gauge the first time it is needed.
 * Once {@code healthChecker} has been assigned, further calls return immediately.
 * The gauge comes from the {@link MetricsCreator} service of the telemetry module.
 */
private void initHealthChecker() {
    if (healthChecker != null) {
        return;
    }
    MetricsCreator creator = manager.find(TelemetryModule.NAME)
                                    .provider()
                                    .getService(MetricsCreator.class);
    healthChecker = creator.createHealthCheckerGauge(
        "cluster_consul", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
private boolean needUsingInternalAddr() {
......
......@@ -26,10 +26,18 @@ import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
......@@ -53,6 +61,21 @@ public class ClusterModuleConsulProviderTest {
private ClusterModuleConsulProvider provider = new ClusterModuleConsulProvider();
@Mock
private ModuleManager moduleManager;
@Mock
private NoneTelemetryProvider telemetryProvider;
/**
 * Prepares the mocked module environment for each test: a spied TelemetryModule gets
 * the mocked telemetry provider injected as its "loadedProvider", that provider is
 * stubbed to return a no-op MetricsCreator, and the mocked module manager is handed
 * to the provider under test.
 */
@Before
public void before() {
    TelemetryModule telemetry = Mockito.spy(TelemetryModule.class);
    Whitebox.setInternalState(telemetry, "loadedProvider", telemetryProvider);

    MetricsCreator noopCreator = new MetricsCreatorNoop();
    Mockito.when(telemetryProvider.getService(MetricsCreator.class)).thenReturn(noopCreator);
    Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetry);

    provider.setManager(moduleManager);
}
@Test
public void name() {
assertEquals("consul", provider.name());
......
......@@ -30,14 +30,18 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -49,7 +53,7 @@ public class ConsulCoordinatorTest {
private ClusterModuleConsulConfig consulConfig = new ClusterModuleConsulConfig();
private ConsulCoordinator coordinator;
private HealthCheckMetrics healthChecker = mock(HealthCheckMetrics.class);
private ConsulResponse<List<ServiceHealth>> consulResponse;
private Address remoteAddress = new Address("10.0.0.1", 1000, false);
......@@ -63,11 +67,10 @@ public class ConsulCoordinatorTest {
@Before
public void setUp() {
consulConfig.setServiceName(SERVICE_NAME);
coordinator = new ConsulCoordinator(consulConfig, consul);
ModuleDefineHolder manager = mock(ModuleDefineHolder.class);
coordinator = new ConsulCoordinator(manager, consulConfig, consul);
Whitebox.setInternalState(coordinator, "healthChecker", healthChecker);
consulResponse = mock(ConsulResponse.class);
HealthClient healthClient = mock(HealthClient.class);
......@@ -75,6 +78,8 @@ public class ConsulCoordinatorTest {
when(consul.healthClient()).thenReturn(healthClient);
when(consul.agentClient()).thenReturn(agentClient);
doNothing().when(healthChecker).health();
}
@Test
......
......@@ -30,21 +30,43 @@ import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.net.ssl.*", "javax.security.*"})
public class ITClusterModuleConsulProviderFunctionalTest {
private String consulAddress;
@Mock
private ModuleManager moduleManager;
@Mock
private NoneTelemetryProvider telemetryProvider;
/**
 * Prepares the mocked telemetry lookup and reads the Consul address for each test.
 * A spied TelemetryModule gets the mocked telemetry provider injected as its
 * "loadedProvider"; that provider is stubbed to return a no-op MetricsCreator.
 * The test fails early when the "consul.address" system property is empty.
 */
@Before
public void before() {
    TelemetryModule telemetry = Mockito.spy(TelemetryModule.class);
    Whitebox.setInternalState(telemetry, "loadedProvider", telemetryProvider);

    MetricsCreator noopCreator = new MetricsCreatorNoop();
    Mockito.when(telemetryProvider.getService(MetricsCreator.class)).thenReturn(noopCreator);
    Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetry);

    consulAddress = System.getProperty("consul.address");
    assertFalse(StringUtil.isEmpty(consulAddress));
}
......@@ -179,7 +201,7 @@ public class ITClusterModuleConsulProviderFunctionalTest {
if (internalComPort > 0) {
config.setInternalComPort(internalComPort);
}
provider.setManager(moduleManager);
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
......
......@@ -62,19 +62,16 @@ public class ClusterModuleEtcdProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
List<URI> uris = EtcdUtils.parse(config);
//TODO check isSSL
client = new EtcdClient(uris.toArray(new URI[] {}));
EtcdCoordinator coordinator = new EtcdCoordinator(config, client);
EtcdCoordinator coordinator = new EtcdCoordinator(getManager(), config, client);
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
/**
 * Start-phase hook of the etcd cluster provider. The body is empty: no work is
 * performed here.
 */
@Override
public void start() throws ServiceNotProvidedException {
}
@Override
......
......@@ -25,33 +25,39 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.skywalking.oap.server.core.cluster.ClusterHealthStatus;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
private static final Logger LOGGER = LoggerFactory.getLogger(EtcdCoordinator.class);
private static final Integer KEY_TTL = 45;
private ClusterModuleEtcdConfig config;
private EtcdClient client;
private volatile Address selfAddress;
private final ModuleDefineHolder manager;
private final ClusterModuleEtcdConfig config;
private final EtcdClient client;
private final String serviceName;
private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
private volatile Address selfAddress;
private HealthCheckMetrics healthChecker;
private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
private static final Integer KEY_TTL = 45;
public EtcdCoordinator(ClusterModuleEtcdConfig config, EtcdClient client) {
public EtcdCoordinator(final ModuleDefineHolder manager, final ClusterModuleEtcdConfig config, final EtcdClient client) {
this.manager = manager;
this.config = config;
this.client = client;
this.serviceName = config.getServiceName();
......@@ -59,9 +65,9 @@ public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
@Override
public List<RemoteInstance> queryRemoteNodes() {
List<RemoteInstance> res = new ArrayList<>();
List<RemoteInstance> remoteInstances = new ArrayList<>();
try {
initHealthChecker();
EtcdKeysResponse response = client.get(serviceName + "/").send().get();
List<EtcdKeysResponse.EtcdNode> nodes = response.getNode().getNodes();
......@@ -73,15 +79,20 @@ public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
if (!address.equals(selfAddress)) {
address.setSelf(false);
}
res.add(new RemoteInstance(address));
remoteInstances.add(new RemoteInstance(address));
});
}
} catch (Exception e) {
ClusterHealthStatus healthStatus = OAPNodeChecker.isHealth(remoteInstances);
if (healthStatus.isHealth()) {
this.healthChecker.health();
} else {
this.healthChecker.unHealth(healthStatus.getReason());
}
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new RuntimeException(e);
}
return res;
return remoteInstances;
}
@Override
......@@ -98,6 +109,7 @@ public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
.port(selfAddress.getPort())
.build();
try {
initHealthChecker();
client.putDir(serviceName).send();
String key = buildKey(serviceName, selfAddress, remoteInstance);
String json = new Gson().toJson(endpoint);
......@@ -105,7 +117,9 @@ public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
//check register.
promise.get();
renew(client, key, json);
} catch (Exception e) {
healthChecker.health();
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceRegisterException(e.getMessage());
}
......@@ -136,4 +150,11 @@ public class EtcdCoordinator implements ClusterRegister, ClusterNodesQuery {
/**
 * Tells whether an internal communication address is configured.
 *
 * @return {@code true} only when the configured internal host is neither null nor
 *         empty and the configured internal port is greater than zero
 */
private boolean needUsingInternalAddr() {
    if (Strings.isNullOrEmpty(config.getInternalComHost())) {
        return false;
    }
    return config.getInternalComPort() > 0;
}
/**
 * Creates the "cluster_etcd" health-check gauge the first time it is needed.
 * Once {@code healthChecker} has been assigned, further calls return immediately.
 * The gauge comes from the {@link MetricsCreator} service of the telemetry module.
 */
private void initHealthChecker() {
    if (healthChecker != null) {
        return;
    }
    MetricsCreator creator = manager.find(TelemetryModule.NAME)
                                    .provider()
                                    .getService(MetricsCreator.class);
    healthChecker = creator.createHealthCheckerGauge(
        "cluster_etcd", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
}
......@@ -18,21 +18,25 @@
package org.apache.skywalking.oap.server.cluster.plugin.etcd;
import com.google.gson.Gson;
import java.net.URI;
import java.util.List;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
public class ITClusterEtcdPluginTest {
......@@ -42,9 +46,9 @@ public class ITClusterEtcdPluginTest {
private EtcdClient client;
private EtcdCoordinator coordinator;
private HealthCheckMetrics healthChecker = mock(HealthCheckMetrics.class);
private Gson gson = new Gson();
private EtcdCoordinator coordinator;
private Address remoteAddress = new Address("10.0.0.1", 1000, false);
private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
......@@ -62,7 +66,10 @@ public class ITClusterEtcdPluginTest {
etcdConfig = new ClusterModuleEtcdConfig();
etcdConfig.setServiceName(SERVICE_NAME);
client = new EtcdClient(URI.create(baseUrl));
coordinator = new EtcdCoordinator(etcdConfig, client);
doNothing().when(healthChecker).health();
ModuleDefineHolder manager = mock(ModuleDefineHolder.class);
coordinator = new EtcdCoordinator(manager, etcdConfig, client);
Whitebox.setInternalState(coordinator, "healthChecker", healthChecker);
}
@After
......
......@@ -26,22 +26,36 @@ import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class ITClusterModuleEtcdProviderFunctionalTest {
private String etcdAddress;
private ModuleManager moduleManager = mock(ModuleManager.class);
private NoneTelemetryProvider telemetryProvider = mock(NoneTelemetryProvider.class);
@Before
public void before() {
Mockito.when(telemetryProvider.getService(MetricsCreator.class))
.thenReturn(new MetricsCreatorNoop());
TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
Whitebox.setInternalState(telemetryModule, "loadedProvider", telemetryProvider);
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
String etcdHost = System.getProperty("etcd.host");
String port = System.getProperty("etcd.port");
assertTrue(!StringUtil.isEmpty(etcdHost) && !StringUtil.isEmpty(port));
......@@ -177,7 +191,7 @@ public class ITClusterModuleEtcdProviderFunctionalTest {
if (internalComPort > 0) {
config.setInternalComPort(internalComPort);
}
provider.setManager(moduleManager);
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
......
......@@ -57,7 +57,6 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
coordinator = new KubernetesCoordinator(getManager(), config);
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
......
......@@ -21,18 +21,25 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodStatus;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceQueryException;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
* Read collector pod info from api-server of kubernetes, then using all containerIp list to construct the list of
......@@ -42,10 +49,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery {
private final ModuleDefineHolder manager;
private volatile int port = -1;
private final String uid;
private volatile int port = -1;
private HealthCheckMetrics healthChecker;
public KubernetesCoordinator(final ModuleDefineHolder manager,
final ClusterModuleKubernetesConfig config) {
......@@ -55,41 +61,56 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
@Override
public List<RemoteInstance> queryRemoteNodes() {
List<V1Pod> pods = NamespacedPodListInformer.INFORMER.listPods().orElseGet(this::selfPod);
if (log.isDebugEnabled()) {
List<String> uidList = pods
.stream()
.map(item -> item.getMetadata().getUid())
.collect(Collectors.toList());
log.debug("[kubernetes cluster pods uid list]:{}", uidList.toString());
}
if (port == -1) {
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
try {
initHealthChecker();
List<V1Pod> pods = NamespacedPodListInformer.INFORMER.listPods().orElseGet(this::selfPod);
if (log.isDebugEnabled()) {
List<String> uidList = pods
.stream()
.map(item -> item.getMetadata().getUid())
.collect(Collectors.toList());
log.debug("[kubernetes cluster pods uid list]:{}", uidList.toString());
}
if (port == -1) {
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
}
List<RemoteInstance> remoteInstances = pods.stream()
.map(pod -> new RemoteInstance(
new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
.collect(Collectors.toList());
healthChecker.health();
return remoteInstances;
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceQueryException(e.getMessage());
}
return pods.stream()
.map(pod -> new RemoteInstance(
new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
.collect(Collectors.toList());
}
@Override
public void registerRemote(final RemoteInstance remoteInstance) throws ServiceRegisterException {
this.port = remoteInstance.getAddress().getPort();
try {
initHealthChecker();
this.port = remoteInstance.getAddress().getPort();
healthChecker.health();
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceRegisterException(e.getMessage());
}
}
private List<V1Pod> selfPod() {
/**
 * Creates the "cluster_k8s" health-check gauge the first time it is needed.
 * Once {@code healthChecker} has been assigned, further calls return immediately.
 * The gauge comes from the {@link MetricsCreator} service of the telemetry module.
 */
private void initHealthChecker() {
    if (healthChecker != null) {
        return;
    }
    MetricsCreator creator = manager.find(TelemetryModule.NAME)
                                    .provider()
                                    .getService(MetricsCreator.class);
    healthChecker = creator.createHealthCheckerGauge(
        "cluster_k8s", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
/**
 * Builds a one-element pod list describing this node only: the pod carries this
 * coordinator's {@code uid} and the loopback IP "127.0.0.1". It serves as the
 * {@code orElseGet} fallback when the informer's pod listing is absent.
 *
 * @return an immutable single-element list holding the synthetic pod
 */
private List<V1Pod> selfPod() {
    V1ObjectMeta metadata = new V1ObjectMeta();
    metadata.setUid(uid);

    V1PodStatus status = new V1PodStatus();
    status.setPodIP("127.0.0.1");

    V1Pod pod = new V1Pod();
    pod.setMetadata(metadata);
    pod.setStatus(status);
    return Collections.singletonList(pod);
}
}
......@@ -21,10 +21,19 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
......@@ -36,6 +45,21 @@ public class ClusterModuleKubernetesProviderTest {
private ClusterModuleKubernetesProvider provider = new ClusterModuleKubernetesProvider();
@Mock
private ModuleManager moduleManager;
@Mock
private NoneTelemetryProvider telemetryProvider;
/**
 * Prepares the mocked module environment for each test: a spied TelemetryModule gets
 * the mocked telemetry provider injected as its "loadedProvider", that provider is
 * stubbed to return a no-op MetricsCreator, and the mocked module manager is handed
 * to the provider under test.
 */
@Before
public void before() {
    TelemetryModule telemetry = Mockito.spy(TelemetryModule.class);
    Whitebox.setInternalState(telemetry, "loadedProvider", telemetryProvider);

    MetricsCreator noopCreator = new MetricsCreatorNoop();
    Mockito.when(telemetryProvider.getService(MetricsCreator.class)).thenReturn(noopCreator);
    Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetry);

    provider.setManager(moduleManager);
}
@Test
public void name() {
assertEquals("kubernetes", provider.name());
......@@ -53,7 +77,7 @@ public class ClusterModuleKubernetesProviderTest {
}
@Test
public void prepare() throws Exception {
public void prepare() {
provider.prepare();
}
......
......@@ -25,11 +25,16 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
......@@ -43,6 +48,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
......@@ -51,6 +58,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
public class KubernetesCoordinatorTest {
private KubernetesCoordinator coordinator;
private HealthCheckMetrics healthChecker = mock(HealthCheckMetrics.class);
public static final String LOCAL_HOST = "127.0.0.1";
public static final Integer GRPC_PORT = 8454;
......@@ -62,11 +70,12 @@ public class KubernetesCoordinatorTest {
/**
 * Builds the coordinator under test and replaces its collaborators with mocks:
 * the health checker field, the "uid" field (set to SELF_UID as a string) and the
 * static NamespacedPodListInformer "INFORMER".
 *
 * @throws IllegalAccessException if the reflective write to the "uid" field fails
 */
@Before
public void prepare() throws IllegalAccessException {
    doNothing().when(healthChecker).health();
    selfAddress = new Address(LOCAL_HOST, GRPC_PORT, true);

    coordinator = new KubernetesCoordinator(getManager(), new ClusterModuleKubernetesConfig());
    Whitebox.setInternalState(coordinator, "healthChecker", healthChecker);
    String selfUid = String.valueOf(SELF_UID);
    MemberModifier.field(KubernetesCoordinator.class, "uid").set(coordinator, selfUid);

    informer = PowerMockito.mock(NamespacedPodListInformer.class);
    Whitebox.setInternalState(NamespacedPodListInformer.class, "INFORMER", informer);
}
@Test
......@@ -102,7 +111,9 @@ public class KubernetesCoordinatorTest {
CoreModuleConfig config = PowerMockito.mock(CoreModuleConfig.class);
when(config.getGRPCHost()).thenReturn(LOCAL_HOST);
when(config.getGRPCPort()).thenReturn(GRPC_PORT);
moduleManagerTesting.put(TelemetryModule.NAME, coreModuleDefine);
coreModuleDefine.provider().registerServiceImplementation(ConfigService.class, new ConfigService(config));
coreModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, new MetricsCreatorNoop());
return moduleManagerTesting;
}
......
......@@ -79,14 +79,13 @@ public class ClusterModuleNacosProvider extends ModuleProvider {
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}
NacosCoordinator coordinator = new NacosCoordinator(namingService, config);
NacosCoordinator coordinator = new NacosCoordinator(getManager(), namingService, config);
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
/**
 * Start-phase hook of the Nacos cluster provider. The body is empty: no work is
 * performed here.
 */
@Override
public void start() throws ServiceNotProvidedException {
}
@Override
......
......@@ -18,36 +18,46 @@
package org.apache.skywalking.oap.server.cluster.plugin.nacos;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Strings;
import org.apache.skywalking.oap.server.core.cluster.ClusterHealthStatus;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceQueryException;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {
private final ModuleDefineHolder manager;
private final NamingService namingService;
private final ClusterModuleNacosConfig config;
private volatile Address selfAddress;
private HealthCheckMetrics healthChecker;
public NacosCoordinator(NamingService namingService, ClusterModuleNacosConfig config) {
public NacosCoordinator(final ModuleDefineHolder manager, final NamingService namingService, final ClusterModuleNacosConfig config) {
this.manager = manager;
this.namingService = namingService;
this.config = config;
}
@Override
public List<RemoteInstance> queryRemoteNodes() {
List<RemoteInstance> result = new ArrayList<>();
List<RemoteInstance> remoteInstances = new ArrayList<>();
try {
initHealthChecker();
List<Instance> instances = namingService.selectInstances(config.getServiceName(), true);
if (CollectionUtils.isNotEmpty(instances)) {
instances.forEach(instance -> {
......@@ -55,13 +65,20 @@ public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {
if (address.equals(selfAddress)) {
address.setSelf(true);
}
result.add(new RemoteInstance(address));
remoteInstances.add(new RemoteInstance(address));
});
}
} catch (NacosException e) {
throw new ServiceQueryException(e.getErrMsg());
ClusterHealthStatus healthStatus = OAPNodeChecker.isHealth(remoteInstances);
if (healthStatus.isHealth()) {
this.healthChecker.health();
} else {
this.healthChecker.unHealth(healthStatus.getReason());
}
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceQueryException(e.getMessage());
}
return result;
return remoteInstances;
}
@Override
......@@ -72,8 +89,11 @@ public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {
String host = remoteInstance.getAddress().getHost();
int port = remoteInstance.getAddress().getPort();
try {
initHealthChecker();
namingService.registerInstance(config.getServiceName(), host, port);
} catch (Exception e) {
healthChecker.health();
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceRegisterException(e.getMessage());
}
this.selfAddress = remoteInstance.getAddress();
......@@ -82,4 +102,11 @@ public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {
/**
 * Tells whether a dedicated internal communication address is configured.
 *
 * @return {@code true} only when the configured internal host is neither null nor empty
 *         and the configured internal port is greater than zero
 */
private boolean needUsingInternalAddr() {
    if (Strings.isNullOrEmpty(config.getInternalComHost())) {
        return false;
    }
    return config.getInternalComPort() > 0;
}
/**
 * Creates the health-check gauge on first use by looking up the {@link MetricsCreator}
 * service of the telemetry module. Does nothing when the gauge already exists.
 */
private void initHealthChecker() {
    if (healthChecker != null) {
        return;
    }
    final MetricsCreator creator = manager.find(TelemetryModule.NAME)
                                          .provider()
                                          .getService(MetricsCreator.class);
    healthChecker = creator.createHealthCheckerGauge(
        "cluster_nacos", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
}
......@@ -27,26 +27,48 @@ import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.security.*", "javax.net.ssl.*"})
public class ITClusterModuleNacosProviderFunctionalTest {
private String nacosAddress;
private String username;
private String password;
@Mock
private ModuleManager moduleManager;
@Mock
private NoneTelemetryProvider telemetryProvider;
/**
 * Prepares the fixture: sets the Nacos credentials, wires a mocked module manager that resolves
 * the telemetry module to a spy backed by a mocked provider, and reads the Nacos server address
 * from the {@code nacos.address} system property.
 */
@Before
public void before() {
username = "nacos";
password = "nacos";
// The mocked telemetry provider hands out a no-op MetricsCreator.
Mockito.when(telemetryProvider.getService(MetricsCreator.class))
.thenReturn(new MetricsCreatorNoop());
// Inject the mocked provider into the spy's "loadedProvider" field via reflection.
TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
Whitebox.setInternalState(telemetryModule, "loadedProvider", telemetryProvider);
// Looking up the telemetry module through the mocked manager now returns the spy.
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
// The assertion fails the setup when StringUtil.isEmpty reports the address as empty.
nacosAddress = System.getProperty("nacos.address");
assertFalse(StringUtil.isEmpty(nacosAddress));
}
......@@ -168,6 +190,7 @@ public class ITClusterModuleNacosProviderFunctionalTest {
config.setHostPort(nacosAddress);
config.setServiceName(servicName);
provider.setManager(moduleManager);
config.setUsername(username);
config.setPassword(password);
......@@ -195,7 +218,7 @@ public class ITClusterModuleNacosProviderFunctionalTest {
if (internalComPort > 0) {
config.setInternalComPort(internalComPort);
}
provider.setManager(moduleManager);
provider.prepare();
provider.start();
provider.notifyAfterCompleted();
......
......@@ -26,13 +26,17 @@ import java.util.Collections;
import java.util.List;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -41,7 +45,7 @@ public class NacosCoordinatorTest {
private NamingService namingService = mock(NamingService.class);
private ClusterModuleNacosConfig nacosConfig = new ClusterModuleNacosConfig();
private NacosCoordinator coordinator;
private HealthCheckMetrics healthChecker = mock(HealthCheckMetrics.class);
private Address remoteAddress = new Address("10.0.0.1", 1000, false);
private Address selfRemoteAddress = new Address("10.0.0.2", 1001, true);
......@@ -51,8 +55,11 @@ public class NacosCoordinatorTest {
@Before
public void setUp() throws NacosException {
doNothing().when(healthChecker).health();
nacosConfig.setServiceName(SERVICE_NAME);
coordinator = new NacosCoordinator(namingService, nacosConfig);
ModuleDefineHolder manager = mock(ModuleDefineHolder.class);
coordinator = new NacosCoordinator(manager, namingService, nacosConfig);
Whitebox.setInternalState(coordinator, "healthChecker", healthChecker);
}
@Test
......
......@@ -29,6 +29,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
......@@ -56,6 +57,7 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
private final ClusterModuleZookeeperConfig config;
private CuratorFramework client;
private ServiceDiscovery<RemoteInstance> serviceDiscovery;
private ZookeeperCoordinator coordinator;
public ClusterModuleZookeeperProvider() {
super();
......@@ -124,13 +126,11 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
.watchInstances(true)
.serializer(new SWInstanceSerializer())
.build();
ZookeeperCoordinator coordinator;
try {
client.start();
client.blockUntilConnected();
serviceDiscovery.start();
coordinator = new ZookeeperCoordinator(config, serviceDiscovery);
coordinator = new ZookeeperCoordinator(getManager(), config, serviceDiscovery);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw new ModuleStartException(e.getMessage(), e);
......@@ -150,6 +150,6 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
return new String[0];
return new String[]{CoreModule.NAME};
}
}
......@@ -22,29 +22,38 @@ import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.skywalking.oap.server.core.cluster.ClusterHealthStatus;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceQueryException;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperCoordinator.class);
private static final String REMOTE_NAME_PATH = "remote";
private final ModuleDefineHolder manager;
private final ClusterModuleZookeeperConfig config;
private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
private final ServiceCache<RemoteInstance> serviceCache;
private volatile Address selfAddress;
private HealthCheckMetrics healthChecker;
ZookeeperCoordinator(ClusterModuleZookeeperConfig config,
ServiceDiscovery<RemoteInstance> serviceDiscovery) throws Exception {
ZookeeperCoordinator(final ModuleDefineHolder manager, final ClusterModuleZookeeperConfig config,
final ServiceDiscovery<RemoteInstance> serviceDiscovery) throws Exception {
this.manager = manager;
this.config = config;
this.serviceDiscovery = serviceDiscovery;
this.serviceCache = serviceDiscovery.serviceCacheBuilder().name(REMOTE_NAME_PATH).build();
......@@ -54,6 +63,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
@Override
public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
try {
initHealthChecker();
if (needUsingInternalAddr()) {
remoteInstance = new RemoteInstance(new Address(config.getInternalComHost(), config.getInternalComPort(), true));
}
......@@ -73,28 +83,49 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
serviceDiscovery.registerService(thisInstance);
this.selfAddress = remoteInstance.getAddress();
} catch (Exception e) {
this.healthChecker.health();
} catch (Throwable e) {
this.healthChecker.unHealth(e);
throw new ServiceRegisterException(e.getMessage());
}
}
@Override
public List<RemoteInstance> queryRemoteNodes() {
List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(20);
List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
serviceInstances.forEach(serviceInstance -> {
RemoteInstance instance = serviceInstance.getPayload();
if (instance.getAddress().equals(selfAddress)) {
instance.getAddress().setSelf(true);
List<RemoteInstance> remoteInstances = new ArrayList<>(20);
try {
initHealthChecker();
List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
serviceInstances.forEach(serviceInstance -> {
RemoteInstance instance = serviceInstance.getPayload();
if (instance.getAddress().equals(selfAddress)) {
instance.getAddress().setSelf(true);
} else {
instance.getAddress().setSelf(false);
}
remoteInstances.add(instance);
});
ClusterHealthStatus healthStatus = OAPNodeChecker.isHealth(remoteInstances);
if (healthStatus.isHealth()) {
this.healthChecker.health();
} else {
instance.getAddress().setSelf(false);
this.healthChecker.unHealth(healthStatus.getReason());
}
remoteInstanceDetails.add(instance);
});
return remoteInstanceDetails;
} catch (Throwable e) {
this.healthChecker.unHealth(e);
throw new ServiceQueryException(e.getMessage());
}
return remoteInstances;
}
/**
 * Tells whether a dedicated internal communication address is configured.
 *
 * @return {@code true} only when the configured internal host is neither null nor empty
 *         and the configured internal port is greater than zero
 */
private boolean needUsingInternalAddr() {
    if (Strings.isNullOrEmpty(config.getInternalComHost())) {
        return false;
    }
    return config.getInternalComPort() > 0;
}
/**
 * Creates the health-check gauge on first use by looking up the {@link MetricsCreator}
 * service of the telemetry module. Does nothing when the gauge already exists.
 */
private void initHealthChecker() {
    if (healthChecker != null) {
        return;
    }
    final MetricsCreator creator = manager.find(TelemetryModule.NAME)
                                          .provider()
                                          .getService(MetricsCreator.class);
    healthChecker = creator.createHealthCheckerGauge(
        "cluster_zookeeper", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
}
......@@ -48,6 +48,6 @@ public class ClusterModuleZookeeperProviderTest {
@Test
public void requiredModules() {
String[] modules = provider.requiredModules();
assertEquals(0, modules.length);
assertEquals(1, modules.length);
}
}
......@@ -20,27 +20,50 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.util.Collections;
import java.util.List;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.security.*", "com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*", "org.w3c.*"})
public class ITClusterModuleZookeeperProviderFunctionalTest {
private String zkAddress;
@Mock
private ModuleManager moduleManager;
@Mock
private NoneTelemetryProvider telemetryProvider;
@Before
public void before() {
public void init() {
Mockito.when(telemetryProvider.getService(MetricsCreator.class))
.thenReturn(new MetricsCreatorNoop());
TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
Whitebox.setInternalState(telemetryModule, "loadedProvider", telemetryProvider);
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
zkAddress = System.getProperty("zk.address");
assertFalse(StringUtil.isEmpty(zkAddress));
}
......@@ -162,7 +185,7 @@ public class ITClusterModuleZookeeperProviderFunctionalTest {
private ClusterModuleZookeeperProvider createProvider(String namespace, String internalComHost,
int internalComPort) throws Exception {
ClusterModuleZookeeperProvider provider = new ClusterModuleZookeeperProvider();
provider.setManager(moduleManager);
ClusterModuleZookeeperConfig moduleConfig = (ClusterModuleZookeeperConfig) provider.createConfigBeanIfAbsent();
moduleConfig.setHostPort(zkAddress);
moduleConfig.setBaseSleepTimeMs(3000);
......
......@@ -25,9 +25,12 @@ import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......@@ -45,6 +48,8 @@ public class ZookeeperCoordinatorTest {
private ServiceCacheBuilder cacheBuilder = mock(ServiceCacheBuilder.class);
private HealthCheckMetrics healthChecker = mock(HealthCheckMetrics.class);
private ServiceCache serviceCache = mock(ServiceCache.class);
private ZookeeperCoordinator coordinator;
......@@ -61,7 +66,10 @@ public class ZookeeperCoordinatorTest {
doNothing().when(serviceDiscovery).registerService(any());
when(serviceDiscovery.serviceCacheBuilder()).thenReturn(cacheBuilder);
config.setHostPort(address.getHost() + ":" + address.getPort());
coordinator = new ZookeeperCoordinator(config, serviceDiscovery);
doNothing().when(healthChecker).health();
ModuleDefineHolder manager = mock(ModuleDefineHolder.class);
coordinator = new ZookeeperCoordinator(manager, config, serviceDiscovery);
Whitebox.setInternalState(coordinator, "healthChecker", healthChecker);
}
@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.cluster;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Result of a cluster health evaluation: a healthy/unhealthy flag plus, for the unhealthy case,
 * a human-readable reason.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ClusterHealthStatus {
    /**
     * Shared "healthy" result with no reason attached.
     * NOTE: {@code @Data} generates setters, so this shared instance is technically mutable;
     * callers should treat it as read-only.
     */
    public static final ClusterHealthStatus HEALTH = new ClusterHealthStatus(true, null);

    /** {@code true} when the cluster is considered healthy. */
    private boolean health;
    /** Explanation of the unhealthy state; {@code null} on the shared {@link #HEALTH} instance. */
    private String reason;

    /**
     * Builds an unhealthy result.
     *
     * @param reason explanation of why the cluster is considered unhealthy
     * @return a new status whose health flag is {@code false} and which carries the given reason
     */
    public static ClusterHealthStatus unHealth(String reason) {
        final ClusterHealthStatus unhealthy = new ClusterHealthStatus(false, reason);
        return unhealthy;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.cluster;
import com.google.common.collect.Sets;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Static checks that judge whether the node list reported by a cluster coordinator is usable.
 */
public class OAPNodeChecker {
    /** Loopback hosts that must not show up once more than one node is in the cluster. */
    private static final Set<String> ILLEGAL_NODE_ADDRESS_IN_CLUSTER_MODE = Sets.newHashSet("127.0.0.1", "localhost");

    /**
     * Checks whether any instance advertises a loopback host ({@code 127.0.0.1} or {@code localhost}).
     * Host names are compared case-insensitively, so {@code LOCALHOST} is detected as well.
     *
     * @param remoteInstances instances to inspect; may be {@code null} or empty
     * @return {@code true} if at least one instance uses a loopback host, {@code false} otherwise
     *         (including for a {@code null} or empty list)
     */
    public static boolean hasIllegalNodeAddress(List<RemoteInstance> remoteInstances) {
        if (CollectionUtils.isEmpty(remoteInstances)) {
            return false;
        }
        // De-duplicate hosts first so each distinct host is matched only once.
        Set<String> remoteAddressSet = remoteInstances.stream()
                                                      .map(remoteInstance -> remoteInstance.getAddress().getHost())
                                                      .collect(Collectors.toSet());
        return remoteAddressSet.stream().anyMatch(OAPNodeChecker::isIllegalNodeAddress);
    }

    /**
     * Matches one host against the loopback list, ignoring case because host names are
     * case-insensitive. A {@code null} host never matches.
     */
    private static boolean isIllegalNodeAddress(String host) {
        if (host == null) {
            return false;
        }
        return ILLEGAL_NODE_ADDRESS_IN_CLUSTER_MODE.stream().anyMatch(host::equalsIgnoreCase);
    }

    /**
     * Check the remote instance healthiness. The result is unhealthy under the below conditions:
     * 1. the instance list is null or empty
     * 2. no instance in the list is flagged as the current node itself
     * 3. the list has more than one node and contains a loopback address such as 127.0.0.1, localhost
     *
     * @param remoteInstances all the remote instances from cluster
     * @return {@link ClusterHealthStatus#HEALTH} when every check passes, otherwise an unhealthy
     *         status carrying the reason of the first failed check
     */
    public static ClusterHealthStatus isHealth(List<RemoteInstance> remoteInstances) {
        if (CollectionUtils.isEmpty(remoteInstances)) {
            return ClusterHealthStatus.unHealth("can't get the instance list");
        }
        // Only the existence of a self node matters, so stop at the first match.
        boolean containsSelf = remoteInstances.stream()
                                              .anyMatch(remoteInstance -> remoteInstance.getAddress().isSelf());
        if (!containsSelf) {
            return ClusterHealthStatus.unHealth("can't get itself");
        }
        // A single node (standalone-like) may legitimately use a loopback address.
        if (remoteInstances.size() > 1 && hasIllegalNodeAddress(remoteInstances)) {
            return ClusterHealthStatus.unHealth("find illegal node in cluster mode such as 127.0.0.1, localhost");
        }
        return ClusterHealthStatus.HEALTH;
    }
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.cluster;
import com.google.common.collect.Lists;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
 * Unit tests for {@link OAPNodeChecker}: loopback-address detection and the overall health verdict.
 */
public class OAPNodeCheckerTest {

    /** Builds a remote instance for the given host, port and self flag. */
    private static RemoteInstance node(String host, int port, boolean self) {
        return new RemoteInstance(new Address(host, port, self));
    }

    /** Collects the given instances into a fresh mutable list, preserving order. */
    private static List<RemoteInstance> nodes(RemoteInstance... instances) {
        List<RemoteInstance> collected = new ArrayList<>();
        for (RemoteInstance instance : instances) {
            collected.add(instance);
        }
        return collected;
    }

    @Test
    public void hasIllegalNodeAddressWithNull() {
        Assert.assertFalse(OAPNodeChecker.hasIllegalNodeAddress(null));
    }

    @Test
    public void hasIllegalNodeAddressWithEmptySet() {
        Assert.assertFalse(OAPNodeChecker.hasIllegalNodeAddress(Lists.newArrayList()));
    }

    @Test
    public void hasIllegalNodeAddressTrue() {
        List<RemoteInstance> cluster = nodes(
            node("127.0.0.1", 8899, true),
            node("123.23.4.2", 8899, true));
        Assert.assertTrue(OAPNodeChecker.hasIllegalNodeAddress(cluster));
    }

    @Test
    public void hasIllegalNodeAddressFalse() {
        List<RemoteInstance> cluster = nodes(node("123.23.4.2", 8899, true));
        Assert.assertFalse(OAPNodeChecker.hasIllegalNodeAddress(cluster));
    }

    @Test
    public void unHealthWithEmptyInstance() {
        ClusterHealthStatus status = OAPNodeChecker.isHealth(Lists.newArrayList());
        Assert.assertFalse(status.isHealth());
    }

    @Test
    public void unHealthWithNullInstance() {
        ClusterHealthStatus status = OAPNodeChecker.isHealth(null);
        Assert.assertFalse(status.isHealth());
    }

    @Test
    public void unHealthWithEmptySelfInstance() {
        List<RemoteInstance> cluster = nodes(node("192.168.0.1", 8892, false));
        ClusterHealthStatus status = OAPNodeChecker.isHealth(cluster);
        Assert.assertFalse(status.isHealth());
    }

    @Test
    public void unHealthWithIllegalNodeInstance() {
        List<RemoteInstance> cluster = nodes(
            node("192.168.0.1", 8892, true),
            node("127.0.0.1", 8892, true));
        ClusterHealthStatus status = OAPNodeChecker.isHealth(cluster);
        Assert.assertFalse(status.isHealth());
    }

    @Test
    public void healthWithOnlySelf() {
        List<RemoteInstance> cluster = nodes(node("127.0.0.1", 8899, true));
        ClusterHealthStatus status = OAPNodeChecker.isHealth(cluster);
        Assert.assertTrue(status.isHealth());
    }

    @Test
    public void healthWithSelfAndNodes() {
        List<RemoteInstance> cluster = nodes(
            node("192.168.0.1", 8899, true),
            node("192.168.0.2", 8899, false));
        ClusterHealthStatus status = OAPNodeChecker.isHealth(cluster);
        Assert.assertTrue(status.isHealth());
    }
}
\ No newline at end of file
......@@ -33,6 +33,11 @@ public class DelegatedHealthChecker implements HealthChecker {
Optional.ofNullable(delegated.get()).ifPresent(d -> d.unHealth(t));
}
/**
 * Forwards the unhealthy report to the currently registered checker.
 * Does nothing when no checker is held.
 *
 * @param reason details reason of unhealthy status
 */
@Override
public void unHealth(String reason) {
    final HealthChecker target = delegated.get();
    if (target != null) {
        target.unHealth(reason);
    }
}
/**
 * Stores the checker that this delegate forwards to, replacing any previously stored one.
 *
 * @param healthChecker the checker to forward to
 */
public void register(HealthChecker healthChecker) {
delegated.set(healthChecker);
}
......
......@@ -34,4 +34,11 @@ public interface HealthChecker {
* @param t details of unhealthy status
*/
void unHealth(Throwable t);
/**
 * Reports an unhealthy status described by a plain-text reason.
 *
 * @param reason description of why the status is unhealthy
 */
void unHealth(String reason);
}
......@@ -42,4 +42,10 @@ public class HealthCheckMetrics implements HealthChecker {
log.error("Health check fails", t);
metrics.setValue(1);
}
/**
 * Records an unhealthy status: logs the reason at WARN level and sets the metric value to 1.
 *
 * @param reason description of why the status is unhealthy
 */
@Override
public void unHealth(String reason) {
log.warn("Health check fails. reason: {}", reason);
metrics.setValue(1);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册