未验证 提交 75f03acb 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: GitHub

Adopt Zabbix protocol (#6176)

Happy Chinese New Year !!
上级 c1736b6f
......@@ -78,6 +78,8 @@ jobs:
class: org.apache.skywalking.e2e.meter.MeterE2E
- name: Alarm
class: org.apache.skywalking.e2e.alarm.AlarmE2E
- name: Zabbix
class: org.apache.skywalking.e2e.zabbix.ZabbixE2E
steps:
- uses: actions/checkout@v2
with:
......
......@@ -18,6 +18,7 @@ Release Notes.
* Add a new concept "Event" and its implementations to collect events.
* Add some defensive codes for NPE and bump up Kubernetes client version to expose exception stack trace.
* Update the `timestamp` field type for `LogQuery`.
* Support Zabbix protocol to receive agent metrics.
#### UI
* Update selector scroller to show in all pages.
......
......@@ -41,7 +41,7 @@ including
1. LUA agent especially for Nginx, OpenResty.
1. Browser agent.
1. Service Mesh Observability. Control panel and data panel.
1. Metrics system, including Prometheus, OpenTelemetry, Spring Sleuth(Micrometer).
1. Metrics system, including Prometheus, OpenTelemetry, Spring Sleuth(Micrometer), Zabbix.
1. Logs.
1. Zipkin v1/v2 and Jaeger gRPC format with limited topology and metrics analysis.(Experimental).
......
......@@ -56,6 +56,7 @@
<include>fetcher-prom-rules/*.yaml</include>
<include>envoy-metrics-rules/*.yaml</include>
<include>meter-analyzer-config/*.yaml</include>
<include>zabbix-rules/*.yaml</include>
<include>otel-oc-rules/*</include>
<include>ui-initialized-templates/*</include>
</includes>
......
......@@ -56,6 +56,7 @@
<include>fetcher-prom-rules/*.yaml</include>
<include>envoy-metrics-rules/*.yaml</include>
<include>meter-analyzer-config/*.yaml</include>
<include>zabbix-rules/*.yaml</include>
<include>otel-oc-rules/*</include>
<include>ui-initialized-templates/*</include>
</includes>
......
......@@ -17,6 +17,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **receiver-log**. gRPC services accept log data.
1. **configuration-discovery**. gRPC services handle configurationDiscovery.
1. **receiver-event**. gRPC services to handle events data.
1. **receiver-zabbix**. See [details](backend-zabbix.md).
1. Experimental receivers. All following receivers are in the POC stage, not production ready.
1. **receiver_zipkin**. See [details](#zipkin-receiver). (Experimental)
1. **receiver_jaeger**. See [details](#jaeger-receiver). (Experimental)
......
# Zabbix Receiver
Zabbix receiver is accepting the metrics of [Zabbix Agent Active Checks protocol](https://www.zabbix.com/documentation/current/manual/appendix/items/activepassive#active_checks) format into the [Meter System](./../../concepts-and-designs/meter.md).
Zabbix Agent is base on GPL-2.0 License.
## Module define
```yaml
receiver-zabbix:
selector: ${SW_RECEIVER_ZABBIX:default}
default:
# Export tcp port, Zabbix agent could connected and transport data
port: 10051
# Bind to host
host: 0.0.0.0
# Enable config when receive agent request
activeFiles: agent
```
## Configuration file
Zabbix receiver is configured via a configuration file. The configuration file defines everything related to receiving
from agents, as well as which rule files to load.
OAP can load the configuration at bootstrap. If the new configuration is not well-formed, OAP fails to start up. The files
are located at `$CLASSPATH/zabbix-rules`.
The file is written in YAML format, defined by the scheme described below. Square brackets indicate that a parameter is optional.
An example for zabbix agent configuration could be found [here](../../../../test/e2e/e2e-test/docker/zabbix/zabbix_agentd.conf).
You could find the Zabbix agent detail items from [Zabbix Agent documentation](https://www.zabbix.com/documentation/current/manual/config/items/itemtypes/zabbix_agent).
### Configuration file
```yaml
# insert metricPrefix into metric name: <metricPrefix>_<raw_metric_name>
metricPrefix: <string>
# expSuffix is appended to all expression in this file.
expSuffix: <string>
# Datasource from Zabbix Item keys.
requiredZabbixItemKeys:
- <zabbix item keys>
# Support agent entities information.
entities:
# Allow hostname patterns to build metrics.
hostPatterns:
- <regex string>
# Customized metrics label before parse to meter system.
labels:
[- <labels> ]
# Metrics rule allow you to recompute queries.
metrics:
[ - <metrics_rules> ]
```
#### <labels>
```yaml
# Define the label name. The label value must query from `value` or `fromItem` attribute.
name: <string>
# Appoint value to label.
[value: <string>]
# Query label value from Zabbix Agent Item key.
[fromItem: <string>]
```
#### <metric_rules>
```yaml
# The name of rule, which combinates with a prefix 'meter_' as the index/table name in storage.
name: <string>
# MAL expression.
exp: <string>
```
More about MAL, please refer to [mal.md](../../concepts-and-designs/mal.md).
......@@ -173,6 +173,10 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| receiver-jvm| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| receiver-clr| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| receiver-profile| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| receiver-zabbix| default| Read [receiver doc](backend-zabbix.md) for more details | - | - |
| - | - | port| Exported tcp port, Zabbix agent could connect and transport data| SW_RECEIVER_ZABBIX_PORT | 10051 |
| - | - | host| Bind to host| SW_RECEIVER_ZABBIX_HOST | 0.0.0.0 |
| - | - | activeFiles| Enable config when receive agent request| SW_RECEIVER_ZABBIX_ACTIVE_FILES | agent |
| service-mesh| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| envoy-metric| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| - | - | acceptMetricsService | Open Envoy Metrics Service analysis | SW_ENVOY_METRIC_SERVICE | true|
......
......@@ -28,7 +28,6 @@ import groovy.lang.Closure;
import io.vavr.Function2;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.util.function.DoubleBinaryOperator;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.EqualsAndHashCode;
......@@ -48,6 +47,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.DoubleBinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......
......@@ -141,6 +141,11 @@
<artifactId>skywalking-event-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-zabbix-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module -->
<!-- fetcher module -->
......@@ -277,6 +282,7 @@
<exclude>meter-analyzer-config/</exclude>
<exclude>otel-oc-rules/</exclude>
<exclude>ui-initialized-templates/</exclude>
<exclude>zabbix-rules/</exclude>
</excludes>
</configuration>
</plugin>
......
......@@ -266,6 +266,13 @@ receiver-profile:
selector: ${SW_RECEIVER_PROFILE:default}
default:
receiver-zabbix:
selector: ${SW_RECEIVER_ZABBIX:default}
default:
port: ${SW_RECEIVER_ZABBIX_PORT:10051}
host: ${SW_RECEIVER_ZABBIX_HOST:0.0.0.0}
activeFiles: ${SW_RECEIVER_ZABBIX_ACTIVE_FILES:agent}
service-mesh:
selector: ${SW_SERVICE_MESH:default}
default:
......
......@@ -154,7 +154,7 @@ templates:
"chartType": "ChartLine",
"aggregation": "/",
"aggregationNum": "1024",
"unit": "KB/s"
"unit": "KB/s - OTEL, ops - Zabbix"
},
{
"width": "4",
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
metricPrefix: meter_vm
expSuffix: tag({tags -> tags.host = 'vm::' + tags.host}).service(['host'])
entities:
hostPatterns:
- .+
labels:
requiredZabbixItemKeys:
# cpu
- system.cpu.load[all,avg15]
- system.cpu.load[all,avg1]
- system.cpu.load[all,avg5]
- system.cpu.util[,guest]
- system.cpu.util[,guest_nice]
- system.cpu.util[,idle]
- system.cpu.util[,interrupt]
- system.cpu.util[,iowait]
- system.cpu.util[,nice]
- system.cpu.util[,softirq]
- system.cpu.util[,steal]
- system.cpu.util[,system]
- system.cpu.util[,user]
# memory
- vm.memory.size[available]
- vm.memory.size[pavailable]
- vm.memory.size[total]
- vm.memory.size[pavailable]
# swap
- system.swap.size[,free]
- system.swap.size[,total]
- system.swap.size[,pused]
# file
- vfs.fs.inode[/,pused]
- vfs.fs.size[/,total]
- vfs.fs.size[/,used]
- vfs.dev.read[,ops,avg1]
- vfs.dev.write[,ops,avg1]
metrics:
# cpu
- name: cpu_load1
exp: system_cpu_load.tagEqual('2', 'avg1').avg(['host']) * 100
- name: cpu_load5
exp: system_cpu_load.tagEqual('2', 'avg5').avg(['host']) * 100
- name: cpu_load15
exp: system_cpu_load.tagEqual('2', 'avg15').avg(['host']) * 100
- name: cpu_average_used
exp: system_cpu_util.avg(['2', 'host'])
- name: cpu_total_percentage
exp: system_cpu_util.tagNotEqual('2', 'idle').sum(['host'])
# memory
- name: memory_total
exp: vm_memory_size.tagEqual('1', 'total').avg(['host'])
- name: memory_available
exp: vm_memory_size.tagEqual('1', 'available').avg(['host'])
- name: memory_used
exp: vm_memory_size.tagEqual('1', 'total').avg(['host']) - vm_memory_size.tagEqual('1', 'available').avg(['host'])
# swap
- name: memory_swap_free
exp: system_swap_size.tagEqual('2', 'free').avg(['host'])
- name: memory_swap_total
exp: system_swap_size.tagEqual('2', 'total').avg(['host'])
- name: memory_swap_percentage
exp: system_swap_size.tagEqual('2', 'pused')
# file
- name: filesystem_percentage
exp: vfs_fs_inode.tagEqual('2', 'pused').avg(['1', 'host'])
- name: vfs_fs_size
exp: vfs_fs_size.avg(['1', '2', 'host'])
- name: disk_read
# `* 1024` is for adapting to `divide 1024` by the VM UI template configuration, which converts `byte`->`kb` because of OTEL/Prometheus Node exporter.
exp: vfs_dev_read * 1024
- name: disk_written
# `* 1024` is for adapting to `divide 1024` by the VM UI template configuration, which converts `byte`->`kb` because of OTEL/Prometheus Node exporter.
exp: vfs_dev_write * 1024
......@@ -45,6 +45,7 @@
<module>skywalking-log-recevier-plugin</module>
<module>configuration-discovery-receiver-plugin</module>
<module>skywalking-event-receiver-plugin</module>
<module>skywalking-zabbix-receiver-plugin</module>
</modules>
<dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-receiver-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-zabbix-receiver-plugin</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-sharing-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>meter-analyzer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
public class ZabbixReceiverModule extends ModuleDefine {
public static final String NAME = "receiver-zabbix";
public ZabbixReceiverModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider;
import com.google.common.collect.ImmutableMap;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.text.StringTokenizer;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.config.ZabbixConfig;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixRequest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
/**
* Management all Zabbix metrics
*/
@Slf4j
public class ZabbixMetrics {
private final List<ZabbixConfig> originalConfigs;
/**
* All enabled service and instance group
*/
private List<InstanceGroup> allServices = new ArrayList<>();
public ZabbixMetrics(List<ZabbixConfig> originalConfigs, MeterSystem meterSystem) {
this.originalConfigs = originalConfigs;
initConfigs(meterSystem);
}
/**
* Get all key names when Zabbix agent queried
*/
public Set<String> getAllMonitorMetricNames(String hostName) {
// Find instance group
return findInstanceGroup(hostName).map(InstanceGroup::getEnabledKeys).orElse(null);
}
/**
* Receive agent data and convert to meter system
*/
public ConvertStatics convertMetrics(List<ZabbixRequest.AgentData> agentDataList) {
if (CollectionUtils.isEmpty(agentDataList)) {
return ConvertStatics.EMPTY;
}
return agentDataList.stream()
// Group by host
.collect(Collectors.groupingBy(ZabbixRequest.AgentData::getHost)).entrySet().stream()
// Convert every agent data list
.map(e -> findInstanceGroup(e.getKey()).map(instanceGroup -> instanceGroup.convertToMeter(e.getValue())).orElse(null))
.filter(Objects::nonNull)
// Merge all statics
.reduce(ConvertStatics::merge)
.orElse(ConvertStatics.EMPTY);
}
private Optional<InstanceGroup> findInstanceGroup(String hostName) {
// Find service group, support using cache
return allServices.stream().filter(group -> group.matchesWithHostName(hostName)).findAny();
}
private void initConfigs(MeterSystem meterSystem) {
// Temporary instance group cache, encase generate multiple instance group
HashMap<String, InstanceGroup> tmpGroupCache = new HashMap<>();
// Each config entities
originalConfigs.forEach(c ->
c.getEntities().getHostPatterns().forEach(instance ->
tmpGroupCache.computeIfAbsent(instance, ins -> {
InstanceGroup instanceGroup = new InstanceGroup(ins, meterSystem);
allServices.add(instanceGroup);
return instanceGroup;
}).appendMetrics(c)));
}
/**
* Metrics convert to meter system statics
*/
@Builder
@Getter
public static class ConvertStatics {
public static final ConvertStatics EMPTY = ConvertStatics.builder().build();
private int total;
private int success;
private int failed;
private double useTime;
public ConvertStatics merge(ConvertStatics statics) {
this.total += statics.total;
this.success += statics.success;
this.failed += statics.failed;
this.useTime += statics.useTime;
return this;
}
}
/**
* The group of instances according to hostPatterns defined in Zabbix rule file
*/
private static class InstanceGroup {
static final InstanceGroup EMPTY = new InstanceGroup("", null);
private final Pattern instancePattern;
private final MeterSystem meterSystem;
@Getter
private Set<String> enabledKeys;
private List<MetricConvert> metricConverts;
private List<ZabbixConfig.EntityLabel> labels;
public InstanceGroup(String instancePattern, MeterSystem meterSystem) {
this.instancePattern = Pattern.compile(instancePattern);
this.meterSystem = meterSystem;
this.enabledKeys = new HashSet<>();
this.metricConverts = new ArrayList<>();
this.labels = new ArrayList<>();
}
public void appendMetrics(ZabbixConfig config) {
// Append metrics to converters
metricConverts.add(new MetricConvert(config, meterSystem));
// Append labels and add to item keys
if (CollectionUtils.isNotEmpty(config.getEntities().getLabels())) {
labels.addAll(config.getEntities().getLabels());
config.getEntities().getLabels().stream().filter(l -> StringUtils.isNotBlank(l.getFromItem()))
.forEach(l -> enabledKeys.add(l.getFromItem()));
}
// Append all metric keys
enabledKeys.addAll(config.getRequiredZabbixItemKeys());
}
public boolean matchesWithHostName(String hostName) {
Matcher matcher = instancePattern.matcher(hostName);
return matcher.matches();
}
public ConvertStatics convertToMeter(List<ZabbixRequest.AgentData> dataList) {
if (log.isDebugEnabled()) {
log.debug("Receive zabbix agent data: {}", dataList);
}
StopWatch stopWatch = new StopWatch();
Collection<SampleFamily> sampleFamilies = null;
try {
stopWatch.start();
// Parse config labels
Map<String, String> configLabels = parseConfigLabels(dataList);
// Build metrics
ImmutableMap<String, SampleFamily> families = dataList.stream()
// Correct state
.filter(d -> d.getState() == 0 && NumberUtils.isParsable(d.getValue()))
// Parse data to list
.map(this::parseAgentData)
.map(b -> b.build(configLabels))
// Combine to sample family
.collect(Collectors.groupingBy(Sample::getName))
.entrySet().stream().collect(toImmutableMap(
Map.Entry::getKey,
e -> SampleFamilyBuilder.newBuilder(e.getValue().stream().toArray(Sample[]::new)).build()));
sampleFamilies = families.values();
// Each all converters
metricConverts.forEach(converter -> converter.toMeter(families));
} finally {
stopWatch.stop();
}
return ConvertStatics.builder()
.total(sampleFamilies.size())
// Setting all as success
.success(sampleFamilies.size())
.useTime(((double) stopWatch.getTime()) / 1000)
.build();
}
/**
* Parsing config labels from original value or agent data
*/
private Map<String, String> parseConfigLabels(List<ZabbixRequest.AgentData> dataList) {
if (CollectionUtils.isEmpty(labels)) {
return Collections.emptyMap();
}
return labels.stream().map(label -> {
// Exists Value
if (StringUtil.isNotBlank(label.getValue())) {
return Tuple.of(label.getName(), label.getValue());
} else if (StringUtil.isNotBlank(label.getFromItem())) {
// Searching from Agent data
return dataList.stream()
.filter(d -> Objects.equals(d.getKey(), label.getFromItem())).findFirst()
.map(d -> Tuple.of(label.getName(), d.getValue())).orElse(null);
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
}
/**
* Parsing Zabbix agent data to sample builder
*/
private SampleBuilder parseAgentData(ZabbixRequest.AgentData data) {
String keyName = data.getKey();
SampleBuilder.SampleBuilderBuilder builder = SampleBuilder.builder();
if (keyName.contains("[") && keyName.endsWith("]")) {
String key = StringUtils.substringBefore(keyName, "[");
// Split params, support quote mode, label name start at 1
StringTokenizer tokenizer = new StringTokenizer(
StringUtils.substringAfter(keyName.substring(0, keyName.length() - 1), "["), ',', '\"');
tokenizer.setIgnoreEmptyTokens(false);
int inx = 1;
ImmutableMap.Builder<String, String> paramBuilder = ImmutableMap.builder();
while (tokenizer.hasNext()) {
paramBuilder.put(String.valueOf(inx++), tokenizer.next());
}
builder.name(key).labels(paramBuilder.build());
} else {
builder.name(keyName).labels(ImmutableMap.of());
}
return builder.hostName(data.getHost())
.timestamp(TimeUnit.SECONDS.toMillis(data.getClock()))
.value(Double.parseDouble(data.getValue()))
.build();
}
}
@Builder
@Data
private static class SampleBuilder {
private final String name;
private final String hostName;
private final long timestamp;
private final ImmutableMap<String, String> labels;
private final double value;
public Sample build(Map<String, String> configLabels) {
return Sample.builder()
.name(escapedName(name))
.labels(ImmutableMap.<String, String>builder()
// Put original labels
.putAll(labels)
// Put config labels
.putAll(configLabels)
// Put report instance to labels
.put("host", hostName)
.build())
.value(value)
.timestamp(timestamp).build();
}
// Returns the escaped name of the given one, with "." replaced by "_"
private String escapedName(final String name) {
return name.replaceAll("\\.", "_");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Setter
@Getter
public class ZabbixModuleConfig extends ModuleConfig {
/**
* Export tcp port
*/
private int port = 10051;
/**
* Bind to host
*/
private String host = "0.0.0.0";
public static final String CONFIG_PATH = "zabbix-rules";
/**
* active receive configs, files split by ","
*/
private String activeFiles = Const.EMPTY_STRING;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider;
import com.google.common.base.Splitter;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.receiver.zabbix.module.ZabbixReceiverModule;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.config.ZabbixConfig;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.config.ZabbixConfigs;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixServer;
import java.util.Collections;
import java.util.List;
public class ZabbixReceiverProvider extends ModuleProvider {
private ZabbixModuleConfig moduleConfig;
private List<ZabbixConfig> configs;
private ZabbixMetrics zabbixMetrics;
public ZabbixReceiverProvider() {
this.moduleConfig = new ZabbixModuleConfig();
}
@Override
public String name() {
return "default";
}
@Override
public Class<? extends ModuleDefine> module() {
return ZabbixReceiverModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return moduleConfig;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
configs = ZabbixConfigs.loadConfigs(ZabbixModuleConfig.CONFIG_PATH,
StringUtil.isEmpty(moduleConfig.getActiveFiles()) ? Collections.emptyList() : Splitter.on(",").splitToList(moduleConfig.getActiveFiles()));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
if (CollectionUtils.isNotEmpty(configs)) {
// Init metrics
zabbixMetrics = new ZabbixMetrics(configs, getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class));
// Bind receiver server
ZabbixServer zabbixServer = new ZabbixServer(moduleConfig, zabbixMetrics);
try {
zabbixServer.start();
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public String[] requiredModules() {
return new String[] {
CoreModule.NAME
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.config;
import lombok.Data;
import org.apache.skywalking.oap.meter.analyzer.MetricRuleConfig;
import java.util.List;
@Data
public class ZabbixConfig implements MetricRuleConfig {
private String metricPrefix;
private String expSuffix;
private Entities entities;
private List<String> requiredZabbixItemKeys;
private List<Metric> metrics;
@Override
public List<? extends RuleConfig> getMetricsRules() {
return metrics;
}
@Data
public static class Entities {
private List<String> hostPatterns;
private List<EntityLabel> labels;
}
@Data
public static class EntityLabel {
private String name;
private String fromItem;
private String value;
}
@Data
public static class Metric implements RuleConfig {
private String name;
private String exp;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.yaml.snakeyaml.Yaml;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@Slf4j
public class ZabbixConfigs {
public static List<ZabbixConfig> loadConfigs(String path, List<String> fileNames) throws ModuleStartException {
if (CollectionUtils.isEmpty(fileNames)) {
return Collections.emptyList();
}
File[] configs;
try {
configs = ResourceUtils.getPathFiles(path);
} catch (FileNotFoundException e) {
throw new ModuleStartException("Load zabbix configs failed", e);
}
return Arrays.stream(configs).filter(File::isFile)
.map(f -> {
String fileName = f.getName();
int dotIndex = fileName.lastIndexOf('.');
fileName = (dotIndex == -1) ? fileName : fileName.substring(0, dotIndex);
if (!fileNames.contains(fileName)) {
return null;
}
try (Reader r = new FileReader(f)) {
return new Yaml().loadAs(r, ZabbixConfig.class);
} catch (IOException e) {
log.warn("Reading file {} failed", f, e);
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol;
/**
* Decode content is not the Zabbix protocol
*/
public class ZabbixErrorProtocolException extends Exception {
public ZabbixErrorProtocolException(final String message) {
super(message);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol;
import io.netty.channel.CombinedChannelDuplexHandler;
public class ZabbixProtocolDataCodec extends CombinedChannelDuplexHandler<ZabbixProtocolDecoder, ZabbixProtocolEncoder> {
public ZabbixProtocolDataCodec() {
init(new ZabbixProtocolDecoder(), new ZabbixProtocolEncoder());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol;
import com.google.common.base.Charsets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixRequest;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixRequestJsonDeserializer;
import java.util.List;
@Slf4j
public class ZabbixProtocolDecoder extends ByteToMessageDecoder {
private static final int HEADER_LEN = 9;
private static final byte[] PROTOCOL = new byte[] {'Z', 'B', 'X', 'D'};
private final Gson requestParser = new GsonBuilder()
.registerTypeAdapter(ZabbixRequest.class, new ZabbixRequestJsonDeserializer()).create();
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
try {
// Decode header and get payload
String payload = decodeToPayload(channelHandlerContext, byteBuf);
if (payload == null) {
return;
}
// Parse content and add to list
ZabbixRequest request = requestParser.fromJson(payload, ZabbixRequest.class);
list.add(request);
} catch (Exception e) {
errorProtocol(channelHandlerContext, byteBuf, "Parsing zabbix request data error", e);
}
}
/**
* Decode protocol to payload string
*/
public String decodeToPayload(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws InterruptedException, ZabbixErrorProtocolException {
int readable = byteBuf.readableBytes();
int baseIndex = byteBuf.readerIndex();
if (readable < HEADER_LEN) {
byteBuf.readerIndex(baseIndex);
return null;
}
// Read header
ByteBuf headerBuf = byteBuf.readSlice(HEADER_LEN);
if (headerBuf.getByte(0) != PROTOCOL[0] || headerBuf.getByte(1) != PROTOCOL[1]
|| headerBuf.getByte(2) != PROTOCOL[2] || headerBuf.getByte(3) != PROTOCOL[3]) {
throw new ZabbixErrorProtocolException("header is not right");
}
// Only support communications protocol
if (headerBuf.getByte(4) != 1) {
throw new ZabbixErrorProtocolException("header flags only support communications protocol");
}
// Check payload
int dataLength = headerBuf.getByte(5) & 0xFF
| (headerBuf.getByte(6) & 0xFF) << 8
| (headerBuf.getByte(7) & 0xFF) << 16
| (headerBuf.getByte(8) & 0xFF) << 24;
int totalLength = HEADER_LEN + dataLength + 4;
// If not receive all data, reset buffer and re-decode after content receive finish
if (readable < totalLength) {
byteBuf.readerIndex(baseIndex);
return null;
}
if (dataLength <= 0) {
throw new ZabbixErrorProtocolException("content could not be empty");
}
// Skip protocol extensions
byteBuf.skipBytes(4);
// Reading content
ByteBuf payload = byteBuf.readSlice(dataLength);
return payload.toString(Charsets.UTF_8);
}
/**
* Close connection if protocol error
*/
private void errorProtocol(ChannelHandlerContext context, ByteBuf byteBuf, String reason, Throwable ex) throws InterruptedException {
log.warn("Receive message is not Zabbix protocol, reason: {}", reason, ex);
// Skip all content
byteBuf.skipBytes(byteBuf.readableBytes());
// Close connection
context.close();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol;
import com.google.common.base.Charsets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixResponse;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixResponseJsonSerializer;
import java.util.List;
public class ZabbixProtocolEncoder extends MessageToMessageEncoder<ZabbixResponse> {
private final Gson gson = new GsonBuilder()
.registerTypeAdapter(ZabbixResponse.class, new ZabbixResponseJsonSerializer()).create();
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ZabbixResponse zabbixResponse, List<Object> list) throws Exception {
String responsePayload = gson.toJson(zabbixResponse);
// Build header
int payloadLength = responsePayload.length();
byte[] header = new byte[] {
'Z', 'B', 'X', 'D', '\1',
(byte) (payloadLength & 0xFF),
(byte) (payloadLength >> 8 & 0xFF),
(byte) (payloadLength >> 16 & 0xFF),
(byte) (payloadLength >> 24 & 0xFF),
'\0', '\0', '\0', '\0'};
// Build and write ByteBuf
ByteBuf buffer = channelHandlerContext.alloc().buffer(header.length + payloadLength);
buffer.writeBytes(header);
buffer.writeBytes(responsePayload.getBytes(Charsets.UTF_8));
buffer.retain();
channelHandlerContext.writeAndFlush(buffer);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.ZabbixMetrics;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixProtocolType;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixRequest;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixResponse;
import java.util.Collections;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Handle request on received the Zabbix data
*/
@Slf4j
public class ZabbixProtocolHandler extends SimpleChannelInboundHandler<ZabbixRequest> {
private final ZabbixMetrics metrics;
public ZabbixProtocolHandler(ZabbixMetrics metrics) {
this.metrics = metrics;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ZabbixRequest msg) {
if (msg.getType() == ZabbixProtocolType.ACTIVE_CHECKS) {
ZabbixResponse response = new ZabbixResponse();
response.setType(msg.getType());
String hostName = msg.getActiveChecks().getHostName();
// Get all active tasks
response.setActiveChecks(Optional.of(metrics.getAllMonitorMetricNames(hostName))
.map(s -> s.stream().map(key ->
ZabbixResponse.ActiveChecks.builder().delay(60).lastlogsize(0).key(key).mtime(0).build()
).collect(Collectors.toList())).orElse(Collections.emptyList()));
ctx.writeAndFlush(response);
} else {
ZabbixResponse response = new ZabbixResponse();
response.setType(msg.getType());
// Convert metrics to the meter system
ZabbixMetrics.ConvertStatics convertStatics;
try {
convertStatics = metrics.convertMetrics(msg.getAgentDataList());
} catch (Exception e) {
log.warn("Convert the Zabbix metrics error", e);
convertStatics = ZabbixMetrics.ConvertStatics.builder().total(1).failed(1).build();
}
response.setAgentData(ZabbixResponse.AgentData.builder()
.info(String.format("processed: %d; failed: %d; total: %d; seconds spent: %f",
convertStatics.getSuccess(),
convertStatics.getFailed(),
convertStatics.getTotal(),
convertStatics.getUseTime()))
.build());
ctx.writeAndFlush(response);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.ZabbixMetrics;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.ZabbixModuleConfig;
@Slf4j
public class ZabbixServer {
private final ZabbixModuleConfig config;
private final ZabbixMetrics metrics;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
private Channel serverChannel;
public ZabbixServer(final ZabbixModuleConfig config,
final ZabbixMetrics metrics) {
this.config = config;
this.metrics = metrics;
}
/**
* Start zabbix receive server
*/
public void start() throws Exception {
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("TCP-BOSS-THREAD-%d").build());
this.workerGroup = new NioEventLoopGroup(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("TCP-WORKER-THREAD-%d").build());
ServerBootstrap bootstrap = new ServerBootstrap()
// All server using same group
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ZabbixServer.this.initChannel(channel);
}
});
serverChannel = bootstrap.bind(config.getHost(), config.getPort()).sync().channel();
log.info("Zabbix receiver started at port: {}", config.getPort());
}
protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
// encoder and decoder
pipeline.addLast(new ZabbixProtocolDataCodec());
// handler
pipeline.addLast(new ZabbixProtocolHandler(metrics));
}
/**
* Stop zabbix receive server
*/
public void stop() {
serverChannel.close().syncUninterruptibly();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean;
import lombok.Getter;
import java.util.Objects;
/**
* Zabbix protocol type
*/
@Getter
public enum ZabbixProtocolType {
ACTIVE_CHECKS("active checks"),
AGENT_DATA("agent data")
;
private String name;
ZabbixProtocolType(String name) {
this.name = name;
}
/**
* Parse type by name
*/
public static ZabbixProtocolType parse(String name) {
for (ZabbixProtocolType type : ZabbixProtocolType.values()) {
if (Objects.equals(type.name, name)) {
return type;
}
}
return null;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean;
import lombok.Data;
import java.util.List;
@Data
public class ZabbixRequest {
/**
* Request type
*/
private ZabbixProtocolType type;
/**
* Active checks data
* @see ZabbixProtocolType#ACTIVE_CHECKS
*/
private ActiveChecks activeChecks;
/**
* Agent push data
* @see ZabbixProtocolType#AGENT_DATA
*/
private List<AgentData> agentDataList;
@Data
public static class ActiveChecks {
private String hostName;
}
@Data
public static class AgentData {
private String host;
private String key;
private String value;
private int id;
private long clock;
private long ns;
private int state;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.List;
/**
* Deserialize request from the Zabbix
*/
public class ZabbixRequestJsonDeserializer implements JsonDeserializer<ZabbixRequest> {
@Override
public ZabbixRequest deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
String requestTypeString = json.getAsJsonObject().get("request").getAsString();
// Check has support request type
ZabbixProtocolType requestType = ZabbixProtocolType.parse(requestTypeString);
if (requestType == null) {
throw new JsonParseException("Current request type is not support:" + requestTypeString);
}
// Build data
ZabbixRequest data = new ZabbixRequest();
data.setType(requestType);
if (requestType == ZabbixProtocolType.AGENT_DATA) {
data.setAgentDataList(context
.deserialize(json.getAsJsonObject().getAsJsonArray("data"),
new TypeToken<List<ZabbixRequest.AgentData>>() {
}.getType()));
} else if (requestType == ZabbixProtocolType.ACTIVE_CHECKS) {
ZabbixRequest.ActiveChecks checksData = new ZabbixRequest.ActiveChecks();
checksData.setHostName(json.getAsJsonObject().get("host").getAsString());
data.setActiveChecks(checksData);
}
return data;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import java.util.List;
@Data
public class ZabbixResponse {
/**
* Protocol type
*/
private ZabbixProtocolType type;
private List<ActiveChecks> activeChecks;
private AgentData agentData;
@Data
@Builder
public static class ActiveChecks {
private String key;
private int delay;
private int lastlogsize;
private int mtime;
}
@Getter
@Builder
public static class AgentData {
private String info;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import java.lang.reflect.Type;
/**
* Build the Zabbix response json
*/
public class ZabbixResponseJsonSerializer implements JsonSerializer<ZabbixResponse> {
@Override
public JsonElement serialize(ZabbixResponse src, Type typeOfSrc, JsonSerializationContext context) {
ZabbixProtocolType type = src.getType();
JsonObject response = new JsonObject();
response.addProperty("response", "success");
if (type == ZabbixProtocolType.ACTIVE_CHECKS) {
response.add("data", new Gson().toJsonTree(src.getActiveChecks()));
} else if (type == ZabbixProtocolType.AGENT_DATA) {
response.addProperty("info", src.getAgentData().getInfo());
}
return response;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.zabbix.module.ZabbixReceiverModule
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.zabbix.provider.ZabbixReceiverProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import java.net.SocketTimeoutException;
import lombok.SneakyThrows;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixErrorProtocolException;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolDecoder;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolHandler;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixServer;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixProtocolType;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixRequest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
public abstract class ZabbixBaseTest {
private static final String TCP_HOST = "0.0.0.0";
private static final int TCP_PORT = 10051;
protected ZabbixServer zabbixServer;
protected SocketClient socketClient;
protected ZabbixMetrics zabbixMetrics;
/**
* Customize the Zabbix metrics
*/
protected abstract ZabbixMetrics buildZabbixMetrics() throws Exception;
@Before
public void setupService() throws Throwable {
// Startup server
ZabbixModuleConfig config = new ZabbixModuleConfig();
config.setPort(TCP_PORT);
config.setHost(TCP_HOST);
zabbixMetrics = buildZabbixMetrics();
zabbixServer = new ZabbixServerWrapper(config, zabbixMetrics);
zabbixServer.start();
}
@After
public void cleanup() {
zabbixServer.stop();
}
/**
* Verify request error protocol
*/
public void assertWriteErrorProtocol(byte[] data) throws Throwable {
startupSocketClient();
try {
socketClient.socket.getOutputStream().write(data);
for (int i = 0; i < 10; i++) {
// No response
if (socketClient.socket.getInputStream().available() == 0 && socketClient.socket.getInputStream().read() == -1) {
return ;
}
TimeUnit.MILLISECONDS.sleep(500);
}
throw new IllegalStateException("Could not detect protocol error");
} finally {
stopSocketClient();
}
}
/**
* Assert need more input to server
*/
public void assertNeedMoreInput(byte[] data) throws Throwable {
startupSocketClient();
try {
socketClient.socket.getOutputStream().write(data);
try {
for (int i = 0; i < 10; i++) {
// No response
if (socketClient.socket.getInputStream().available() == 0 && socketClient.socket.getInputStream().read() == -1) {
return ;
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (SocketTimeoutException e) {
// Read timeout mean need more content
return;
}
throw new IllegalStateException("Could not detect need more input error");
} finally {
stopSocketClient();
}
}
/**
* Verify Active checks item names
*/
public void assertZabbixActiveChecksResponse(String body, String... itemNames) {
Assert.assertNotNull(body);
JsonElement bodyRoot = new Gson().fromJson(body, JsonElement.class);
JsonObject rootObject = bodyRoot.getAsJsonObject();
// Basic response status
Assert.assertEquals("success", rootObject.get("response").getAsString());
// Active Checks
Assert.assertNotNull(rootObject.get("data"));
JsonArray activeChecks = rootObject.getAsJsonArray("data");
Assert.assertEquals(itemNames.length, activeChecks.size());
for (String itemName : itemNames) {
boolean found = false;
for (JsonElement perCheck : activeChecks) {
JsonObject curCheck = perCheck.getAsJsonObject();
String itemKey = curCheck.get("key").getAsString();
if (Objects.equals(itemKey, itemName)) {
Assert.assertTrue(curCheck.get("delay").getAsInt() > 0);
Assert.assertTrue(curCheck.get("lastlogsize").getAsInt() >= 0);
Assert.assertTrue(curCheck.get("mtime").getAsInt() >= 0);
found = true;
}
}
if (!found) {
throw new AssertionError("Could not found " + itemName + " in Active Checks response");
}
}
}
/**
* Verify Zabbix agent data response
*/
public void assertZabbixAgentDataResponse(String body) {
Assert.assertNotNull(body);
JsonElement bodyRoot = new Gson().fromJson(body, JsonElement.class);
JsonObject rootObject = bodyRoot.getAsJsonObject();
// Basic response status
Assert.assertEquals("success", rootObject.get("response").getAsString());
// Agent data info
Assert.assertNotNull(rootObject.get("info"));
Assert.assertTrue(StringUtil.isNotEmpty(rootObject.get("info").getAsString()));
}
/**
* Verify Zabbix Active Checks request data
*/
public void assertZabbixActiveChecksRequest(int inx, String hostName) {
ZabbixRequest request = assertZabbixRequestBasic(inx, ZabbixProtocolType.ACTIVE_CHECKS);
Assert.assertNotNull(request.getActiveChecks());
Assert.assertEquals(hostName, request.getActiveChecks().getHostName());
}
/**
* Verify Zabbix Agent data request data
*/
public void assertZabbixAgentDataRequest(int inx, String hostName, String... keyNames) {
ZabbixRequest request = assertZabbixRequestBasic(inx, ZabbixProtocolType.AGENT_DATA);
List<ZabbixRequest.AgentData> agentDataList = request.getAgentDataList();
Assert.assertNotNull(agentDataList);
Assert.assertEquals(keyNames.length, agentDataList.size());
for (String keyName : keyNames) {
boolean found = false;
for (ZabbixRequest.AgentData agentData : agentDataList) {
if (Objects.equals(keyName, agentData.getKey())) {
Assert.assertEquals(hostName, agentData.getHost());
Assert.assertTrue(NumberUtils.isParsable(agentData.getValue()) ?
Double.parseDouble(agentData.getValue()) > 0 : StringUtil.isNotBlank(agentData.getValue()));
Assert.assertTrue(agentData.getId() > 0);
Assert.assertTrue(agentData.getClock() > 0);
Assert.assertTrue(agentData.getNs() > 0);
Assert.assertTrue(agentData.getState() == 0);
found = true;
}
}
if (!found) {
// Throw exception when key not found
throw new AssertionError("Could not found " + keyName + " in Agent data request");
}
}
}
/**
* Verify zabbix request basic info
*/
private ZabbixRequest assertZabbixRequestBasic(int inx, ZabbixProtocolType protocolType) {
List<ZabbixRequest> requests = socketClient.requests;
Assert.assertNotNull(requests);
Assert.assertTrue(requests.size() > inx);
ZabbixRequest request = requests.get(inx);
Assert.assertEquals(protocolType, request.getType());
return request;
}
/**
* Startup a new socket client to server
*/
protected void startupSocketClient() throws Throwable {
socketClient = Optional.ofNullable(this.socketClient).orElseGet(SocketClient::new);
socketClient.startup();
}
/**
* Close the client
*/
protected void stopSocketClient() {
Optional.ofNullable(socketClient).ifPresent(SocketClient::stop);
socketClient = null;
}
/**
* Connect to receiver server
*/
protected static class SocketClient {
private ZabbixProtocolHandler protocolHandler;
private Throwable spyHandlerException;
private Socket socket;
private List<ZabbixRequest> requests;
private void startup() throws Throwable {
if (socket != null) {
return;
}
socket = new Socket();
socket.setSoTimeout(2000);
socket.connect(new InetSocketAddress(TCP_HOST, TCP_PORT));
// Waiting for connection
while (!socket.isConnected() || (protocolHandler == null && spyHandlerException == null)) {
TimeUnit.SECONDS.sleep(1);
}
if (spyHandlerException != null) {
throw spyHandlerException;
}
// Intercept message received
requests = new ArrayList<>();
doAnswer((Answer<Object>) invocationOnMock -> {
requests.add(invocationOnMock.getArgument(1));
return invocationOnMock.callRealMethod();
}).when(protocolHandler).channelRead0(any(), any());
}
@SneakyThrows
private void stop() {
if (socket != null && socket.isConnected()) {
socket.close();
}
}
public void writeZabbixMessage(String message) throws IOException {
this.socket.getOutputStream().write(buildZabbixRequestData(message));
}
public static byte[] buildZabbixRequestData(String content) {
// Build header
byte[] payload = content.getBytes();
int payloadLength = payload.length;
byte[] header = new byte[] {
'Z', 'B', 'X', 'D', '\1',
(byte) (payloadLength & 0xFF),
(byte) (payloadLength >> 8 & 0xFF),
(byte) (payloadLength >> 16 & 0xFF),
(byte) (payloadLength >> 24 & 0xFF),
'\0', '\0', '\0', '\0'};
byte[] packet = new byte[header.length + payloadLength];
System.arraycopy(header, 0, packet, 0, header.length);
System.arraycopy(payload, 0, packet, header.length, payloadLength);
return packet;
}
/**
* Finding and spy the Zabbix handler
*/
private void spyHandler(SocketChannel channel) {
Object tailContext = Whitebox.getInternalState(channel.pipeline(), "tail");
Object handlerContext = Whitebox.getInternalState(tailContext, "prev");
ZabbixProtocolHandler handler = spyHandler(handlerContext, ZabbixProtocolHandler.class);
if (handler == null) {
throw new IllegalStateException("Unnable to find Zabbix protocol handler");
}
protocolHandler = handler;
}
private <T> T spyHandler(Object handlerContext, Class<T> handlerCls) {
if (handlerContext == null || handlerContext.getClass().getSimpleName().contains("HeadContext")) {
return null;
}
Object handler = Whitebox.getInternalState(handlerContext, "handler");
if (handler.getClass().equals(handlerCls)) {
Object realHandler = spy(handler);
Whitebox.setInternalState(handlerContext, "handler", realHandler);
return (T) realHandler;
} else {
return spyHandler(Whitebox.getInternalState(handlerContext, "prev"), handlerCls);
}
}
private byte[] readAllContent(InputStream inputStream) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(512);
byte[] buffer = new byte[512];
int len;
while ((len = inputStream.read(buffer)) > 0) {
outputStream.write(buffer, 0, len);
if (len != buffer.length) {
break;
}
}
return outputStream.toByteArray();
}
public String waitAndGetResponsePayload() throws InterruptedException, IOException, ZabbixErrorProtocolException {
ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
ByteBuf byteBuf = Unpooled.copiedBuffer(readAllContent(socket.getInputStream()));
return new ZabbixProtocolDecoder().decodeToPayload(channelHandlerContext, byteBuf);
}
}
/**
* Zabbix binder wrapper, support spy Zabbix message received data
*/
private class ZabbixServerWrapper extends ZabbixServer {
public ZabbixServerWrapper(ZabbixModuleConfig config, ZabbixMetrics zabbixMetrics) {
super(config, zabbixMetrics);
}
@Override
public void initChannel(SocketChannel channel) {
super.initChannel(channel);
try {
socketClient.spyHandler(channel);
} catch (Throwable e) {
socketClient.spyHandlerException = e;
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider;
import com.google.common.collect.Maps;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramPercentileFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgLabeledFunction;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.config.ZabbixConfig;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.config.ZabbixConfigs;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.reflect.Whitebox;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
public class ZabbixMetricsTest extends ZabbixBaseTest {
protected CoreModuleProvider moduleProvider;
protected ModuleManager moduleManager;
protected MeterSystem meterSystem;
private List<AcceptableValue> values = new ArrayList<>();
@Override
public void setupService() throws Throwable {
moduleProvider = Mockito.mock(CoreModuleProvider.class);
moduleManager = Mockito.mock(ModuleManager.class);
// prepare the context
meterSystem = Mockito.spy(new MeterSystem(moduleManager));
Whitebox.setInternalState(MetricsStreamProcessor.class, "PROCESSOR",
Mockito.spy(MetricsStreamProcessor.getInstance()));
doNothing().when(MetricsStreamProcessor.getInstance()).create(any(), (StreamDefinition) any(), any());
CoreModule coreModule = Mockito.spy(CoreModule.class);
Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider);
when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule);
when(moduleProvider.getService(MeterSystem.class)).thenReturn(meterSystem);
// prepare the meter functions
final HashMap<String, Class> map = Maps.newHashMap();
map.put("avg", AvgFunction.class);
map.put("avgLabeled", AvgLabeledFunction.class);
map.put("avgHistogram", AvgHistogramFunction.class);
map.put("avgHistogramPercentile", AvgHistogramPercentileFunction.class);
Whitebox.setInternalState(meterSystem, "functionRegister", map);
super.setupService();
}
@Override
protected ZabbixMetrics buildZabbixMetrics() throws Exception {
// Notifies meter system received metric
doAnswer(invocationOnMock -> {
values.add(invocationOnMock.getArgument(0, AcceptableValue.class));
return null;
}).when(meterSystem).doStreamingCalculation(any());
// load context
List<ZabbixConfig> zabbixConfigs = ZabbixConfigs.loadConfigs(ZabbixModuleConfig.CONFIG_PATH, Arrays.asList("agent"));
return new ZabbixMetrics(zabbixConfigs, meterSystem);
}
@Test
public void testReceiveMetrics() throws Throwable {
startupSocketClient();
// Verify Active Checks
socketClient.writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"test-01\"}");
String activeChecksRespData = socketClient.waitAndGetResponsePayload();
assertZabbixActiveChecksRequest(0, "test-01");
assertZabbixActiveChecksResponse(activeChecksRespData, "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname");
// Verify Agent data
socketClient.writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[" +
"{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg1]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
"{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg5]\",\"value\":\"2.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
"{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"3.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
"{\"host\":\"test-01\",\"key\":\"agent.hostname\",\"value\":\"test-01-hostname\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}" +
"],\"clock\":1609588568,\"ns\":102244476}");
String agentDataRespData = socketClient.waitAndGetResponsePayload();
assertZabbixAgentDataRequest(1, "test-01", "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname");
assertZabbixAgentDataResponse(agentDataRespData);
// Verify meter system received data
Assert.assertEquals(1, values.size());
AvgLabeledFunction avgLabeledFunction = (AvgLabeledFunction) values.get(0);
String serviceId = IDManager.ServiceID.buildId("zabbix::test-01-hostname", true);
Assert.assertEquals(serviceId, avgLabeledFunction.getEntityId());
Assert.assertEquals(serviceId, avgLabeledFunction.getServiceId());
Assert.assertEquals(1, avgLabeledFunction.getSummation().get("avg1"), 0.0);
Assert.assertEquals(2, avgLabeledFunction.getSummation().get("avg5"), 0.0);
Assert.assertEquals(3, avgLabeledFunction.getSummation().get("avg15"), 0.0);
Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg1"), 0.0);
Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg5"), 0.0);
Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg15"), 0.0);
stopSocketClient();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol;
import com.google.common.collect.ImmutableSet;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.ZabbixBaseTest;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.ZabbixMetrics;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ZabbixProtocolHandlerTest extends ZabbixBaseTest {
@Override
protected ZabbixMetrics buildZabbixMetrics() throws ModuleStartException, Exception {
ZabbixMetrics metrics = mock(ZabbixMetrics.class);
// mock zabbix metrics
when(metrics.getAllMonitorMetricNames(any())).thenReturn(ImmutableSet.<String>builder().add("system.cpu.load[all,avg15]").build());
when(metrics.convertMetrics(any())).thenReturn(ZabbixMetrics.ConvertStatics.EMPTY);
return metrics;
}
/**
* Test active tasks and agent data request and response
*/
@Test
public void testReceive() throws Throwable {
startupSocketClient();
// Verify Active Checks
socketClient.writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"zabbix-test-agent\"}");
String activeChecksRespData = socketClient.waitAndGetResponsePayload();
assertZabbixActiveChecksRequest(0, "zabbix-test-agent");
assertZabbixActiveChecksResponse(activeChecksRespData, "system.cpu.load[all,avg15]");
// Verify Agent data
socketClient.writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[{\"host\":\"zabbix-test-agent\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}],\"clock\":1609588568,\"ns\":102244476}");
String agentDataRespData = socketClient.waitAndGetResponsePayload();
assertZabbixAgentDataRequest(1, "zabbix-test-agent", "system.cpu.load[all,avg15]");
assertZabbixAgentDataResponse(agentDataRespData);
stopSocketClient();
}
/**
* Test error protocol
*/
@Test
public void testErrorProtocol() throws Throwable {
// Simple header
for (int i = 1; i < 5; i++) {
assertNeedMoreInput(new byte[i]);
}
// Only header string
assertNeedMoreInput(new byte[] {'Z', 'B', 'X', 'D'});
// Header error
assertWriteErrorProtocol(new byte[] {'Z', 'B', 'X', 'D', 2, 0, 0, 0, 0});
assertWriteErrorProtocol(new byte[] {'Z', 'B', 'X', 'D', 2, 1, 0, 0, 0});
// Empty data
assertWriteErrorProtocol(SocketClient.buildZabbixRequestData(""));
assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("{}"));
assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("{\"test\": 1}"));
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
metricPrefix: meter_agent
expSuffix: tag({tags -> tags.cur_host_name = 'zabbix::' + tags.cur_host_name}).service(['cur_host_name'])
entities:
hostPatterns:
- test.+
labels:
- name: temp_value
value: all
- name: cur_host_name
fromItem: agent.hostname
requiredZabbixItemKeys:
- system.cpu.load[all,avg1]
- system.cpu.load[all,avg5]
- system.cpu.load[all,avg15]
metrics:
- name: system_cpu_load
exp: system_cpu_load.avg(['2', 'cur_host_name'])
\ No newline at end of file
......@@ -44,6 +44,7 @@ import org.apache.skywalking.e2e.metrics.MetricsData;
import org.apache.skywalking.e2e.metrics.MetricsQuery;
import org.apache.skywalking.e2e.metrics.MultiMetricsData;
import org.apache.skywalking.e2e.metrics.ReadLabeledMetricsData;
import org.apache.skywalking.e2e.metrics.ReadLabeledMetricsQuery;
import org.apache.skywalking.e2e.metrics.ReadMetrics;
import org.apache.skywalking.e2e.metrics.ReadMetricsData;
import org.apache.skywalking.e2e.metrics.ReadMetricsQuery;
......@@ -343,7 +344,7 @@ public class SimpleQueryClient {
return Objects.requireNonNull(responseEntity.getBody()).getData().getReadMetricsValues();
}
public List<ReadMetrics> readLabeledMetrics(final ReadMetricsQuery query) throws Exception {
public List<ReadMetrics> readLabeledMetrics(final ReadLabeledMetricsQuery query) throws Exception {
final URL queryFileUrl = Resources.getResource("read-labeled-metrics.gql");
final String queryString = Resources.readLines(queryFileUrl, StandardCharsets.UTF_8)
.stream()
......@@ -354,7 +355,10 @@ public class SimpleQueryClient {
.replace("{end}", query.end())
.replace("{metricsName}", query.metricsName())
.replace("{serviceName}", query.serviceName())
.replace("{instanceName}", query.instanceName());
.replace("{instanceName}", query.instanceName())
.replace("{scope}", query.scope())
.replace("{labels}", query.labels().stream()
.map(s -> "\"" + s + "\"").collect(Collectors.joining(",")));
LOGGER.info("Query: {}", queryString);
final ResponseEntity<GQLResponse<ReadLabeledMetricsData>> responseEntity = restTemplate.exchange(
new RequestEntity<>(queryString, HttpMethod.POST, URI.create(endpointUrl)),
......
......@@ -18,11 +18,16 @@
package org.apache.skywalking.e2e.metrics;
import com.google.common.collect.ImmutableMap;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.apache.skywalking.e2e.AbstractQuery;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@Data
@Accessors(fluent = true)
@EqualsAndHashCode(callSuper = true)
......@@ -149,4 +154,10 @@ public class MetricsQuery extends AbstractQuery<MetricsQuery> {
METER_PROCESS_FILES_MAX
};
public static Map<String, List<String>> SIMPLE_ZABBIX_METERS = ImmutableMap.<String, List<String>>builder()
.put("meter_agent_system_cpu_util", Arrays.asList("idle"))
.put("meter_agent_vm_memory_size", Arrays.asList("total"))
.put("meter_agent_vfs_fs_size", Arrays.asList("/-total"))
.build();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.metrics;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.apache.skywalking.e2e.AbstractQuery;
import java.util.List;
@Data
@Accessors(fluent = true)
@EqualsAndHashCode(callSuper = true)
public class ReadLabeledMetricsQuery extends AbstractQuery<ReadLabeledMetricsQuery> {
private String metricsName;
private String serviceName;
private String instanceName;
private String scope = "ServiceInstance";
private List<String> labels;
}
......@@ -24,17 +24,14 @@
"condition":{
"name":"{metricsName}",
"entity":{
"scope":"ServiceInstance",
"scope":"{scope}",
"serviceName":"{serviceName}",
"serviceInstanceName":"{instanceName}",
"normal":true
}
},
"labels":[
"50",
"70",
"90",
"99"
{labels}
]
}
}
\ No newline at end of file
......@@ -21,6 +21,7 @@ services:
expose:
- 11800
- 12800
- 10051
- 5005
networks:
- e2e
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
metricPrefix: meter_agent
expSuffix: tag({tags -> tags.host = 'vm::' + tags.host}).service(['host'])
entities:
hostPatterns:
- .+
labels:
requiredZabbixItemKeys:
- system.cpu.load[all,avg15]
- system.cpu.load[all,avg1]
- system.cpu.load[all,avg5]
- system.cpu.util[,guest]
- system.cpu.util[,guest_nice]
- system.cpu.util[,idle]
- system.cpu.util[,interrupt]
- system.cpu.util[,iowait]
- system.cpu.util[,nice]
- system.cpu.util[,softirq]
- system.cpu.util[,steal]
- system.cpu.util[,system]
- system.cpu.util[,user]
- system.swap.size[,free]
- system.swap.size[,total]
- vfs.fs.inode[/,pfree]
- vfs.fs.size[/,total]
- vfs.fs.size[/,used]
- vm.memory.size[available]
- vm.memory.size[total]
metrics:
- name: system_cpu_load
exp: system_cpu_load.avg(['2', 'host'])
- name: system_cpu_util
exp: system_cpu_util.avg(['2', 'host'])
- name: system_swap_size
exp: system_swap_size.avg(['2', 'host'])
- name: vfs_fs_inode
exp: vfs_fs_inode.avg(['1', '2', 'host'])
- name: vfs_fs_size
exp: vfs_fs_size.avg(['1', '2', 'host'])
- name: vm_memory_size
exp: vm_memory_size.avg(['1', 'host'])
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2.1'
services:
oap:
extends:
file: ../base-compose.yml
service: oap
volumes:
- ./agent.yaml:/skywalking/config/zabbix-rules/agent.yaml
zabbix-client:
image: zabbix/zabbix-agent:latest
networks:
- e2e
volumes:
- ./zabbix_agentd.conf:/etc/zabbix/zabbix_agentd.d/zabbix_agentd.conf
depends_on:
oap:
condition: service_healthy
ui:
extends:
file: ../base-compose.yml
service: ui
depends_on:
oap:
condition: service_healthy
networks:
e2e:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
LogFile=/var/log/zabbix/zabbix_agentd.log
StartAgents=0
# Should setting to the SkyWalking backend Zabbix receiver port
ServerActive=oap:10051
Hostname=Zabbix-agent-docker-client
RefreshActiveChecks=60
\ No newline at end of file
......@@ -17,6 +17,7 @@
package org.apache.skywalking.e2e.kafka;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
......@@ -29,6 +30,7 @@ import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
import org.apache.skywalking.e2e.metrics.Metrics;
import org.apache.skywalking.e2e.metrics.MetricsQuery;
import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
import org.apache.skywalking.e2e.metrics.ReadLabeledMetricsQuery;
import org.apache.skywalking.e2e.metrics.ReadMetrics;
import org.apache.skywalking.e2e.metrics.ReadMetricsQuery;
import org.apache.skywalking.e2e.retryable.RetryableTest;
......@@ -200,8 +202,9 @@ public class KafkaE2E extends SkyWalkingTestAdapter {
for (String metricsName : ALL_SO11Y_LABELED_METRICS) {
LOGGER.info("verifying service instance response time: {}", instance);
final List<ReadMetrics> instanceMetrics = graphql.readLabeledMetrics(
new ReadMetricsQuery().stepByMinute().metricsName(metricsName)
new ReadLabeledMetricsQuery().stepByMinute().metricsName(metricsName)
.serviceName(service.getLabel()).instanceName(instance.getLabel())
.labels(Arrays.asList("50", "70", "90", "99"))
);
LOGGER.info("{}: {}", metricsName, instanceMetrics);
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.e2e.storage;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
......@@ -40,6 +41,7 @@ import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
import org.apache.skywalking.e2e.metrics.Metrics;
import org.apache.skywalking.e2e.metrics.MetricsQuery;
import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
import org.apache.skywalking.e2e.metrics.ReadLabeledMetricsQuery;
import org.apache.skywalking.e2e.metrics.ReadMetrics;
import org.apache.skywalking.e2e.metrics.ReadMetricsQuery;
import org.apache.skywalking.e2e.retryable.RetryableTest;
......@@ -262,8 +264,9 @@ public class StorageE2E extends SkyWalkingTestAdapter {
for (String metricsName : ALL_SO11Y_LABELED_METRICS) {
LOGGER.info("verifying service instance response time: {}", instance);
final List<ReadMetrics> instanceMetrics = graphql.readLabeledMetrics(
new ReadMetricsQuery().stepByMinute().metricsName(metricsName)
new ReadLabeledMetricsQuery().stepByMinute().metricsName(metricsName)
.serviceName(service.getLabel()).instanceName(instance.getLabel())
.labels(Arrays.asList("50", "70", "90", "99"))
);
LOGGER.info("{}: {}", metricsName, instanceMetrics);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.zabbix;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.e2e.UIConfigurationManagementClient;
import org.apache.skywalking.e2e.annotation.ContainerHostAndPort;
import org.apache.skywalking.e2e.annotation.DockerCompose;
import org.apache.skywalking.e2e.base.SkyWalkingE2E;
import org.apache.skywalking.e2e.base.SkyWalkingTestAdapter;
import org.apache.skywalking.e2e.common.HostAndPort;
import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
import org.apache.skywalking.e2e.metrics.Metrics;
import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
import org.apache.skywalking.e2e.metrics.ReadLabeledMetricsQuery;
import org.apache.skywalking.e2e.metrics.ReadMetrics;
import org.apache.skywalking.e2e.retryable.RetryableTest;
import org.apache.skywalking.e2e.service.Service;
import org.apache.skywalking.e2e.service.ServicesMatcher;
import org.apache.skywalking.e2e.service.ServicesQuery;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.DockerComposeContainer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.SIMPLE_ZABBIX_METERS;
import static org.apache.skywalking.e2e.utils.Times.now;
import static org.apache.skywalking.e2e.utils.Yamls.load;
@Slf4j
@SkyWalkingE2E
public class ZabbixE2E extends SkyWalkingTestAdapter {
@DockerCompose({"docker/zabbix/docker-compose.yml"})
private DockerComposeContainer<?> compose;
@ContainerHostAndPort(name = "ui", port = 8080)
private HostAndPort swWebappHostPort;
private UIConfigurationManagementClient graphql;
@BeforeAll
public void setUp() throws Exception {
graphql = new UIConfigurationManagementClient(swWebappHostPort.host(), swWebappHostPort.port());
}
@RetryableTest
void testMetrics() throws Exception {
List<Service> services = graphql.services(new ServicesQuery().start(startTime).end(now()));
services = services.stream().filter(s -> !s.getLabel().equals("oap::oap-server")).collect(Collectors.toList());
LOGGER.info("services: {}", services);
load("expected/zabbix/services.yml").as(ServicesMatcher.class).verify(services);
Service service = services.get(0);
for (Map.Entry<String, List<String>> entry : SIMPLE_ZABBIX_METERS.entrySet()) {
String meterName = entry.getKey();
List<String> labels = entry.getValue();
LOGGER.info("verifying zabbix meter: {}", meterName);
List<ReadMetrics> metrics = null;
try {
metrics = graphql.readLabeledMetrics(
new ReadLabeledMetricsQuery().stepByMinute().metricsName(meterName)
.serviceName(service.getLabel()).scope("Service").instanceName("")
.labels(labels)
);
} catch (Exception e) {
LOGGER.error("Error", e);
}
LOGGER.info("zabbix metrics: {}", metrics);
Metrics allValues = new Metrics();
for (ReadMetrics readMetrics : metrics) {
allValues.getValues().addAll(readMetrics.getValues().getValues());
}
final AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
final MetricsValueMatcher greaterThanZero = new MetricsValueMatcher();
greaterThanZero.setValue("gt 0");
instanceRespTimeMatcher.setValue(greaterThanZero);
instanceRespTimeMatcher.verify(allValues);
}
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
services:
- key: not null
label: "vm::Zabbix-agent-docker-client"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册