提交 becf7080 编写于 作者: G Gao Hongtao

Add kubernetes as a new cluster manager.

# 1447
Use `GET /api/v1/watch/namespaces/{namespace}/pods` api to watch pod's
containerIp which help collector containers to discovery each other.
上级 e7ce4d75
......@@ -54,6 +54,8 @@
<shardingjdbc.version>2.0.3</shardingjdbc.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<elasticsearch.version>6.3.1</elasticsearch.version>
<joda-time.version>2.9.9</joda-time.version>
<kubernetes.version>2.0.0</kubernetes.version>
</properties>
<dependencies>
......@@ -224,6 +226,11 @@
<artifactId>commons-dbcp</artifactId>
<version>${commons-dbcp.version}</version>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>${kubernetes.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-cluster-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.0.0-alpha-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cluster-kubernetes-plugin</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* The configuration of the module of cluster.kubernetes
*
* @author gaohongtao
*/
public class ClusterModuleKubernetesConfig extends ModuleConfig {
private int watchTimeoutSeconds;
private String namespace;
private String labelSelector;
private String uidEnvName;
public int getWatchTimeoutSeconds() {
return watchTimeoutSeconds;
}
public void setWatchTimeoutSeconds(int watchTimeoutSeconds) {
this.watchTimeoutSeconds = watchTimeoutSeconds;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getLabelSelector() {
return labelSelector;
}
public void setLabelSelector(String labelSelector) {
this.labelSelector = labelSelector;
}
public String getUidEnvName() {
return uidEnvName;
}
public void setUidEnvName(String uidEnvName) {
this.uidEnvName = uidEnvName;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.NamespacedPodListWatch;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.UidEnvSupplier;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
/**
* Use kubernetes to manage all instances in Skywalking cluster.
*
* @author gaohongtao
*/
public class ClusterModuleKubernetesProvider extends ModuleProvider {
private final ClusterModuleKubernetesConfig config;
public ClusterModuleKubernetesProvider() {
super();
this.config = new ClusterModuleKubernetesConfig();
}
@Override public String name() {
return "kubernetes";
}
@Override public Class<? extends ModuleDefine> module() {
return ClusterModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
KubernetesCoordinator coordinator = new KubernetesCoordinator(
new NamespacedPodListWatch(config.getNamespace(), config.getLabelSelector(), config.getWatchTimeoutSeconds()),
new UidEnvSupplier(config.getUidEnvName()));
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
@Override public void start() {
}
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
/**
* The event of watch.
*
* @author gaohongtao
*/
public class Event {
private final String type;
private final String uid;
private final String host;
public Event(final String type, final String uid, final String host) {
this.type = type;
this.uid = uid;
this.host = host;
}
String getType() {
return type;
}
String getUid() {
return uid;
}
String getHost() {
return host;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Read collector pod info from api-server of kubernetes, then using all containerIp list to
* construct the list of {@link RemoteInstance}.
*
* @author gaohongtao
*/
public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery {
private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class);
private final String uid;
private final Map<String, RemoteInstance> cache = new ConcurrentHashMap<>();
private final ReusableWatch<Event> watch;
private int port;
KubernetesCoordinator(final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
this.watch = watch;
this.uid = uidSupplier.get();
}
@Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
this.port = remoteInstance.getPort();
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
}
private void submitTask(final ListeningExecutorService service) {
watch.initOrReset();
ListenableFuture<?> watchFuture = service.submit(newWatch());
Futures.addCallback(watchFuture, new FutureCallback<Object>() {
@Override public void onSuccess(@Nullable Object ignored) {
submitTask(service);
}
@Override public void onFailure(@Nullable Throwable throwable) {
logger.debug("Generate remote nodes error", throwable);
submitTask(service);
}
});
}
private Callable<Object> newWatch() {
return () -> {
generateRemoteNodes();
return null;
};
}
private void generateRemoteNodes() {
for (Event event : watch) {
logger.debug("Received event {} {}-{}", event.getType(), event.getUid(), event.getHost());
switch (event.getType()) {
case "ADDED":
case "MODIFIED":
cache.put(event.getUid(), new RemoteInstance(event.getHost(), port, event.getUid().equals(this.uid)));
break;
case "DELETED":
cache.remove(event.getUid());
break;
default:
throw new RuntimeException(String.format("Unknown event %s", event.getType()));
}
}
}
@Override public List<RemoteInstance> queryRemoteNodes() {
return Lists.newArrayList(cache.values());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
/**
* This watch can init or reset internal state.
*
* @param <T> event of watch
* @author gaohongtao
*/
public interface ReusableWatch<T> extends Iterable<T> {
/**
* Reset internal state.
*/
void initOrReset();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies;
import com.google.common.collect.Iterators;
import com.google.common.reflect.TypeToken;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch;
/**
* Watch the api {@literal https://v1-9.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.9/#watch-64}.
*
* @author gaohongtao
*/
public class NamespacedPodListWatch implements ReusableWatch<Event> {
private final CoreV1Api api = new CoreV1Api();
private final String namespace;
private final String labelSelector;
private final int watchTimeoutSeconds;
private Watch<V1Pod> watch;
public NamespacedPodListWatch(final String namespace, final String labelSelector, final int watchTimeoutSeconds) {
this.namespace = namespace;
this.labelSelector = labelSelector;
this.watchTimeoutSeconds = watchTimeoutSeconds;
}
@Override public void initOrReset() {
ApiClient client;
try {
client = Config.defaultClient();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
client.getHttpClient().setReadTimeout(watchTimeoutSeconds, TimeUnit.SECONDS);
Configuration.setDefaultApiClient(client);
try {
watch = Watch.createWatch(
client,
api.listNamespacedPodCall(namespace, null, null, null,
null, labelSelector, Integer.MAX_VALUE,null,null, Boolean.TRUE,
null, null),
new TypeToken<Watch.Response<V1Pod>>() { }.getType());
} catch (ApiException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override public Iterator<Event> iterator() {
return Iterators.transform(watch.iterator(), response -> {
if (response == null) {
throw new NullPointerException("Original event is null");
}
return new Event(response.type, response.object.getMetadata().getUid(), response.object.getStatus().getPodIP());
});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies;
import java.util.function.Supplier;
/**
* Supply uid from environment variable.
*
* @author gaohongtao
*/
public class UidEnvSupplier implements Supplier<String> {
private final String uidEnvName;
public UidEnvSupplier(final String uidEnvName) {
this.uidEnvName = uidEnvName == null ? "" : uidEnvName;
}
@Override public String get() {
return System.getenv(uidEnvName);
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ClusterModuleKubernetesProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class ClusterModuleKubernetesProviderTest {
private ClusterModuleKubernetesProvider provider;
@Before
public void setUp() {
provider = new ClusterModuleKubernetesProvider();
}
@Test
public void assertName() {
assertThat(provider.name(), is("kubernetes"));
}
@Test
public void assertModule() {
assertTrue(provider.module().isAssignableFrom(ClusterModule.class));
}
@Test
public void assertCreateConfigBeanIfAbsent() {
assertTrue(ClusterModuleKubernetesConfig.class.isInstance(provider.createConfigBeanIfAbsent()));
}
@Test
public void assertPrepare() throws ServiceNotProvidedException {
provider.prepare();
ClusterRegister register = provider.getService(ClusterRegister.class);
ClusterNodesQuery query = provider.getService(ClusterNodesQuery.class);
assertSame(register, query);
assertTrue(KubernetesCoordinator.class.isInstance(register));
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.junit.Test;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
public class KubernetesCoordinatorTest {
private KubernetesCoordinator coordinator;
@Test
public void assertAdded() throws InterruptedException {
PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
}
@Test
public void assertModified() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.3"));
}
@Test
public void assertDeleted() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(1));
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
}
@Test
public void assertError() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch;
public class PlainWatch implements ReusableWatch<Event> {
public static PlainWatch create(final int size, final String... args) {
List<Event> events = new ArrayList<>(args.length / 3);
for (int i = 0; i < args.length; i++) {
events.add(new Event(args[i++], args[i++], args[i]));
}
return new PlainWatch(events, size);
}
private final List<Event> events;
private final int size;
private final CountDownLatch latch = new CountDownLatch(1);
private Iterator<Event> iterator;
private int count;
private PlainWatch(final List<Event> events, final int size) {
this.events = events;
this.size = size;
}
@Override public void initOrReset() {
final Iterator<Event> internal = events.subList(count, events.size()).iterator();
iterator = new Iterator<Event>() {
public boolean hasNext() {
boolean result = count < size && internal.hasNext();
if (!result) {
latch.countDown();
}
return result;
}
public Event next() {
if (!this.hasNext()) {
throw new NoSuchElementException();
} else {
++count;
return internal.next();
}
}
public void remove() {
internal.remove();
}
};
}
@Override public Iterator<Event> iterator() {
return iterator;
}
public void await() throws InterruptedException {
latch.await();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="info">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -32,6 +32,7 @@
<modules>
<module>cluster-zookeeper-plugin</module>
<module>cluster-standalone-plugin</module>
<module>cluster-kubernetes-plugin</module>
</modules>
<dependencies>
......
......@@ -29,6 +29,16 @@ public class RemoteInstance {
private int port;
private boolean self = false;
public RemoteInstance() {
}
public RemoteInstance(String host, int port, boolean self) {
this.host = host;
this.port = port;
this.self = self;
}
public String getHost() {
return host;
}
......
......@@ -15,18 +15,23 @@
# limitations under the License.
cluster:
standalone:
standalone:
# zookeeper:
# hostPort: localhost:2181
# # Retry Policy
# baseSleepTimeMs: 1000 # initial amount of time to wait between retries
# maxRetries: 3 # max number of times to retry
# kubernetes:
# watchTimeoutSeconds: 60
# namespace: default
# labelSelector: app=collector,release=skywalking
# uidEnvName: SKYWALKING_COLLECTOR_UID
core:
default:
restHost: localhost
restHost: 0.0.0.0
restPort: 12800
restContextPath: /
gRPCHost: localhost
gRPCHost: 0.0.0.0
gRPCPort: 11800
storage:
elasticsearch:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册