未验证 提交 f513726f 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Fix potential gRPC connection leak(not closed) for the channels among OAP instances. (#5995)

* Fix potential gRPC connection leak(not closed) for the channels among OAP instances.

* Filter OAP instances(unassigned in booting stage) of the empty IP in KubernetesCoordinator.
上级 be197dbf
......@@ -20,6 +20,8 @@ Release Notes.
* Add the rule name field to alarm record storage entity as a part of ID, to support multiple alarm rules triggered for one entity. The scope id has been removed from the ID.
* Fix MAL concurrent execution issues.
* Fix group name can't be query in the GraphQL.
* Fix potential gRPC connection leak(not closed) for the channels among OAP instances.
* Filter OAP instances(unassigned in booting stage) of the empty IP in KubernetesCoordinator.
#### UI
* Fix un-removed tags in trace query.
......
......@@ -21,12 +21,11 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodStatus;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
......@@ -66,23 +65,25 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
List<V1Pod> pods = NamespacedPodListInformer.INFORMER.listPods().orElseGet(this::selfPod);
if (log.isDebugEnabled()) {
List<String> uidList = pods
.stream()
.map(item -> item.getMetadata().getUid())
.collect(Collectors.toList());
.stream()
.map(item -> item.getMetadata().getUid())
.collect(Collectors.toList());
log.debug("[kubernetes cluster pods uid list]:{}", uidList.toString());
}
if (port == -1) {
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
}
List<RemoteInstance> remoteInstances = pods.stream()
List<RemoteInstance> remoteInstances =
pods.stream()
.filter(pod -> StringUtil.isNotBlank(pod.getStatus().getPodIP()))
.map(pod -> new RemoteInstance(
new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
new Address(pod.getStatus().getPodIP(), port, pod.getMetadata().getUid().equals(uid))))
.collect(Collectors.toList());
healthChecker.health();
return remoteInstances;
} catch (Throwable e) {
healthChecker.unHealth(e);
throw new ServiceQueryException(e.getMessage());
throw new ServiceQueryException(e.getMessage());
}
}
......@@ -100,8 +101,11 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
private void initHealthChecker() {
if (healthChecker == null) {
MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
healthChecker = metricCreator.createHealthCheckerGauge("cluster_k8s", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
MetricsCreator metricCreator = manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
healthChecker = metricCreator.createHealthCheckerGauge(
"cluster_k8s", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
}
......
......@@ -65,9 +65,10 @@ public class RemoteClientManager implements Service {
/**
* Initial the manager for all remote communication clients.
*
* @param moduleDefineHolder for looking up other modules
* @param remoteTimeout for cluster internal communication, in second unit.
* @param trustedCAFile SslContext to verify server certificates.
* @param trustedCAFile SslContext to verify server certificates.
*/
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder,
int remoteTimeout,
......@@ -80,7 +81,8 @@ public class RemoteClientManager implements Service {
* Initial the manager for all remote communication clients.
*
* Initial the manager for all remote communication clients.
* @param moduleDefineHolder for looking up other modules
*
* @param moduleDefineHolder for looking up other modules
* @param remoteTimeout for cluster internal communication, in second unit.
*/
public RemoteClientManager(final ModuleDefineHolder moduleDefineHolder, final int remoteTimeout) {
......@@ -103,7 +105,10 @@ public class RemoteClientManager implements Service {
gauge = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createGauge("cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
.createGauge(
"cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE
);
}
try {
if (Objects.isNull(clusterNodesQuery)) {
......@@ -187,13 +192,24 @@ public class RemoteClientManager implements Service {
* @param remoteInstances Remote instance collection by query cluster config.
*/
private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
.collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));
final Map<Address, RemoteClientAction> remoteClientCollection =
this.usingClients.stream()
.collect(Collectors.toMap(
RemoteClient::getAddress,
client -> new RemoteClientAction(
client, Action.Close)
));
final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
.collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));
final Map<Address, RemoteClientAction> latestRemoteClients =
remoteInstances.stream()
.collect(Collectors.toMap(
RemoteInstance::getAddress,
remote -> new RemoteClientAction(
null, Action.Create)
));
final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());
final Set<Address> unChangeAddresses = Sets.intersection(
remoteClientCollection.keySet(), latestRemoteClients.keySet());
unChangeAddresses.stream()
.filter(remoteClientCollection::containsKey)
......@@ -230,7 +246,10 @@ public class RemoteClientManager implements Service {
remoteClientCollection.values()
.stream()
.filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
.filter(remoteClientAction ->
remoteClientAction.getAction().equals(Action.Close)
&& !remoteClientAction.getRemoteClient().getAddress().isSelf()
)
.forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册