未验证 提交 b635e254 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Provide Meter(metrics) system (#4694)

Here are the basic and simple usages of the MeterSystem APIs.

## Meter Creation
New meter could be created based on metrics name, function, scope and data type. 
1. Metrics name is still the storage entity name.
2. Functions are every similar with the OAL function, just accepting different input.
```java
final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
```
NOTICE, this creation should only be called in the `module#prepare` stage, otherwise, `Can't create new metrics anymore` exception will be raised after the **CORE** module `start` stage finished. You may find out, there is a chance `create` could be executed successfully in your own `module#start` stage, but it it just because of the sequence of provider loaded by the class loader, **no guarantee, so please don't do that**.

## Runtime Calculation
`AcceptableValue` is the object created at the runtime to accept new metrics value.
```java
         final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class);
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                final AcceptableValue<Long> value = service.buildMetrics("test_long_metrics", Long.class);
                value.accept(MeterEntity.newService("abc"), 5L);
                value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
                service.doStreamingCalculation(value);
            }
        }, 2, 2, TimeUnit.SECONDS);
```

## Meter Functions
Right now, only `avg` function has been implemented. I submit this PR as soon as possible to get your feedback. I will add more functions.

## Notice
1. Make slow trace query available in the sampled record. Logically, they are the same thing. The UI doesn't need to concern about the trace as a special case.
2. Endpoint dependency will be removed from the dashboard. Because no query available for it. We will provide a new page for that in 8.1
3. Comparison page will be removed due to dashboard is powerful enough to replace it.
上级 776d5080
......@@ -20,7 +20,7 @@ in Cloud Native architecture.
The core features are following.
- Service, service instance, endpoint metrics analysis
- Root cause analysis. Profile the code on the runtime.
- Root cause analysis. Profile the code on the runtime. Read [Apache SkyWalking: Use Profiling to Fix the Blind Spot of Distributed Tracing](https://thenewstack.io/apache-skywalking-use-profiling-to-fix-the-blind-spot-of-distributed-tracing/).
- Service topology map analysis
- Service, service instance and endpoint dependency analysis
- Slow services and endpoints detected
......@@ -30,18 +30,17 @@ The core features are following.
- Alarm
<img src="http://skywalking.apache.org/assets/frame.jpeg?u=20190518"/>
<img src="http://skywalking.apache.org/assets/frame-v8.jpg?u=20200423"/>
SkyWalking supports to collect telemetry (traces and metrics) data from multiple sources
and multiple formats,
including
1. Java, [.NET Core](https://github.com/SkyAPM/SkyAPM-dotnet), [NodeJS](https://github.com/SkyAPM/SkyAPM-nodejs) and [PHP](https://github.com/SkyAPM/SkyAPM-php-sdk) auto-instrument agents in SkyWalking format
1. Java, [.NET Core](https://github.com/SkyAPM/SkyAPM-dotnet), [NodeJS](https://github.com/SkyAPM/SkyAPM-nodejs) and [PHP](https://github.com/SkyAPM/SkyAPM-php-sdk) auto-instrument agents.
1. [Go agent](https://github.com/tetratelabs/go2sky).
1. [LUA agent](https://github.com/apache/skywalking-nginx-lua), especially for Nginx, OpenResty.
1. Envoy gRPC Access Log Service (ALS) format in Istio controlled service mesh.
1. Istio telemetry format.
1. Envoy Metrics Service format.
1. Zipkin v1/v2 and Jaeger gRPC format with limited topology and metrics analysis.(Experimental)
1. Service Mesh Observability, including Envoy gRPC Access Log Service (ALS) format in Istio controlled service mesh, Istio telemetry.
1. Metrics system, including Prometheus, Spring Sleuth(Micrometer).
1. Zipkin v1/v2 and Jaeger gRPC format with limited topology and metrics analysis.(Experimental).
# Document
[8.x dev](docs/README.md) | [7.0](https://github.com/apache/skywalking/blob/v7.0.0/docs/README.md) | [6.6](https://github.com/apache/skywalking/blob/v6.6.0/docs/README.md), [6.5](https://github.com/apache/skywalking/blob/v6.5.0/docs/README.md).
......
# Meter System
Meter system is another streaming calculation mode, especially for metrics data. In the [OAL](oal.md), there are clear
[Scope Definitions](scope-definitions.md), including native objects. Meter system is focusing on the data type itself,
and provides more flexible to the end user to define the scope entity.
The meter system is open to different receivers and fetchers in the backend,
follow the [backend setup document](../setup/backend/backend-setup.md) for more details.
Every metrics is declared in the meter system should include following attribute
1. **Metrics Name**. An unique name globally, should avoid overlap the OAL variable names.
1. **Function Name**. The function used for this metrics, distributed aggregation, value calculation and down sampling calculation
based on the function implementation. Also, the data structure is determined by the function too, such as function Avg is for Long.
1. **Scope Type**. Unlike inside the OAL, there are plenty of logic scope definitions, in meter system, only type is required.
Type values include service, instance, and endpoint, like we introduced in the Overview.
The values of scope entity name, such as service name, are required when metrics data generated with the metrics data value.
NOTICE, the metrics must be declared in the bootstrap stage, no runtime changed.
Meter System supports following binding functions
- **Avg**, calculate the avg value for every entity in the same metrics name.
......@@ -5,10 +5,10 @@ It is a modern APM, specially designed for cloud native, container based distrib
## Why use SkyWalking?
SkyWalking provides solutions for observing and monitoring distributed systems, in many different scenarios. First of all,
like traditional approaches, SkyWalking provides auto instrument agents for services, such as Java, C#
and Node.js. Skywalking also offers a community contributed manual instrument SDK for Go (with calls out for Python and
C++ SDK contributions). In multilanguage, continuously deployed environments, cloud native infrastructures grow more powerful
but also more complex. SkyWalking's service mesh receiver allows SkyWalking to receive telemetry data from service mesh frameworks
like traditional approaches, SkyWalking provides auto instrument agents for services, such as Java, C#, Node.js, Go, PHP and Nginx LUA.
(with calls out for Python and C++ SDK contributions).
In multilanguage, continuously deployed environments, cloud native infrastructures grow more powerful but also more complex.
SkyWalking's service mesh receiver allows SkyWalking to receive telemetry data from service mesh frameworks
such as Istio/Envoy and Linkerd, allowing users to understanding the entire distributed system.
SkyWalking provides observability capabilities for **service**(s), **service instance**(s), **endpoint**(s). The terms Service,
......@@ -23,18 +23,19 @@ Instance and Endpoint are used everywhere today, so it is worth defining their s
SkyWalking allows users to understand the topology relationship between Services and Endpoints, to view the metrics of every
Service/Service Instance/Endpoint and to set alarm rules.
In addition, you can integrate distributed tracing useing SkyWalking native agents and SDKs with Zipkin, Jaeger
and OpenCensus.
In addition, you can integrate
1. Other distributed tracing useing SkyWalking native agents and SDKs with Zipkin, Jaeger and OpenCensus.
1. Other metrics systems, such as Prometheus, Sleuth(Micrometer).
## Architecture
SkyWalking is logically split into four parts: Probes, Platform backend, Storage and UI.
<img src="http://skywalking.apache.org/assets/frame.jpeg"/>
<img src="http://skywalking.apache.org/assets/frame-v8.jpg?u=20200423"/>
- **Probe**s collect data and reformat them for SkyWalking requirements (different probes support different sources).
- **Platform backend**, supports data aggregation, analysis and drives process flow from probes to the UI. It also provides
the pluggable capabilities for incoming formats (like Zipkin's), storage implementors and cluster management. You even can
customize aggregation and analysis by using [Observability Analysis Language](oal.md).
- **Platform backend**, supports data aggregation, analysis and drives process flow from probes to the UI. The analysis includes
SkyWalking natives traces and metrics, 3rd party, including Istio and Envoy telemetry, Zipkin trace format, etc. You even can
customize aggregation and analysis by using [Observability Analysis Language for native metrics](oal.md) and [Meter System for extension metrics](meter.md).
- **Storage** houses SkyWalking data through an open/plugable interface. You can choose an existing implementation, such as
ElasticSearch, H2 or a MySQL cluster managed by Sharding-Sphere, or implement your own. Patches for new storage implementors
welcome!
......
# Open Fetcher
Fetcher is a concept in SkyWalking backend. It uses pulling mode rather than [receiver](backend-receivers.md), which
read the data from the target systems. This mode is typically in some metrics SDKs, such as Prometheus.
## Prometheus Fetcher
```yaml
prometheus-fetcher:
selector: ${SW_PROMETHEUS_FETCHER:default}
default:
active: ${SW_PROMETHEUS_FETCHER_ACTIVE:false}
```
TODO: More detail should be added when this fetcher provided.
\ No newline at end of file
# Choose receiver
Receiver is a concept in SkyWalking backend. All modules, which are responsible for receiving telemetry
or tracing data from other being monitored system, are all being called **Receiver**. Although today, most of
receivers are using gRPC or HTTPRestful to provide service, actually, whether listening mode or pull mode
could be receiver. Such as a receiver could base on pull data from remote, like Kakfa MQ.
or tracing data from other being monitored system, are all being called **Receiver**. If you are looking for the pull mode,
Take a look at [fetcher document](backend-fetcher.md).
We have following receivers, and `default` implementors are provided in our Apache distribution.
1. **receiver-trace**. gRPC and HTTPRestful services to accept SkyWalking format traces.
......@@ -10,10 +9,10 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **service-mesh**. gRPC services accept data from inbound mesh probes.
1. **receiver-jvm**. gRPC services accept JVM metrics data.
1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services.
1. **envoy-metric**. Envoy `metrics_service` and `ALS(access log service)` supported by this receiver. OAL script support all GAUGE type metrics.
1. **envoy-metric**. Envoy `metrics_service` and `ALS(access log service)` supported by this receiver. OAL script support all GAUGE type metrics.
1. **receiver-profile**. gRPC services accept profile task status and snapshot reporter.
1. **receiver_zipkin**. See [details](#zipkin-receiver).
1. **receiver_jaeger**. See [details](#jaeger-receiver).
1. **receiver-profile**. gRPC services accept profile task status and snapshot reporter.
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
......
......@@ -80,9 +80,11 @@ Read this before you try to initial a new cluster.
1. [Deploy in kubernetes](backend-k8s.md). Guide you to build and use SkyWalking image, and deploy in k8s.
1. [Choose storage](backend-storage.md). As we know, in default quick start, backend is running with H2
DB. But clearly, it doesn't fit the product env. In here, you could find what other choices do you have.
Choose the one you like, we are also welcome anyone to contribute new storage implementor,
Choose the ones you like, we are also welcome anyone to contribute new storage implementor.
1. [Set receivers](backend-receivers.md). You could choose receivers by your requirements, most receivers
are harmless, at least our default receivers are. You would set and active all receivers provided.
1. [Open fetchers](backend-fetcher.md). You could open different fetchers to read metrics from the target applications.
These ones works like receivers, but in pulling mode, typically like Prometheus.
1. [Token authentication](backend-token-auth.md). You could add token authentication mechanisms to avoid `OAP` receiving untrusted data.
1. Do [trace sampling](trace-sampling.md) at backend. This sample keep the metrics accurate, only don't save some of traces
in storage based on rate.
......
......@@ -46,10 +46,6 @@
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
......
......@@ -23,10 +23,11 @@ import com.google.common.reflect.ClassPath;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsFunction;
public class MetricsHolder {
private static Map<String, Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics>> REGISTER = new HashMap<>();
private static Map<String, Class<? extends Metrics>> REGISTER = new HashMap<>();
public static void init() throws IOException {
ClassPath classpath = ClassPath.from(MetricsHolder.class.getClassLoader());
......@@ -38,16 +39,16 @@ public class MetricsHolder {
MetricsFunction metricsFunction = aClass.getAnnotation(MetricsFunction.class);
REGISTER.put(
metricsFunction.functionName(),
(Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics>) aClass
(Class<? extends Metrics>) aClass
);
}
}
}
public static Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics> find(
public static Class<? extends Metrics> find(
String functionName) {
String func = functionName;
Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics> metricsClass = REGISTER.get(
Class<? extends Metrics> metricsClass = REGISTER.get(
func);
if (metricsClass == null) {
throw new IllegalArgumentException("Can't find metrics, " + func);
......
......@@ -45,6 +45,7 @@
<module>server-configuration</module>
<module>server-bootstrap</module>
<module>server-tools</module>
<module>server-fetcher-plugin</module>
</modules>
<properties>
......
......@@ -17,7 +17,8 @@
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
......@@ -123,6 +124,14 @@
</dependency>
<!-- receiver module -->
<!-- fetcher module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>prometheus-fetcher-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- fetcher module -->
<!-- storage module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
......
......@@ -194,6 +194,11 @@ envoy-metric:
default:
alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""}
prometheus-fetcher:
selector: ${SW_PROMETHEUS_FETCHER:default}
default:
active: ${SW_PROMETHEUS_FETCHER_ACTIVE:false}
receiver_zipkin:
selector: ${SW_RECEIVER_ZIPKIN:-}
default:
......
......@@ -74,6 +74,10 @@
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
......@@ -71,6 +72,8 @@ public class CoreModule extends ModuleDefine {
classes.add(IWorkerInstanceGetter.class);
classes.add(IWorkerInstanceSetter.class);
classes.add(MeterSystem.class);
addServerInterface(classes);
addReceiverInterface(classes);
addInsideService(classes);
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationSe
import org.apache.skywalking.oap.server.core.analysis.ApdexThresholdConfig;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.metrics.ApdexMetrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
......@@ -154,6 +155,9 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
this.registerServiceImplementation(MeterSystem.class, meterSystem);
AnnotationScan oalDisable = new AnnotationScan();
oalDisable.registerListener(DisableRegister.INSTANCE);
oalDisable.registerListener(new DisableRegister.SingleDisableScanListener());
......@@ -252,6 +256,7 @@ public class CoreModuleProvider extends ModuleProvider {
@Override
public void start() throws ModuleStartException {
MeterSystem.closeMeterCreationChannel();
grpcServer.addHandler(new RemoteServiceHandler(getManager()));
grpcServer.addHandler(new HealthCheckServiceHandler());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
@RequiredArgsConstructor
@Getter
public class StreamDefinition {
private final String name;
private final int scopeId;
private final Class<? extends StorageBuilder> builder;
private final Class<? extends StreamProcessor> processor;
public static StreamDefinition from(Stream stream) {
return new StreamDefinition(stream.name(), stream.scopeId(), stream.builder(), stream.processor());
}
}
......@@ -21,12 +21,14 @@ package org.apache.skywalking.oap.server.core.analysis.manual.segment;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import joptsimple.internal.Strings;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
......@@ -60,6 +62,10 @@ public class SegmentRecord extends Record {
private String traceId;
@Setter
@Getter
@Column(columnName = TopN.STATEMENT)
private String statement;
@Setter
@Getter
@Column(columnName = SERVICE_ID)
private String serviceId;
@Setter
......@@ -84,7 +90,7 @@ public class SegmentRecord extends Record {
private long endTime;
@Setter
@Getter
@Column(columnName = LATENCY)
@Column(columnName = LATENCY, dataType = Column.ValueDataType.SAMPLED_RECORD)
private int latency;
@Setter
@Getter
......@@ -108,9 +114,14 @@ public class SegmentRecord extends Record {
@Override
public Map<String, Object> data2Map(SegmentRecord storageData) {
storageData.statement = Strings.join(new String[] {
storageData.endpointName,
storageData.traceId
}, " - ");
Map<String, Object> map = new HashMap<>();
map.put(SEGMENT_ID, storageData.getSegmentId());
map.put(TRACE_ID, storageData.getTraceId());
map.put(TopN.STATEMENT, storageData.getStatement());
map.put(SERVICE_ID, storageData.getServiceId());
map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
map.put(ENDPOINT_NAME, storageData.getEndpointName());
......@@ -134,6 +145,7 @@ public class SegmentRecord extends Record {
SegmentRecord record = new SegmentRecord();
record.setSegmentId((String) dbMap.get(SEGMENT_ID));
record.setTraceId((String) dbMap.get(TRACE_ID));
record.setStatement((String) dbMap.get(TopN.STATEMENT));
record.setServiceId((String) dbMap.get(SERVICE_ID));
record.setServiceInstanceId((String) dbMap.get(SERVICE_INSTANCE_ID));
record.setEndpointName((String) dbMap.get(ENDPOINT_NAME));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
/**
* MeterEntity represents the entity in the meter system.
*/
@EqualsAndHashCode
@ToString
public class MeterEntity {
private ScopeType scopeType;
private String serviceName;
private String instanceName;
private String endpointName;
private MeterEntity(final ScopeType scopeType,
final String serviceName,
final String instanceName,
final String endpointName) {
this.scopeType = scopeType;
this.serviceName = serviceName;
this.instanceName = instanceName;
this.endpointName = endpointName;
}
public String id() {
switch (scopeType) {
case SERVICE:
// In Meter system, only normal service, because we don't conjecture any node.
return IDManager.ServiceID.buildId(serviceName, true);
case SERVICE_INSTANCE:
return IDManager.ServiceInstanceID.buildId(
IDManager.ServiceID.buildId(serviceName, true), instanceName);
case ENDPOINT:
return IDManager.EndpointID.buildId(IDManager.ServiceID.buildId(serviceName, true), endpointName);
default:
throw new UnexpectedException("Unexpected scope type of entity " + this.toString());
}
}
/**
* Create a service level meter entity.
*/
public static MeterEntity newService(String serviceName) {
return new MeterEntity(ScopeType.SERVICE, serviceName, null, null);
}
/**
* Create a service instance level meter entity.
*/
public static MeterEntity newServiceInstance(String serviceName, String serviceInstance) {
return new MeterEntity(ScopeType.SERVICE_INSTANCE, serviceName, serviceInstance, null);
}
/**
* Create an endpoint level meter entity.
*/
public static MeterEntity newEndpoint(String serviceName, String endpointName) {
return new MeterEntity(ScopeType.ENDPOINT, serviceName, null, endpointName);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter;
import com.google.common.collect.ImmutableSet;
import com.google.common.reflect.ClassPath;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtConstructor;
import javassist.CtNewConstructor;
import javassist.CtNewMethod;
import javassist.NotFoundException;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* MeterSystem provides the API way to create {@link MetricsStreamProcessor} rather than manual analysis metrics or OAL
* script.
*
* @since 8.0.0
*/
@Slf4j
public class MeterSystem implements Service {
private static final String METER_CLASS_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.meter.dynamic.";
private static ModuleManager MANAGER;
private static ClassPool CLASS_POOL;
private static List<NewMeter> TO_BE_CREATED_METERS = new ArrayList<>();
private static Map<String, Class<? extends MeterFunction>> FUNCTION_REGISTER = new HashMap<>();
/**
* Host the dynamic meter prototype classes. These classes could be create dynamically through {@link
* Object#clone()} in the runtime;
*/
private static Map<String, MeterDefinition> METER_PROTOTYPES = new HashMap<>();
private static MeterSystem METER_SYSTEM;
private static boolean METER_CREATABLE = true;
private MeterSystem() {
}
public synchronized static MeterSystem meterSystem(final ModuleManager manager) {
if (METER_SYSTEM != null) {
return METER_SYSTEM;
}
MANAGER = manager;
CLASS_POOL = ClassPool.getDefault();
ClassPath classpath = null;
try {
classpath = ClassPath.from(MeterSystem.class.getClassLoader());
} catch (IOException e) {
throw new UnexpectedException("Load class path failure.");
}
ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
for (ClassPath.ClassInfo classInfo : classes) {
Class<?> functionClass = classInfo.load();
if (functionClass.isAnnotationPresent(MeterFunction.class)) {
MeterFunction metricsFunction = functionClass.getAnnotation(MeterFunction.class);
if (!AcceptableValue.class.isAssignableFrom(functionClass)) {
throw new IllegalArgumentException(
"Function " + functionClass.getCanonicalName() + " doesn't implement AcceptableValue.");
}
FUNCTION_REGISTER.put(
metricsFunction.functionName(),
(Class<? extends MeterFunction>) functionClass
);
}
}
METER_SYSTEM = new MeterSystem();
return METER_SYSTEM;
}
/**
* Create streaming calculation of the given metrics name. This methods is synchronized due to heavy implementation
* including creating dynamic class. Don't use this in concurrency runtime.
*
* @param metricsName The name used as the storage eneity and in the query stage.
* @param functionName The function provided through {@link MeterFunction}.
* @return true if created, false if it exists.
* @throws IllegalArgumentException if the parameter can't match the expectation.
* @throws UnexpectedException if binary code manipulation fails or stream core failure.
*/
public synchronized <T> boolean create(String metricsName,
String functionName,
ScopeType type,
Class<T> dataType) throws IllegalArgumentException {
if (!METER_CREATABLE) {
throw new IllegalStateException("Can't create new metrics anymore");
}
final NewMeter newMeter = new NewMeter(metricsName, functionName, type, dataType);
if (TO_BE_CREATED_METERS.contains(newMeter)) {
return false;
}
TO_BE_CREATED_METERS.add(newMeter);
return true;
}
/**
* Close the {@link #create(String, String, ScopeType, Class)} channel, and build the model and streaming
* definitions.
*/
public static void closeMeterCreationChannel() {
METER_CREATABLE = false;
TO_BE_CREATED_METERS.forEach(newMeter -> {
String metricsName = newMeter.metricsName;
String functionName = newMeter.functionName;
ScopeType type = newMeter.type;
Class<?> dataType = newMeter.dataType;
/**
* Create a new meter class dynamically.
*/
final Class<? extends MeterFunction> meterFunction = FUNCTION_REGISTER.get(functionName);
if (meterFunction == null) {
throw new IllegalArgumentException("Function " + functionName + " can't be found.");
}
boolean foundDataType = false;
String acceptance = null;
for (final Type genericInterface : meterFunction.getGenericInterfaces()) {
ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) {
Type[] arguments = parameterizedType.getActualTypeArguments();
if (arguments[0].equals(dataType)) {
foundDataType = true;
} else {
acceptance = arguments[0].getTypeName();
}
}
}
if (!foundDataType) {
throw new IllegalArgumentException("Function " + functionName
+ " requires <" + acceptance + "> in AcceptableValue"
+ " but using " + dataType.getName() + " in the creation");
}
final CtClass parentClass;
try {
parentClass = CLASS_POOL.get(meterFunction.getCanonicalName());
if (!Metrics.class.isAssignableFrom(meterFunction)) {
throw new IllegalArgumentException(
"Function " + functionName + " doesn't inherit from Metrics.");
}
} catch (NotFoundException e) {
throw new IllegalArgumentException("Function " + functionName + " can't be found by javaassist.");
}
final String className = formatName(metricsName);
CtClass metricsClass = CLASS_POOL.makeClass(METER_CLASS_PACKAGE + className, parentClass);
/**
* Create empty construct
*/
try {
CtConstructor defaultConstructor = CtNewConstructor.make(
"public " + className + "() {}", metricsClass);
metricsClass.addConstructor(defaultConstructor);
} catch (CannotCompileException e) {
log.error("Can't add empty constructor in " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
}
/**
* Generate `AcceptableValue<T> createNew()` method.
*/
try {
metricsClass.addMethod(CtNewMethod.make(
""
+ "public org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue createNew() {"
+ " return new " + METER_CLASS_PACKAGE + className + "();"
+ " }"
, metricsClass));
} catch (CannotCompileException e) {
log.error("Can't generate createNew method for " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
}
Class targetClass;
try {
targetClass = metricsClass.toClass(MeterSystem.class.getClassLoader(), null);
AcceptableValue prototype = (AcceptableValue) targetClass.newInstance();
METER_PROTOTYPES.put(metricsName, new MeterDefinition(type, prototype, dataType));
log.debug("Generate metrics class, " + metricsClass.getName());
MetricsStreamProcessor.getInstance().create(
MANAGER,
new StreamDefinition(
metricsName, type.getScopeId(), prototype.builder(), MetricsStreamProcessor.class),
targetClass
);
} catch (CannotCompileException | IllegalAccessException | InstantiationException e) {
log.error("Can't compile/load/init " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
}
});
}
/**
* Create an {@link AcceptableValue} instance for streaming calculation. AcceptableValue instance is stateful,
* shouldn't do {@link AcceptableValue#accept(MeterEntity, Object)} once it is pushed into {@link
* #doStreamingCalculation(AcceptableValue)}.
*
* @param metricsName A defined metrics name. Use {@link #create(String, String, ScopeType, Class)} to define a new
* one.
* @param dataType class type of the input of {@link AcceptableValue}
* @return usable an {@link AcceptableValue} instance.
*/
public <T> AcceptableValue<T> buildMetrics(String metricsName,
Class<T> dataType) {
MeterDefinition meterDefinition = METER_PROTOTYPES.get(metricsName);
if (meterDefinition == null) {
throw new IllegalArgumentException("Uncreated metrics " + metricsName);
}
if (!meterDefinition.getDataType().equals(dataType)) {
throw new IllegalArgumentException(
"Unmatched metrics data type, request for " + dataType.getName()
+ ", but defined as " + meterDefinition.getDataType());
}
return meterDefinition.getMeterPrototype().createNew();
}
/**
* Active the {@link MetricsStreamProcessor#in(Metrics)} for streaming calculation.
*
* @param acceptableValue should only be created through {@link #create(String, String, ScopeType, Class)}
*/
public void doStreamingCalculation(AcceptableValue acceptableValue) {
MetricsStreamProcessor.getInstance().in((Metrics) acceptableValue);
}
private static String formatName(String metricsName) {
return metricsName.toLowerCase();
}
@RequiredArgsConstructor
@EqualsAndHashCode
public static class NewMeter {
private final String metricsName;
private final String functionName;
private final ScopeType type;
private final Class<?> dataType;
}
@RequiredArgsConstructor
@Getter
private static class MeterDefinition {
private final ScopeType scopeType;
private final AcceptableValue meterPrototype;
private final Class<?> dataType;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
public enum ScopeType {
SERVICE(DefaultScopeDefine.SERVICE),
SERVICE_INSTANCE(DefaultScopeDefine.SERVICE_INSTANCE),
ENDPOINT(DefaultScopeDefine.ENDPOINT);
@Getter
private final int scopeId;
ScopeType(final int scopeId) {
this.scopeId = scopeId;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
/**
* Indicate this function accepting the data of type T.
*/
public interface AcceptableValue<T> {
void accept(MeterEntity entity, T value);
/**
* @return a new instance based on the implementation, it should be the same class.
*/
AcceptableValue<T> createNew();
/**
* @return builder
*/
Class<? extends StorageBuilder> builder();
void setTimeBucket(long timeBucket);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.HashMap;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@MeterFunction(functionName = "avg")
@EqualsAndHashCode(of = {
"entityId",
"timeBucket"
})
public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
@Setter
@Getter
@Column(columnName = ENTITY_ID)
private String entityId;
@Override
public Metrics toHour() {
Avg metrics = (Avg) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setSummation(getSummation());
metrics.setCount(getCount());
return metrics;
}
@Override
public Metrics toDay() {
Avg metrics = (Avg) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setSummation(getSummation());
metrics.setCount(getCount());
return metrics;
}
@Override
public int remoteHashCode() {
return entityId.hashCode();
}
@Override
public void deserialize(final RemoteData remoteData) {
this.count = remoteData.getDataLongs(0);
this.summation = remoteData.getDataLongs(1);
setTimeBucket(remoteData.getDataLongs(2));
this.entityId = remoteData.getDataStrings(0);
}
@Override
public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.addDataLongs(count);
remoteBuilder.addDataLongs(summation);
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(entityId);
return remoteBuilder;
}
@Override
public String id() {
return getTimeBucket() + Const.ID_CONNECTOR + entityId;
}
@Override
public void accept(final MeterEntity entity, final Long value) {
this.entityId = entity.id();
this.summation += value;
this.count += 1;
}
@Override
public Class<? extends StorageBuilder> builder() {
return AvgStorageBuilder.class;
}
public static class AvgStorageBuilder implements StorageBuilder<Avg> {
@Override
public Avg map2Data(final Map<String, Object> dbMap) {
Avg metrics = new Avg() {
@Override
public AcceptableValue<Long> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
metrics.setSummation(((Number) dbMap.get(SUMMATION)).longValue());
metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
metrics.setCount(((Number) dbMap.get(COUNT)).longValue());
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
public Map<String, Object> data2Map(final Avg storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SUMMATION, storageData.getSummation());
map.put(VALUE, storageData.getValue());
map.put(COUNT, storageData.getCount());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Meter function indicate this class is used in SkyWalking meter system. The meter system accepts data from any number
* based metrics ecosystem, typically like Prometheus and Micrometer Application Monitoring
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface MeterFunction {
String functionName();
}
......@@ -37,11 +37,11 @@ public abstract class LongAvgMetrics extends Metrics implements LongValueHolder
@Getter
@Setter
@Column(columnName = SUMMATION, storageOnly = true)
private long summation;
protected long summation;
@Getter
@Setter
@Column(columnName = COUNT, storageOnly = true)
private long count;
protected long count;
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Avg)
......
......@@ -30,6 +30,7 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
......@@ -91,18 +92,24 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
* @param stream definition of the metrics class.
* @param metricsClass data type of the streaming calculation.
*/
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
}
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder,
StreamDefinition stream,
Class<? extends Metrics> metricsClass) {
if (DisableRegister.INSTANCE.include(stream.getName())) {
return;
}
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IMetricsDAO metricsDAO;
try {
metricsDAO = storageDAO.newMetricsDao(stream.builder().newInstance());
metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " metrics DAO failure.", e);
throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
}
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class);
......@@ -128,25 +135,25 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
if (supportDownSampling) {
if (configService.shouldToHour()) {
Model model = modelSetter.add(
metricsClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Hour), false);
metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Hour), false);
hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
if (configService.shouldToDay()) {
Model model = modelSetter.add(
metricsClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Day), false);
metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Day), false);
dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
transWorker = new MetricsTransWorker(
moduleDefineHolder, stream.name(), hourPersistentWorker, dayPersistentWorker);
moduleDefineHolder, stream.getName(), hourPersistentWorker, dayPersistentWorker);
}
Model model = modelSetter.add(
metricsClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Minute), false);
metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Minute), false);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
String remoteReceiverWorkerName = stream.name() + "_rec";
String remoteReceiverWorkerName = stream.getName() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(IWorkerInstanceSetter.class);
......@@ -154,7 +161,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
moduleDefineHolder, remoteWorker, stream.name());
moduleDefineHolder, remoteWorker, stream.getName());
entryWorkers.put(metricsClass, aggregateWorker);
}
......
......@@ -26,6 +26,6 @@ public class CoreModuleTest {
public void testOpenServiceList() {
CoreModule coreModule = new CoreModule();
Assert.assertEquals(28, coreModule.services().length);
Assert.assertEquals(29, coreModule.services().length);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>server-fetcher-plugin</artifactId>
<packaging>pom</packaging>
<modules>
<module>prometheus-fetcher-plugin</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-fetcher-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>prometheus-fetcher-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.fetcher.prometheus.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
public class PrometheusFetcherModule extends ModuleDefine {
public static final String NAME = "prometheus-fetcher";
public PrometheusFetcherModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Getter
public class PrometheusFetcherConfig extends ModuleConfig {
private boolean active;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
public class PrometheusFetcherProvider extends ModuleProvider {
private final PrometheusFetcherConfig config;
public PrometheusFetcherProvider() {
config = new PrometheusFetcherConfig();
}
@Override
public String name() {
return "default";
}
@Override
public Class<? extends ModuleDefine> module() {
return PrometheusFetcherModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
if (config.isActive()) {
// TODO. This is only a demo about creating new metrics
// We should create it based on metrics configuration.
final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
}
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
if (config.isActive()) {
// TODO. This is only a demo about fetching the data and push into the calculation stream.
final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
final AcceptableValue<Long> value = service.buildMetrics("test_long_metrics", Long.class);
value.accept(MeterEntity.newService("abc"), 5L);
value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
service.doStreamingCalculation(value);
}
}, 2, 2, TimeUnit.SECONDS);
}
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.fetcher.prometheus.provider.PrometheusFetcherProvider
\ No newline at end of file
......@@ -31,7 +31,9 @@ import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
......@@ -50,15 +52,27 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
final Duration duration,
final List<KeyValue> additionalConditions) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
.lte(duration.getEndTimeBucket())
.gte(duration.getStartTimeBucket()));
final RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
.lte(duration.getEndTimeBucket())
.gte(duration.getStartTimeBucket());
boolean asc = false;
if (condition.getOrder().equals(Order.ASC)) {
asc = true;
}
if (additionalConditions != null && additionalConditions.size() > 0) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
additionalConditions.forEach(additionalCondition -> {
boolQuery.must()
.add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue()));
});
boolQuery.must().add(queryBuilder);
sourceBuilder.query(boolQuery);
} else {
sourceBuilder.query(queryBuilder);
}
sourceBuilder.aggregation(
AggregationBuilders.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
......
......@@ -30,7 +30,9 @@ import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
......@@ -52,15 +54,28 @@ public class AggregationQueryEs7DAO extends AggregationQueryEsDAO {
final Duration duration,
final List<KeyValue> additionalConditions) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
.lte(duration.getEndTimeBucket())
.gte(duration.getStartTimeBucket()));
final RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
.lte(duration.getEndTimeBucket())
.gte(duration.getStartTimeBucket());
boolean asc = false;
if (condition.getOrder().equals(Order.ASC)) {
asc = true;
}
if (additionalConditions != null && additionalConditions.size() > 0) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
additionalConditions.forEach(additionalCondition -> {
boolQuery.must()
.add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue()));
});
boolQuery.must().add(queryBuilder);
sourceBuilder.query(boolQuery);
} else {
sourceBuilder.query(queryBuilder);
}
sourceBuilder.aggregation(
AggregationBuilders.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
......
......@@ -23,6 +23,7 @@ import java.util.Collections;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
......@@ -107,6 +108,9 @@ public class MockCoreModuleProvider extends CoreModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
this.registerServiceImplementation(MeterSystem.class, meterSystem);
CoreModuleConfig moduleConfig = new CoreModuleConfig();
this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
this.registerServiceImplementation(
......@@ -156,6 +160,8 @@ public class MockCoreModuleProvider extends CoreModuleProvider {
@Override
public void start() throws ModuleStartException {
MeterSystem.closeMeterCreationChannel();
try {
annotationScan.scan();
} catch (IOException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册