未验证 提交 528ee6de 编写于 作者: K kezhenxu94 提交者: GitHub

OAL supports generating metrics from events (#7134)

* OAL supports generating metrics from events

* Add new dashboard for event metrics

* Add some docs for event metrics
上级 edf6fccf
...@@ -37,6 +37,7 @@ Release Notes. ...@@ -37,6 +37,7 @@ Release Notes.
* Upgrade snake yaml caused by CVE-2017-18640. * Upgrade snake yaml caused by CVE-2017-18640.
* Upgrade embed tomcat caused by CVE-2020-13935. * Upgrade embed tomcat caused by CVE-2020-13935.
* Upgrade commons-lang3 to avoid potential NPE in some JDK versions. * Upgrade commons-lang3 to avoid potential NPE in some JDK versions.
* OAL supports generating metrics from events.
#### UI #### UI
......
...@@ -57,7 +57,7 @@ There are also cases where you would already have both the start time and end ti ...@@ -57,7 +57,7 @@ There are also cases where you would already have both the start time and end ti
## How to Configure Alarms for Events ## How to Configure Alarms for Events
Events are derived from metrics, and can be the source to trigger alarms. For example, if a specific event occurs for a Events derive from metrics, and can be the source to trigger alarms. For example, if a specific event occurs for a
certain times in a period, alarms can be triggered and sent. certain times in a period, alarms can be triggered and sent.
Every event has a default `value = 1`, when `n` events with the same name are reported, they are aggregated Every event has a default `value = 1`, when `n` events with the same name are reported, they are aggregated
...@@ -101,6 +101,16 @@ For more alarm configuration details, please refer to the [alarm doc](../setup/b ...@@ -101,6 +101,16 @@ For more alarm configuration details, please refer to the [alarm doc](../setup/b
**Note** that the `Unhealthy` event above is only for demonstration, they are not detected by default in SkyWalking, **Note** that the `Unhealthy` event above is only for demonstration, they are not detected by default in SkyWalking,
however, you can use the methods in [How to Report Events](#how-to-report-events) to report this kind of events. however, you can use the methods in [How to Report Events](#how-to-report-events) to report this kind of events.
## Correlation between events and metrics
SkyWalking UI visualizes the events in the dashboard when the event service / instance / endpoint matches the displayed
service / instance / endpoint.
By default, SkyWalking also generates some metrics for events by using [OAL](oal.md). The default metrics list of event
may change over time, you can find the complete list
in [event.oal](../../../oap-server/server-bootstrap/src/main/resources/oal/event.oal). If you want to generate you
custom metrics from events, please refer to [OAL](oal.md) about how to write OAL rules.
## Known Events ## Known Events
| Name | Type | When | Where | | Name | Type | When | Where |
......
...@@ -181,7 +181,7 @@ This calculates the metrics data from each request of the page in the browser ap ...@@ -181,7 +181,7 @@ This calculates the metrics data from each request of the page in the browser ap
### SCOPE `BrowserAppPagePerf` ### SCOPE `BrowserAppPagePerf`
This calculates the metrics data form each request of the page in the browser application (browser only). This calculates the metrics data from each request of the page in the browser application (browser only).
| Name | Remarks | Group Key | Type | | Name | Remarks | Group Key | Type |
|---|---|---|---| |---|---|---|---|
...@@ -201,3 +201,17 @@ This calculates the metrics data form each request of the page in the browser ap ...@@ -201,3 +201,17 @@ This calculates the metrics data form each request of the page in the browser ap
| ttlTime | Time to interact. | | int(in ms) | | ttlTime | Time to interact. | | int(in ms) |
| firstPackTime | First pack time. | | int(in ms) | | firstPackTime | First pack time. | | int(in ms) |
| fmpTime | First Meaningful Paint. | | int(in ms) | | fmpTime | First Meaningful Paint. | | int(in ms) |
### SCOPE `Event`
This calculates the metrics data from [events](event.md).
| Name | Remarks | Group Key | Type |
|---|---|---|---|
| name | The name of the event. | | string |
| service | The service name to which the event belongs to. | | string |
| serviceInstance | The service instance to which the event belongs to, if any. | | string|
| endpoint | The service endpoint to which the event belongs to, if any. | | string|
| type | The type of the event, `Normal` or `Error`. | | string|
| message | The message of the event. | | string |
| parameters | The parameters in the `message`, see [parameters](event.md#parameters). | | string |
...@@ -20,9 +20,11 @@ package org.apache.skywalking.oap.server.analyzer.event; ...@@ -20,9 +20,11 @@ package org.apache.skywalking.oap.server.analyzer.event;
import org.apache.skywalking.oap.server.analyzer.event.listener.EventRecordAnalyzerListener; import org.apache.skywalking.oap.server.analyzer.event.listener.EventRecordAnalyzerListener;
import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
public class EventAnalyzerModuleProvider extends ModuleProvider { public class EventAnalyzerModuleProvider extends ModuleProvider {
...@@ -51,7 +53,12 @@ public class EventAnalyzerModuleProvider extends ModuleProvider { ...@@ -51,7 +53,12 @@ public class EventAnalyzerModuleProvider extends ModuleProvider {
} }
@Override @Override
public void start() { public void start() throws ModuleStartException {
getManager().find(CoreModule.NAME)
.provider()
.getService(OALEngineLoaderService.class)
.load(EventOALDefine.INSTANCE);
analysisService.add(new EventRecordAnalyzerListener.Factory(getManager())); analysisService.add(new EventRecordAnalyzerListener.Factory(getManager()));
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.analyzer.event;
import org.apache.skywalking.oap.server.core.oal.rt.OALDefine;
/**
* OAL rules to calculate Event-specific metrics.
*/
public class EventOALDefine extends OALDefine {
public static final EventOALDefine INSTANCE = new EventOALDefine();
private EventOALDefine() {
super(
"oal/event.oal",
"org.apache.skywalking.oap.server.core.source"
);
}
}
...@@ -25,7 +25,8 @@ import org.apache.skywalking.oap.server.core.CoreModule; ...@@ -25,7 +25,8 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.config.NamingControl; import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.event.Event; import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
/** /**
...@@ -37,11 +38,14 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener { ...@@ -37,11 +38,14 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener {
private final NamingControl namingControl; private final NamingControl namingControl;
private final SourceReceiver sourceReceiver;
private final Event event = new Event(); private final Event event = new Event();
@Override @Override
public void build() { public void build() {
MetricsStreamProcessor.getInstance().in(event); MetricsStreamProcessor.getInstance().in(event);
sourceReceiver.receive(event);
} }
@Override @Override
...@@ -72,16 +76,20 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener { ...@@ -72,16 +76,20 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener {
public static class Factory implements EventAnalyzerListener.Factory { public static class Factory implements EventAnalyzerListener.Factory {
private final NamingControl namingControl; private final NamingControl namingControl;
private final SourceReceiver sourceReceiver;
public Factory(final ModuleManager moduleManager) { public Factory(final ModuleManager moduleManager) {
this.namingControl = moduleManager.find(CoreModule.NAME) this.namingControl = moduleManager.find(CoreModule.NAME)
.provider() .provider()
.getService(NamingControl.class); .getService(NamingControl.class);
this.sourceReceiver = moduleManager.find(CoreModule.NAME)
.provider()
.getService(SourceReceiver.class);
} }
@Override @Override
public EventAnalyzerListener create(final ModuleManager moduleManager) { public EventAnalyzerListener create(final ModuleManager moduleManager) {
return new EventRecordAnalyzerListener(namingControl); return new EventRecordAnalyzerListener(namingControl, sourceReceiver);
} }
} }
} }
...@@ -39,6 +39,7 @@ SRC_SERVICE_INSTANCE_CLR_CPU: 'ServiceInstanceCLRCPU'; ...@@ -39,6 +39,7 @@ SRC_SERVICE_INSTANCE_CLR_CPU: 'ServiceInstanceCLRCPU';
SRC_SERVICE_INSTANCE_CLR_GC: 'ServiceInstanceCLRGC'; SRC_SERVICE_INSTANCE_CLR_GC: 'ServiceInstanceCLRGC';
SRC_SERVICE_INSTANCE_CLR_THREAD: 'ServiceInstanceCLRThread'; SRC_SERVICE_INSTANCE_CLR_THREAD: 'ServiceInstanceCLRThread';
SRC_ENVOY_INSTANCE_METRIC: 'EnvoyInstanceMetric'; SRC_ENVOY_INSTANCE_METRIC: 'EnvoyInstanceMetric';
SRC_EVENT: 'Event';
// Browser keywords // Browser keywords
SRC_BROWSER_APP_PERF: 'BrowserAppPerf'; SRC_BROWSER_APP_PERF: 'BrowserAppPerf';
......
...@@ -55,7 +55,8 @@ source ...@@ -55,7 +55,8 @@ source
SRC_SERVICE_INSTANCE_CLR_CPU | SRC_SERVICE_INSTANCE_CLR_GC | SRC_SERVICE_INSTANCE_CLR_THREAD | SRC_SERVICE_INSTANCE_CLR_CPU | SRC_SERVICE_INSTANCE_CLR_GC | SRC_SERVICE_INSTANCE_CLR_THREAD |
SRC_ENVOY_INSTANCE_METRIC | SRC_ENVOY_INSTANCE_METRIC |
SRC_BROWSER_APP_PERF | SRC_BROWSER_APP_PAGE_PERF | SRC_BROWSER_APP_SINGLE_VERSION_PERF | SRC_BROWSER_APP_PERF | SRC_BROWSER_APP_PAGE_PERF | SRC_BROWSER_APP_SINGLE_VERSION_PERF |
SRC_BROWSER_APP_TRAFFIC | SRC_BROWSER_APP_PAGE_TRAFFIC | SRC_BROWSER_APP_SINGLE_VERSION_TRAFFIC SRC_BROWSER_APP_TRAFFIC | SRC_BROWSER_APP_PAGE_TRAFFIC | SRC_BROWSER_APP_SINGLE_VERSION_TRAFFIC |
SRC_EVENT
; ;
disableSource disableSource
......
public void dispatch(org.apache.skywalking.oap.server.core.source.Source source) { public void dispatch(org.apache.skywalking.oap.server.core.source.ISource source) {
${sourcePackage}${source} _source = (${sourcePackage}${source})source; ${sourcePackage}${source} _source = (${sourcePackage}${source})source;
<#list metrics as metrics> <#list metrics as metrics>
do${metrics.metricsName}(_source); do${metrics.metricsName}(_source);
</#list> </#list>
} }
\ No newline at end of file
private void do${metricsName}(${sourcePackage}${sourceName} source) { private void do${metricsName}(${sourcePackage}${sourceName} source) {
${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}${metricsName}Metrics();
<#if filterExpressions??> <#if filterExpressions??>
<#list filterExpressions as filterExpression> <#list filterExpressions as filterExpression>
...@@ -9,6 +8,7 @@ ${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage} ...@@ -9,6 +8,7 @@ ${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}
</#list> </#list>
</#if> </#if>
${metricsClassPackage}${metricsName}Metrics metrics = new ${metricsClassPackage}${metricsName}Metrics();
metrics.setTimeBucket(source.getTimeBucket()); metrics.setTimeBucket(source.getTimeBucket());
<#list fieldsFromSource as field> <#list fieldsFromSource as field>
metrics.${field.fieldSetter}(source.${field.fieldGetter}()); metrics.${field.fieldSetter}(source.${field.fieldGetter}());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
event_total = from(Event.*).count();
event_normal_count = from(Event.*).filter(type == "Normal").count();
event_error_count = from(Event.*).filter(type == "Error").count();
event_start_count = from(Event.*).filter(name == "Start").count();
event_shutdown_count = from(Event.*).filter(name == "Shutdown").count();
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
templates:
- name: "Event"
type: "DASHBOARD"
configuration: |-
[
{
"name": "Event",
"type": "service",
"children": [
{
"name": "Global",
"children": [
{
"width": "3",
"title": "Event Count by Severity",
"height": "280",
"entityType": "All",
"independentSelector": false,
"metricType": "REGULAR_VALUE",
"metricName": "event_total,event_normal_count,event_error_count",
"queryMetricType": "readMetricsValues",
"chartType": "ChartLine"
},
{
"width": "3",
"title": "Event Count by Lifecycle",
"height": "280",
"entityType": "All",
"independentSelector": false,
"metricType": "REGULAR_VALUE",
"metricName": "event_start_count,event_shutdown_count",
"queryMetricType": "readMetricsValues",
"chartType": "ChartLine"
}
]
}
]
}
]
activated: true
disabled: false
...@@ -29,7 +29,7 @@ import java.util.HashMap; ...@@ -29,7 +29,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.source.Source; import org.apache.skywalking.oap.server.core.source.ISource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -43,7 +43,7 @@ public class DispatcherManager implements DispatcherDetectorListener { ...@@ -43,7 +43,7 @@ public class DispatcherManager implements DispatcherDetectorListener {
this.dispatcherMap = new HashMap<>(); this.dispatcherMap = new HashMap<>();
} }
public void forward(Source source) { public void forward(ISource source) {
if (source == null) { if (source == null) {
return; return;
} }
...@@ -96,12 +96,12 @@ public class DispatcherManager implements DispatcherDetectorListener { ...@@ -96,12 +96,12 @@ public class DispatcherManager implements DispatcherDetectorListener {
Object source = ((Class) argument).newInstance(); Object source = ((Class) argument).newInstance();
if (!Source.class.isAssignableFrom(source.getClass())) { if (!ISource.class.isAssignableFrom(source.getClass())) {
throw new UnexpectedException( throw new UnexpectedException(
"unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. "); "unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
} }
Source dispatcherSource = (Source) source; ISource dispatcherSource = (ISource) source;
SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance(); SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance();
int scopeId = dispatcherSource.scope(); int scopeId = dispatcherSource.scope();
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis; package org.apache.skywalking.oap.server.core.analysis;
import org.apache.skywalking.oap.server.core.source.Source; import org.apache.skywalking.oap.server.core.source.ISource;
/** /**
* SourceDispatcher implementation processes different types of the source. There are two kinds of the source * SourceDispatcher implementation processes different types of the source. There are two kinds of the source
...@@ -29,6 +29,6 @@ import org.apache.skywalking.oap.server.core.source.Source; ...@@ -29,6 +29,6 @@ import org.apache.skywalking.oap.server.core.source.Source;
* *
* @param <SOURCE> the data type of this dispatcher processes. * @param <SOURCE> the data type of this dispatcher processes.
*/ */
public interface SourceDispatcher<SOURCE extends Source> { public interface SourceDispatcher<SOURCE extends ISource> {
void dispatch(SOURCE source); void dispatch(SOURCE source);
} }
...@@ -16,14 +16,13 @@ ...@@ -16,14 +16,13 @@
* *
*/ */
package org.apache.skywalking.oap.server.core.event; package org.apache.skywalking.oap.server.core.source;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension; import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.Stream;
...@@ -34,12 +33,10 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; ...@@ -34,12 +33,10 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder; import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.elasticsearch.common.Strings;
import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EVENT; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EVENT;
@Getter @Getter
...@@ -51,7 +48,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EV ...@@ -51,7 +48,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EV
of = "uuid" of = "uuid"
) )
@MetricsExtension(supportDownSampling = false, supportUpdate = true) @MetricsExtension(supportDownSampling = false, supportUpdate = true)
public class Event extends Metrics implements WithMetadata, LongValueHolder { public class Event extends Metrics implements ISource, WithMetadata, LongValueHolder {
public static final String INDEX_NAME = "events"; public static final String INDEX_NAME = "events";
...@@ -136,13 +133,13 @@ public class Event extends Metrics implements WithMetadata, LongValueHolder { ...@@ -136,13 +133,13 @@ public class Event extends Metrics implements WithMetadata, LongValueHolder {
setEndTime(event.getEndTime()); setEndTime(event.getEndTime());
} }
if (StringUtil.isNotBlank(event.getType())) { if (isNotBlank(event.getType())) {
setType(event.getType()); setType(event.getType());
} }
if (StringUtil.isNotBlank(event.getMessage())) { if (isNotBlank(event.getMessage())) {
setType(event.getMessage()); setType(event.getMessage());
} }
if (StringUtil.isNotBlank(event.getParameters())) { if (isNotBlank(event.getParameters())) {
setParameters(event.getParameters()); setParameters(event.getParameters());
} }
return true; return true;
...@@ -206,16 +203,31 @@ public class Event extends Metrics implements WithMetadata, LongValueHolder { ...@@ -206,16 +203,31 @@ public class Event extends Metrics implements WithMetadata, LongValueHolder {
@Override @Override
public MetricsMetaInfo getMeta() { public MetricsMetaInfo getMeta() {
int scope = DefaultScopeDefine.SERVICE; int scope = DefaultScopeDefine.SERVICE;
if (isNotBlank(getServiceInstance())) {
scope = DefaultScopeDefine.SERVICE_INSTANCE;
} else if (isNotBlank(getEndpoint())) {
scope = DefaultScopeDefine.ENDPOINT;
}
String id = getEntityId();
return new MetricsMetaInfo(getName(), scope, id);
}
@Override
public int scope() {
return EVENT;
}
@Override
public String getEntityId() {
final String serviceId = IDManager.ServiceID.buildId(getService(), true); final String serviceId = IDManager.ServiceID.buildId(getService(), true);
String id = serviceId; String id = serviceId;
if (!Strings.isNullOrEmpty(getServiceInstance())) { if (isNotBlank(getServiceInstance())) {
scope = DefaultScopeDefine.SERVICE_INSTANCE;
id = IDManager.ServiceInstanceID.buildId(serviceId, getServiceInstance()); id = IDManager.ServiceInstanceID.buildId(serviceId, getServiceInstance());
} else if (!Strings.isNullOrEmpty(getEndpoint())) { } else if (isNotBlank(getEndpoint())) {
scope = DefaultScopeDefine.ENDPOINT;
id = IDManager.EndpointID.buildId(serviceId, getEndpoint()); id = IDManager.EndpointID.buildId(serviceId, getEndpoint());
} }
return new MetricsMetaInfo(getName(), scope, id); return id;
} }
public static class Builder implements StorageHashMapBuilder<Event> { public static class Builder implements StorageHashMapBuilder<Event> {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
public interface ISource {
int scope();
long getTimeBucket();
void setTimeBucket(long timeBucket);
String getEntityId();
/**
* Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(ISource)}
*/
default void prepare() {
}
}
...@@ -21,19 +21,9 @@ package org.apache.skywalking.oap.server.core.source; ...@@ -21,19 +21,9 @@ package org.apache.skywalking.oap.server.core.source;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
public abstract class Source { public abstract class Source implements ISource {
public abstract int scope();
@Getter @Getter
@Setter @Setter
private long timeBucket; private long timeBucket;
public abstract String getEntityId();
/**
* Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(Source)}
*/
public void prepare() {
}
} }
...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.library.module.Service; ...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.library.module.Service;
* in order to forward source to the suitable real {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}. * in order to forward source to the suitable real {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}.
*/ */
public interface SourceReceiver extends Service { public interface SourceReceiver extends Service {
void receive(Source source); void receive(ISource source);
DispatcherDetectorListener getDispatcherDetectorListener(); DispatcherDetectorListener getDispatcherDetectorListener();
} }
...@@ -32,7 +32,7 @@ public class SourceReceiverImpl implements SourceReceiver { ...@@ -32,7 +32,7 @@ public class SourceReceiverImpl implements SourceReceiver {
} }
@Override @Override
public void receive(Source source) { public void receive(ISource source) {
dispatcherManager.forward(source); dispatcherManager.forward(source);
} }
......
...@@ -23,16 +23,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; ...@@ -23,16 +23,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.network.management.v3.InstancePingPkg; import org.apache.skywalking.apm.network.management.v3.InstancePingPkg;
import org.apache.skywalking.apm.network.management.v3.InstanceProperties; import org.apache.skywalking.apm.network.management.v3.InstanceProperties;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleProvider;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.NamingControl; import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping; import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate; import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate;
import org.apache.skywalking.oap.server.core.source.Source;
import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleManager;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.mock.MockModuleProvider;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
...@@ -52,7 +52,7 @@ public class ServiceManagementHandlerTest { ...@@ -52,7 +52,7 @@ public class ServiceManagementHandlerTest {
public static SourceReceiverRule SOURCE_RECEIVER = new SourceReceiverRule() { public static SourceReceiverRule SOURCE_RECEIVER = new SourceReceiverRule() {
@Override @Override
protected void verify(final List<Source> sourceList) throws Throwable { protected void verify(final List<ISource> sourceList) throws Throwable {
ServiceInstanceUpdate instanceUpdate = (ServiceInstanceUpdate) sourceList.get(0); ServiceInstanceUpdate instanceUpdate = (ServiceInstanceUpdate) sourceList.get(0);
Assert.assertEquals(instanceUpdate.getName(), SERVICE_INSTANCE); Assert.assertEquals(instanceUpdate.getName(), SERVICE_INSTANCE);
......
...@@ -21,15 +21,15 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler; ...@@ -21,15 +21,15 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.List; import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener; import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.source.Source; import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.junit.rules.Verifier; import org.junit.rules.Verifier;
public abstract class SourceReceiverRule extends Verifier implements SourceReceiver { public abstract class SourceReceiverRule extends Verifier implements SourceReceiver {
private final List<Source> sourceList = Lists.newArrayList(); private final List<ISource> sourceList = Lists.newArrayList();
@Override @Override
public void receive(final Source source) { public void receive(final ISource source) {
sourceList.add(source); sourceList.add(source);
} }
...@@ -38,10 +38,11 @@ public abstract class SourceReceiverRule extends Verifier implements SourceRecei ...@@ -38,10 +38,11 @@ public abstract class SourceReceiverRule extends Verifier implements SourceRecei
return null; return null;
} }
@Override
protected void verify() throws Throwable { protected void verify() throws Throwable {
verify(sourceList); verify(sourceList);
} }
protected abstract void verify(List<Source> sourceList) throws Throwable; protected abstract void verify(List<ISource> sourceList) throws Throwable;
} }
...@@ -22,7 +22,7 @@ import java.util.ArrayList; ...@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import lombok.Getter; import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener; import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.source.Source; import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiver;
/** /**
...@@ -30,10 +30,10 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver; ...@@ -30,10 +30,10 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
*/ */
public class MockReceiver implements SourceReceiver { public class MockReceiver implements SourceReceiver {
@Getter @Getter
private List<Source> receivedSources = new ArrayList<>(); private List<ISource> receivedSources = new ArrayList<>();
@Override @Override
public void receive(final Source source) { public void receive(final ISource source) {
receivedSources.add(source); receivedSources.add(source);
} }
......
...@@ -43,12 +43,12 @@ import org.apache.skywalking.oap.server.core.source.All; ...@@ -43,12 +43,12 @@ import org.apache.skywalking.oap.server.core.source.All;
import org.apache.skywalking.oap.server.core.source.DatabaseAccess; import org.apache.skywalking.oap.server.core.source.DatabaseAccess;
import org.apache.skywalking.oap.server.core.source.Endpoint; import org.apache.skywalking.oap.server.core.source.Endpoint;
import org.apache.skywalking.oap.server.core.source.EndpointRelation; import org.apache.skywalking.oap.server.core.source.EndpointRelation;
import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.Service; import org.apache.skywalking.oap.server.core.source.Service;
import org.apache.skywalking.oap.server.core.source.ServiceInstance; import org.apache.skywalking.oap.server.core.source.ServiceInstance;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation; import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
import org.apache.skywalking.oap.server.core.source.ServiceMeta; import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.ServiceRelation; import org.apache.skywalking.oap.server.core.source.ServiceRelation;
import org.apache.skywalking.oap.server.core.source.Source;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -147,7 +147,7 @@ public class MultiScopesAnalysisListenerTest { ...@@ -147,7 +147,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseEntry(spanObject, segment); listener.parseEntry(spanObject, segment);
listener.build(); listener.build();
final List<Source> receivedSources = mockReceiver.getReceivedSources(); final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(7, receivedSources.size()); Assert.assertEquals(7, receivedSources.size());
final All all = (All) receivedSources.get(0); final All all = (All) receivedSources.get(0);
final Service service = (Service) receivedSources.get(1); final Service service = (Service) receivedSources.get(1);
...@@ -212,7 +212,7 @@ public class MultiScopesAnalysisListenerTest { ...@@ -212,7 +212,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseEntry(spanObject, segment); listener.parseEntry(spanObject, segment);
listener.build(); listener.build();
final List<Source> receivedSources = mockReceiver.getReceivedSources(); final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(7, receivedSources.size()); Assert.assertEquals(7, receivedSources.size());
final All all = (All) receivedSources.get(0); final All all = (All) receivedSources.get(0);
final Service service = (Service) receivedSources.get(1); final Service service = (Service) receivedSources.get(1);
...@@ -276,7 +276,7 @@ public class MultiScopesAnalysisListenerTest { ...@@ -276,7 +276,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseEntry(spanObject, segment); listener.parseEntry(spanObject, segment);
listener.build(); listener.build();
final List<Source> receivedSources = mockReceiver.getReceivedSources(); final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(7, receivedSources.size()); Assert.assertEquals(7, receivedSources.size());
final All all = (All) receivedSources.get(0); final All all = (All) receivedSources.get(0);
final Service service = (Service) receivedSources.get(1); final Service service = (Service) receivedSources.get(1);
...@@ -332,7 +332,7 @@ public class MultiScopesAnalysisListenerTest { ...@@ -332,7 +332,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseLocal(spanObject, segment); listener.parseLocal(spanObject, segment);
listener.build(); listener.build();
final List<Source> receivedSources = mockReceiver.getReceivedSources(); final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(1, receivedSources.size()); Assert.assertEquals(1, receivedSources.size());
final Endpoint source = (Endpoint) receivedSources.get(0); final Endpoint source = (Endpoint) receivedSources.get(0);
Assert.assertEquals("/logic-call", source.getName()); Assert.assertEquals("/logic-call", source.getName());
...@@ -377,7 +377,7 @@ public class MultiScopesAnalysisListenerTest { ...@@ -377,7 +377,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseLocal(spanObject, segment); listener.parseLocal(spanObject, segment);
listener.build(); listener.build();
final List<Source> receivedSources = mockReceiver.getReceivedSources(); final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(1, receivedSources.size()); Assert.assertEquals(1, receivedSources.size());
final Endpoint source = (Endpoint) receivedSources.get(0); final Endpoint source = (Endpoint) receivedSources.get(0);
Assert.assertEquals("/GraphQL-service", source.getName()); Assert.assertEquals("/GraphQL-service", source.getName());
...@@ -416,7 +416,7 @@ public class MultiScopesAnalysisListenerTest { ...@@ -416,7 +416,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseExit(spanObject, segment); listener.parseExit(spanObject, segment);
listener.build(); listener.build();
final List<Source> receivedSources = mockReceiver.getReceivedSources(); final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(4, receivedSources.size()); Assert.assertEquals(4, receivedSources.size());
final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(0); final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(0);
final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(1); final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(1);
...@@ -462,7 +462,7 @@ public class MultiScopesAnalysisListenerTest { ...@@ -462,7 +462,7 @@ public class MultiScopesAnalysisListenerTest {
listener.parseExit(spanObject, segment); listener.parseExit(spanObject, segment);
listener.build(); listener.build();
final List<Source> receivedSources = mockReceiver.getReceivedSources(); final List<ISource> receivedSources = mockReceiver.getReceivedSources();
Assert.assertEquals(2, receivedSources.size()); Assert.assertEquals(2, receivedSources.size());
final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(0); final ServiceRelation serviceRelation = (ServiceRelation) receivedSources.get(0);
final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(1); final ServiceInstanceRelation serviceInstanceRelation = (ServiceInstanceRelation) receivedSources.get(1);
......
...@@ -23,7 +23,7 @@ import java.util.List; ...@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.skywalking.oap.server.core.event.Event; import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition; import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
......
...@@ -22,7 +22,7 @@ import java.io.IOException; ...@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.skywalking.oap.server.core.event.Event; import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition; import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.Events; import org.apache.skywalking.oap.server.core.query.type.event.Events;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......
...@@ -26,7 +26,7 @@ import java.util.Map; ...@@ -26,7 +26,7 @@ import java.util.Map;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.event.Event; import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition; import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
......
...@@ -28,7 +28,7 @@ import java.util.stream.Collectors; ...@@ -28,7 +28,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.event.Event; import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition; import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.EventType; import org.apache.skywalking.oap.server.core.query.type.event.EventType;
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.tool.profile.core.mock; package org.apache.skywalking.oap.server.tool.profile.core.mock;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener; import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.source.Source; import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiver;
/** /**
...@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver; ...@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
*/ */
public class MockSourceReceiver implements SourceReceiver { public class MockSourceReceiver implements SourceReceiver {
@Override @Override
public void receive(Source source) { public void receive(ISource source) {
} }
@Override @Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册