提交 0951b943 编写于 作者: wu-sheng's avatar wu-sheng 提交者: Gao Hongtao

Make k8s-Coordinator works in role mode. (#2347)

* Make k8s-Coordinator back in role mode.

* Start query API server after the core started.

* Do lock when port has been intialized.

* Rename service.

* Fix text cases.

* Declare ConfigService.class in core module.

* Implement in an easier way.

* Envoy print whole metric, now in debug.

* Forcedly fix envoy timestamp issue.
上级 eb19715f
......@@ -38,5 +38,11 @@
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-testing</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -18,13 +18,9 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.NamespacedPodListWatch;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.UidEnvSupplier;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* Use kubernetes to manage all instances in Skywalking cluster.
......@@ -34,6 +30,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine;
public class ClusterModuleKubernetesProvider extends ModuleProvider {
private final ClusterModuleKubernetesConfig config;
private KubernetesCoordinator coordinator;
public ClusterModuleKubernetesProvider() {
super();
......@@ -53,7 +50,7 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
}
@Override public void prepare() throws ServiceNotProvidedException {
KubernetesCoordinator coordinator = new KubernetesCoordinator(
coordinator = new KubernetesCoordinator(getManager(),
new NamespacedPodListWatch(config.getNamespace(), config.getLabelSelector(), config.getWatchTimeoutSeconds()),
new UidEnvSupplier(config.getUidEnvName()));
this.registerServiceImplementation(ClusterRegister.class, coordinator);
......@@ -65,7 +62,7 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
}
@Override public void notifyAfterCompleted() {
coordinator.start();
}
@Override public String[] requiredModules() {
......
......@@ -18,20 +18,22 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
import org.slf4j.*;
/**
* Read collector pod info from api-server of kubernetes, then using all containerIp list to
* construct the list of {@link RemoteInstance}.
* Read collector pod info from api-server of kubernetes, then using all containerIp list to construct the list of
* {@link RemoteInstance}.
*
* @author gaohongtao
*/
......@@ -39,26 +41,33 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class);
private final ModuleDefineHolder manager;
private final String uid;
private final Map<String, RemoteInstance> cache = new ConcurrentHashMap<>();
private final ReusableWatch<Event> watch;
private int port;
private volatile int port = -1;
KubernetesCoordinator(final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
KubernetesCoordinator(ModuleDefineHolder manager,
final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
this.manager = manager;
this.watch = watch;
this.uid = uidSupplier.get();
TelemetryRelatedContext.INSTANCE.setId(uid);
}
@Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
this.port = remoteInstance.getAddress().getPort();
public void start() {
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
}
@Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
this.port = remoteInstance.getAddress().getPort();
}
private void submitTask(final ListeningExecutorService service) {
watch.initOrReset();
ListenableFuture<?> watchFuture = service.submit(newWatch());
......@@ -102,7 +111,19 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
}
@Override public List<RemoteInstance> queryRemoteNodes() {
logger.debug("Query kubernetes remote nodes: {}", cache);
return Lists.newArrayList(cache.values());
final List<RemoteInstance> list = new ArrayList<>();
cache.values().forEach(instance -> {
Address address = instance.getAddress();
if (port == -1) {
logger.debug("Query kubernetes remote, port hasn't init, try to init");
ConfigService service = manager.find(CoreModule.NAME).provider().getService(ConfigService.class);
port = service.getGRPCPort();
logger.debug("Query kubernetes remote, port is set at {}", port);
}
list.add(new RemoteInstance(new Address(address.getHost(), port, address.isSelf())));
});
logger.debug("Query kubernetes remote nodes: {}", list);
return list;
}
}
......@@ -19,12 +19,18 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.Test;
import org.mockito.Mockito;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
public class KubernetesCoordinatorTest {
......@@ -33,7 +39,8 @@ public class KubernetesCoordinatorTest {
@Test
public void assertAdded() throws InterruptedException {
PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
......@@ -43,7 +50,8 @@ public class KubernetesCoordinatorTest {
@Test
public void assertModified() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
......@@ -53,7 +61,8 @@ public class KubernetesCoordinatorTest {
@Test
public void assertDeleted() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(1));
......@@ -63,10 +72,52 @@ public class KubernetesCoordinatorTest {
@Test
public void assertError() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
coordinator.start();
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
@Test
public void assertModifiedInReceiverRole() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
coordinator.start();
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
}
@Test
public void assertDeletedInReceiverRole() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
coordinator.start();
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(1));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
@Test
public void assertErrorInReceiverRole() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1");
coordinator.start();
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
public ModuleDefineHolder getManager() {
ModuleManagerTesting moduleManagerTesting = new ModuleManagerTesting();
ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
moduleManagerTesting.put(CoreModule.NAME, coreModuleDefine);
CoreModuleConfig config = Mockito.mock(CoreModuleConfig.class);
when(config.getGRPCHost()).thenReturn("127.0.0.1");
when(config.getGRPCPort()).thenReturn(8454);
coreModuleDefine.provider().registerServiceImplementation(ConfigService.class, new ConfigService(config));
return moduleManagerTesting;
}
}
\ No newline at end of file
......@@ -28,9 +28,8 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* @author peng-yongsheng
......@@ -45,6 +44,7 @@ public class CoreModule extends ModuleDefine {
@Override public Class[] services() {
List<Class> classes = new ArrayList<>();
classes.add(ConfigService.class);
classes.add(DownsamplingConfigService.class);
classes.add(IComponentLibraryCatalogService.class);
......
......@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
*/
@Getter
public class CoreModuleConfig extends ModuleConfig {
@Setter private String role;
@Setter private String role = "Mixed";
@Setter private String nameSpace;
@Setter private String restHost;
@Setter private int restPort;
......
......@@ -106,6 +106,7 @@ public class CoreModuleProvider extends ModuleProvider {
jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors());
jettyServer.initialize();
this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
......@@ -176,7 +177,7 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
if (CoreModuleConfig.Role.Mixed.name().equals(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equals(moduleConfig.getRole())) {
if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.config;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author wusheng
*/
@Getter
public class ConfigService implements Service {
private String gRPCHost;
private int gRPCPort;
public ConfigService(CoreModuleConfig moduleConfig) {
this.gRPCHost = moduleConfig.getGRPCHost();
this.gRPCPort = moduleConfig.getGRPCPort();
}
}
......@@ -28,7 +28,7 @@ import org.apache.skywalking.oap.server.core.Const;
public class Address implements Comparable<Address> {
private final String host;
private final int port;
@Setter private boolean isSelf;
@Setter private volatile boolean isSelf;
public Address(String host, int port, boolean isSelf) {
this.host = host;
......
......@@ -66,10 +66,13 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
private int serviceInstanceId = Const.NONE;
@Override public void onNext(StreamMetricsMessage message) {
if (logger.isDebugEnabled()) {
logger.debug("Received msg {}", message);
}
if (isFirst) {
isFirst = false;
StreamMetricsMessage.Identifier identifier = message.getIdentifier();
logger.debug("Received identifier msg {}", identifier);
Node node = identifier.getNode();
if (node != null) {
String nodeId = node.getId();
......@@ -110,6 +113,20 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
timestamp = metric.getTimestampMs();
value = metric.getGauge().getValue();
if (timestamp > 1000000000000000000L) {
/**
* Several versions of envoy in istio.deps send timestamp in nanoseconds,
* instead of milliseconds(protocol says).
*
* Sadly, but have to fix it forcedly.
*
* An example of timestamp is '1552303033488741055', clearly it is not in milliseconds.
*
* This should be removed in the future.
*/
timestamp /= 1_000_000;
}
EnvoyInstanceMetric metricSource = new EnvoyInstanceMetric();
metricSource.setServiceId(serviceId);
metricSource.setServiceName(serviceName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册