未验证 提交 92c0cb85 编写于 作者: W wankai123 提交者: GitHub

Add function `retagByK8sMeta` and opt type `K8sRetagType.Pod2Service` in MAL...

Add function `retagByK8sMeta` and opt type `K8sRetagType.Pod2Service` in MAL for k8s to relate pods and services. (#6608)
上级 073fbbca
......@@ -59,6 +59,7 @@ Release Notes.
* Add functions in MAL to filter metrics according to the metric value.
* Optimize the self monitoring grafana dashboard.
* Enhance the export service.
* Add function `retagByK8sMeta` and opt type `K8sRetagType.Pod2Service` in MAL for k8s to relate pods and services.
#### UI
* Update selector scroller to show in all pages.
......
......@@ -57,6 +57,31 @@ For example, this filters all instance_trace_count samples for values >= 33:
```
instance_trace_count.valueGreaterEqual(33)
```
### Tag manipulator
MAL provides tag manipulators to change(add/delete/update) tags and their values.
#### K8s
MAL supports using the metadata of k8s to manipulate the tags and their values.
This feature requires OAP Server to have the authority to access the K8s's `API Server`.
##### retagByK8sMeta
`retagByK8sMeta(newLabelName, K8sRetagType, existingLabelName)`. Add a new tag to the sample family based on an existing label's value. Provide several internal converting types, including
- K8sRetagType.Pod2Service
Add a tag to the sample by using `service` as the key, `$serviceName.$namespace` as the value, by the given value of the tag key, which represents the name of a pod.
For example:
```
container_cpu_usage_seconds_total{container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh} 2
```
Expression:
```
container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')
```
Output:
```
container_cpu_usage_seconds_total{container=my-nginx, cpu=total, pod=my-nginx-5dc4865748-mbczh, service='nginx-service.default'} 2
```
### Binary operators
......@@ -186,6 +211,7 @@ Examples:
#### time
`time()`. returns the number of seconds since January 1, 1970 UTC.
## Down Sampling Operation
MAL should instruct meter-system how to do downsampling for metrics. It doesn't only refer to aggregate raw samples to
`minute` level, but also hints data from `minute` to higher levels, for instance, `hour` and `day`.
......
......@@ -33,7 +33,6 @@
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
......@@ -42,6 +41,11 @@
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>${kubernetes.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -34,6 +34,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.ExpressionParsingContext;
import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
......@@ -219,6 +220,10 @@ public class Analyzer {
}
}
createMetric(ctx.getScopeType(), metricType.literal, ctx.getDownsampling());
if (ctx.isRetagByK8sMeta()) {
K8sInfoRegistry.getInstance().start();
}
}
private void createMetric(final ScopeType scopeType,
......
......@@ -21,7 +21,9 @@ package org.apache.skywalking.oap.meter.analyzer.dsl;
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.util.DelegatingScript;
import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.control.customizers.ImportCustomizer;
/**
* DSL combines methods to parse groovy based DSL expression.
......@@ -37,6 +39,9 @@ public final class DSL {
public static Expression parse(final String expression) {
CompilerConfiguration cc = new CompilerConfiguration();
cc.setScriptBaseClass(DelegatingScript.class.getName());
ImportCustomizer icz = new ImportCustomizer();
icz.addImport("K8sRetagType", K8sRetagType.class.getName());
cc.addCompilationCustomizers(icz);
GroovyShell sh = new GroovyShell(new Binding(), cc);
DelegatingScript script = (DelegatingScript) sh.parse(expression);
return new Expression(expression, script);
......
......@@ -61,7 +61,6 @@ public class ExpressionParsingContext implements Closeable {
List<String> samples;
boolean isHistogram;
int[] percentiles;
Set<String> aggregationLabels;
......@@ -72,6 +71,11 @@ public class ExpressionParsingContext implements Closeable {
ScopeType scopeType;
/**
* Mark whether the retagByK8sMeta func in expressions is active
*/
boolean isRetagByK8sMeta;
/**
* Get labels no scope related.
*
......
......@@ -38,6 +38,7 @@ import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EndpointEn
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.InstanceEntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.ServiceEntityDescription;
import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
......@@ -310,6 +311,18 @@ public class SampleFamily {
);
}
/* k8s retags*/
public SampleFamily retagByK8sMeta(String newLabelName, K8sRetagType type, String existingLabelName) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(newLabelName));
Preconditions.checkArgument(!Strings.isNullOrEmpty(existingLabelName));
ExpressionParsingContext.get().ifPresent(ctx -> ctx.isRetagByK8sMeta = true);
if (this == EMPTY) {
return EMPTY;
}
return SampleFamily.build(this.context, type.execute(samples, newLabelName, existingLabelName));
}
public SampleFamily histogram() {
return histogram("le", this.context.defaultHistogramBucketUnit);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Map;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
public enum K8sRetagType implements Retag {
Pod2Service {
@Override
public Sample[] execute(final Sample[] ss, final String newLabelName, final String existingLabelName) {
Sample[] samples = Arrays.stream(ss).map(sample -> {
String podName = sample.getLabels().get(existingLabelName);
if (!Strings.isNullOrEmpty(podName)) {
String serviceName = K8sInfoRegistry.getInstance().findServiceName(podName);
if (!Strings.isNullOrEmpty(serviceName)) {
Map<String, String> labels = Maps.newHashMap(sample.getLabels());
labels.put(newLabelName, serviceName);
return sample.toBuilder().labels(ImmutableMap.copyOf(labels)).build();
}
}
return sample;
}).toArray(Sample[]::new);
return samples;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
public interface Retag {
Sample[] execute(Sample[] ss, String newLabelName, String existingLabelName);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.meter.analyzer.k8s;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Endpoints;
import io.kubernetes.client.openapi.models.V1EndpointsList;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.models.V1Pod;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.isNull;
import static java.util.Optional.ofNullable;
@Slf4j
public class K8sInfoRegistry {
private final static K8sInfoRegistry INSTANCE = new K8sInfoRegistry();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final Map<String/* ip */, V1Pod> ipPodMap = new ConcurrentHashMap<>();
private final Map<String/* ip */, String/* serviceName.namespace */> ipServiceMap = new ConcurrentHashMap<>();
private final Map<String/* podName */, String /* serviceName.namespace */> podServiceMap = new ConcurrentHashMap<>();
private ExecutorService executor;
public static K8sInfoRegistry getInstance() {
return INSTANCE;
}
private void init() {
executor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("K8sInfoRegistry-%d")
.setDaemon(true)
.build()
);
}
@SneakyThrows
public void start() {
if (isStarted.compareAndSet(false, true)) {
init();
final ApiClient apiClient = Config.defaultClient();
apiClient.setHttpClient(apiClient.getHttpClient()
.newBuilder()
.readTimeout(0, TimeUnit.SECONDS)
.build());
Configuration.setDefaultApiClient(apiClient);
final CoreV1Api coreV1Api = new CoreV1Api();
final SharedInformerFactory factory = new SharedInformerFactory(executor);
listenEndpointsEvents(coreV1Api, factory);
listenPodEvents(coreV1Api, factory);
factory.startAllRegisteredInformers();
}
}
private void listenEndpointsEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
factory.sharedIndexInformerFor(
params -> coreV1Api.listEndpointsForAllNamespacesCall(
null,
null,
null,
null,
null,
null,
params.resourceVersion,
300,
params.watch,
null
),
V1Endpoints.class,
V1EndpointsList.class
).addEventHandler(new ResourceEventHandler<V1Endpoints>() {
@Override
public void onAdd(final V1Endpoints endpoints) {
addEndpoints(endpoints);
}
@Override
public void onUpdate(final V1Endpoints oldEndpoints, final V1Endpoints newEndpoints) {
addEndpoints(newEndpoints);
}
@Override
public void onDelete(final V1Endpoints endpoints, final boolean deletedFinalStateUnknown) {
removeEndpoints(endpoints);
}
});
}
private void listenPodEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
factory.sharedIndexInformerFor(
params -> coreV1Api.listPodForAllNamespacesCall(
null,
null,
null,
null,
null,
null,
params.resourceVersion,
300,
params.watch,
null
),
V1Pod.class,
V1PodList.class
).addEventHandler(new ResourceEventHandler<V1Pod>() {
@Override
public void onAdd(final V1Pod pod) {
addPod(pod);
}
@Override
public void onUpdate(final V1Pod oldPod, final V1Pod newPod) {
addPod(newPod);
}
@Override
public void onDelete(final V1Pod pod, final boolean deletedFinalStateUnknown) {
removePod(pod);
}
});
}
private void addPod(final V1Pod pod) {
ofNullable(pod.getStatus()).ifPresent(
status -> ofNullable(status.getPodIP()).ifPresent(
ip -> ipPodMap.put(ip, pod))
);
recompose();
}
private void removePod(final V1Pod pod) {
ofNullable(pod.getStatus()).ifPresent(
status -> ipPodMap.remove(status.getPodIP())
);
ofNullable(pod.getMetadata()).ifPresent(
metadata -> podServiceMap.remove(pod.getMetadata().getName())
);
}
private void addEndpoints(final V1Endpoints endpoints) {
V1ObjectMeta endpointsMetadata = endpoints.getMetadata();
if (isNull(endpointsMetadata)) {
log.error("Endpoints metadata is null: {}", endpoints);
return;
}
final String namespace = endpointsMetadata.getNamespace();
final String name = endpointsMetadata.getName();
ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
address -> ipServiceMap.put(address.getIp(), name + "." + namespace)
))
));
recompose();
}
private void removeEndpoints(final V1Endpoints endpoints) {
ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
address -> ipServiceMap.remove(address.getIp())
))
));
recompose();
}
private void recompose() {
ipPodMap.forEach((ip, pod) -> {
final String namespaceService = ipServiceMap.get(ip);
if (isNullOrEmpty(namespaceService)) {
podServiceMap.remove(ip);
return;
}
final V1ObjectMeta podMetadata = pod.getMetadata();
if (isNull(podMetadata)) {
log.warn("Pod metadata is null, {}", pod);
return;
}
podServiceMap.put(pod.getMetadata().getName(), namespaceService);
});
}
public String findServiceName(String podName) {
return this.podServiceMap.get(podName);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.meter.analyzer.dsl;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collection;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import static com.google.common.collect.ImmutableMap.of;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
@Slf4j
@RunWith(Parameterized.class)
public class K8sTagTest {
@Parameterized.Parameter
public String name;
@Parameterized.Parameter(1)
public ImmutableMap<String, SampleFamily> input;
@Parameterized.Parameter(2)
public String expression;
@Parameterized.Parameter(3)
public Result want;
@Parameterized.Parameter(4)
public boolean isThrow;
@Parameterized.Parameters(name = "{index}: {0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{
"Pod2Service",
of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-mbczh"))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx"
))
.value(1)
.build()
).build()),
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
Result.success(SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of(
"container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-mbczh",
"service", "nginx-service.default"
))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx",
"service", "kube-state-metrics.kube-system"
))
.value(1)
.build()
).build()),
false,
},
{
"Pod2Service_no_pod",
of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-pod"))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx"
))
.value(1)
.build()
).build()),
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
Result.success(SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of(
"container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-pod"
))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx",
"service", "kube-state-metrics.kube-system"
))
.value(1)
.build()
).build()),
false,
},
{
"Pod2Service_no_service",
of("container_cpu_usage_seconds_total", SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of("container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-service"))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx"
))
.value(1)
.build()
).build()),
"container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')",
Result.success(SampleFamilyBuilder.newBuilder(
Sample.builder()
.labels(
of(
"container", "my-nginx", "cpu", "total", "pod", "my-nginx-5dc4865748-no-service"
))
.value(2)
.build(),
Sample.builder()
.labels(
of(
"container", "kube-state-metrics", "cpu", "total", "pod",
"kube-state-metrics-6f979fd498-z7xwx",
"service", "kube-state-metrics.kube-system"
))
.value(1)
.build()
).build()),
false,
},
});
}
@Before
public void setup() {
Whitebox.setInternalState(K8sInfoRegistry.class, "INSTANCE",
Mockito.spy(K8sInfoRegistry.getInstance())
);
when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-mbczh")).thenReturn(
"nginx-service.default");
when(K8sInfoRegistry.getInstance().findServiceName("kube-state-metrics-6f979fd498-z7xwx")).thenReturn(
"kube-state-metrics.kube-system");
when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-no-pod")).thenReturn(
null);
when(K8sInfoRegistry.getInstance().findServiceName("my-nginx-5dc4865748-no-service")).thenReturn(
null);
}
@Test
public void test() {
Expression e = DSL.parse(expression);
Result r = null;
try {
r = e.run(input);
} catch (Throwable t) {
if (isThrow) {
return;
}
log.error("Test failed", t);
fail("Should not throw anything");
}
if (isThrow) {
fail("Should throw something");
}
assertThat(r, is(want));
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册