提交 94555f62 编写于 作者: B Brandon Fergerson

Merge remote-tracking branch 'origin/qualified-spring-mvc-endpoints' into...

Merge remote-tracking branch 'origin/qualified-spring-mvc-endpoints' into qualified-spring-mvc-endpoints
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.boot;
import java.net.URISyntaxException;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
......@@ -61,9 +62,11 @@ public class AgentPackagePath {
urlString = urlString.substring(urlString.indexOf("file:"), insidePathIndex);
File agentJarFile = null;
try {
agentJarFile = new File(new URL(urlString).getFile());
agentJarFile = new File(new URL(urlString).toURI());
} catch (MalformedURLException e) {
logger.error(e, "Can not locate agent jar file by url:" + urlString);
} catch (URISyntaxException e) {
logger.error(e, "Can not locate agent jar file by url:" + urlString);
}
if (agentJarFile.exists()) {
return agentJarFile.getParentFile();
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
......@@ -28,41 +27,38 @@ import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
*/
public interface AbstractTracerContext {
/**
* Prepare for the cross-process propagation.
* How to initialize the carrier, depends on the implementation.
* Prepare for the cross-process propagation. How to initialize the carrier, depends on the implementation.
*
* @param carrier to carry the context for crossing process.
*/
void inject(ContextCarrier carrier);
/**
* Build the reference between this segment and a cross-process segment.
* How to build, depends on the implementation.
* Build the reference between this segment and a cross-process segment. How to build, depends on the
* implementation.
*
* @param carrier carried the context from a cross-process segment.
*/
void extract(ContextCarrier carrier);
/**
* Capture a snapshot for cross-thread propagation.
* It's a similar concept with ActiveSpan.Continuation in OpenTracing-java
* How to build, depends on the implementation.
* Capture a snapshot for cross-thread propagation. It's a similar concept with ActiveSpan.Continuation in
* OpenTracing-java How to build, depends on the implementation.
*
* @return the {@link ContextSnapshot} , which includes the reference context.
*/
ContextSnapshot capture();
/**
* Build the reference between this segment and a cross-thread segment.
* How to build, depends on the implementation.
* Build the reference between this segment and a cross-thread segment. How to build, depends on the
* implementation.
*
* @param snapshot from {@link #capture()} in the parent thread.
*/
void continued(ContextSnapshot snapshot);
/**
* Get the global trace id, if needEnhance.
* How to build, depends on the implementation.
* Get the global trace id, if needEnhance. How to build, depends on the implementation.
*
* @return the string represents the id.
*/
......@@ -102,7 +98,21 @@ public interface AbstractTracerContext {
* Finish the given span, and the given span should be the active span of current tracing context(stack)
*
* @param span to finish
* @return true when context should be clear.
*/
boolean stopSpan(AbstractSpan span);
/**
* Notify this context, current span is going to be finished async in another thread.
*
* @return The current context
*/
void stopSpan(AbstractSpan span);
AbstractTracerContext awaitFinishAsync();
/**
* The given span could be stopped officially.
*
* @param span to be stopped.
*/
void asyncStop(AsyncSpan span);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
/**
* Span could use these APIs to active and extend its lift cycle across thread.
*
* This is typical used in async plugin, especially RPC plugins.
*
* @author wusheng
*/
public interface AsyncSpan {
/**
* The span finish at current tracing context, but the current span is still alive, until {@link #asyncFinish}
* called.
*
* This method must be called
*
* 1. In original thread(tracing context).
* 2. Current span is active span.
*
* During alive, tags, logs and attributes of the span could be changed, in any thread.
*
* The execution times of {@link #prepareForAsync} and {@link #asyncFinish()} must match.
*
* @return the current span
*/
AbstractSpan prepareForAsync();
/**
* Notify the span, it could be finished.
*
* The execution times of {@link #prepareForAsync} and {@link #asyncFinish()} must match.
*
* @return the current span
*/
AbstractSpan asyncFinish();
}
......@@ -18,14 +18,11 @@
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.trace.*;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.logging.api.*;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
import org.apache.skywalking.apm.util.StringUtil;
......@@ -39,7 +36,7 @@ import org.apache.skywalking.apm.util.StringUtil;
*
* @author wusheng
*/
public class ContextManager implements TracingContextListener, BootService, IgnoreTracerContextListener {
public class ContextManager implements BootService {
private static final ILog logger = LogManager.getLogger(ContextManager.class);
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT = new ThreadLocal<RuntimeContext>();
......@@ -59,7 +56,7 @@ public class ContextManager implements TracingContextListener, BootService, Igno
} else {
if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
&& RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
) {
) {
context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
} else {
/**
......@@ -152,6 +149,14 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
}
public static AbstractTracerContext awaitFinishAsync(AbstractSpan span) {
AbstractSpan activeSpan = activeSpan();
if (span != activeSpan) {
throw new RuntimeException("Span is not the active in current context.");
}
return get().awaitFinishAsync();
}
public static AbstractSpan activeSpan() {
return get().activeSpan();
}
......@@ -161,7 +166,9 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
public static void stopSpan(AbstractSpan span) {
get().stopSpan(span);
if (get().stopSpan(span)) {
CONTEXT.remove();
}
}
@Override
......@@ -171,8 +178,6 @@ public class ContextManager implements TracingContextListener, BootService, Igno
@Override
public void boot() {
ContextManagerExtendService service = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
service.registerListeners(this);
}
@Override
......@@ -184,16 +189,6 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
@Override
public void afterFinished(TraceSegment traceSegment) {
CONTEXT.remove();
}
@Override
public void afterFinished(IgnoredTracerContext traceSegment) {
CONTEXT.remove();
}
public static boolean isActive() {
return get() != null;
}
......
......@@ -18,9 +18,7 @@
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
......@@ -45,11 +43,6 @@ public class ContextManagerExtendService implements BootService {
}
public void registerListeners(ContextManager manager) {
TracingContext.ListenerManager.add(manager);
IgnoredTracerContext.ListenerManager.add(manager);
}
public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
AbstractTracerContext context;
int suffixIdx = operationName.lastIndexOf(".");
......
......@@ -16,17 +16,14 @@
*
*/
package org.apache.skywalking.apm.agent.core.context;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.NoopSpan;
import java.util.*;
import org.apache.skywalking.apm.agent.core.context.trace.*;
/**
* The <code>IgnoredTracerContext</code> represent a context should be ignored.
* So it just maintains the stack with an integer depth field.
* The <code>IgnoredTracerContext</code> represent a context should be ignored. So it just maintains the stack with an
* integer depth field.
*
* All operations through this will be ignored, and keep the memory and gc cost as low as possible.
*
......@@ -88,11 +85,20 @@ public class IgnoredTracerContext implements AbstractTracerContext {
}
@Override
public void stopSpan(AbstractSpan span) {
public boolean stopSpan(AbstractSpan span) {
stackDepth--;
if (stackDepth == 0) {
ListenerManager.notifyFinish(this);
}
return stackDepth == 0;
}
@Override public AbstractTracerContext awaitFinishAsync() {
return this;
}
@Override public void asyncStop(AsyncSpan span) {
}
public static class ListenerManager {
......
......@@ -18,25 +18,14 @@
package org.apache.skywalking.apm.agent.core.context;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.EntrySpan;
import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.apache.skywalking.apm.agent.core.context.trace.LocalSpan;
import org.apache.skywalking.apm.agent.core.context.trace.NoopExitSpan;
import org.apache.skywalking.apm.agent.core.context.trace.NoopSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
import org.apache.skywalking.apm.agent.core.context.trace.WithPeerInfo;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryManager;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.context.trace.*;
import org.apache.skywalking.apm.agent.core.dictionary.*;
import org.apache.skywalking.apm.agent.core.logging.api.*;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
import org.apache.skywalking.apm.util.StringUtil;
......@@ -80,6 +69,13 @@ public class TracingContext implements AbstractTracerContext {
*/
private int spanIdGenerator;
/**
* The counter indicates
*/
private AtomicInteger asyncSpanCounter;
private volatile boolean isRunningInAsyncMode;
private ReentrantLock asyncFinishLock;
/**
* Initialize all fields with default value.
*/
......@@ -89,6 +85,9 @@ public class TracingContext implements AbstractTracerContext {
if (samplingService == null) {
samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
}
asyncSpanCounter = new AtomicInteger(0);
isRunningInAsyncMode = false;
asyncFinishLock = new ReentrantLock();
}
/**
......@@ -392,7 +391,7 @@ public class TracingContext implements AbstractTracerContext {
* @param span to finish
*/
@Override
public void stopSpan(AbstractSpan span) {
public boolean stopSpan(AbstractSpan span) {
AbstractSpan lastSpan = peek();
if (lastSpan == span) {
if (lastSpan instanceof AbstractTracingSpan) {
......@@ -407,9 +406,41 @@ public class TracingContext implements AbstractTracerContext {
throw new IllegalStateException("Stopping the unexpected span = " + span);
}
if (activeSpanStack.isEmpty()) {
this.finish();
if (checkFinishConditions()) {
finish();
}
return activeSpanStack.isEmpty();
}
@Override public AbstractTracerContext awaitFinishAsync() {
isRunningInAsyncMode = true;
asyncSpanCounter.addAndGet(1);
return this;
}
@Override public void asyncStop(AsyncSpan span) {
asyncSpanCounter.addAndGet(-1);
if (checkFinishConditions()) {
finish();
}
}
private boolean checkFinishConditions() {
if (isRunningInAsyncMode) {
asyncFinishLock.lock();
}
try {
if (activeSpanStack.isEmpty() && asyncSpanCounter.get() == 0) {
return true;
}
} finally {
if (isRunningInAsyncMode) {
asyncFinishLock.unlock();
}
}
return false;
}
/**
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.context.trace;
import java.util.Map;
import org.apache.skywalking.apm.agent.core.context.AsyncSpan;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.network.trace.component.Component;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
......@@ -28,7 +29,7 @@ import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
*
* @author wusheng
*/
public interface AbstractSpan {
public interface AbstractSpan extends AsyncSpan {
/**
* Set the component id, which defines in {@link ComponentsDefine}
*
......
......@@ -18,15 +18,10 @@
package org.apache.skywalking.apm.agent.core.context.trace;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.util.KeyValuePair;
import org.apache.skywalking.apm.agent.core.context.util.TagValuePair;
import org.apache.skywalking.apm.agent.core.context.util.ThrowableTransformer;
import java.util.*;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.tag.*;
import org.apache.skywalking.apm.agent.core.context.util.*;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
......@@ -45,6 +40,9 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
protected String operationName;
protected int operationId;
protected SpanLayer layer;
protected boolean isInAsyncMode = false;
protected volatile AbstractTracerContext context;
/**
* The start time of this Span.
*/
......@@ -322,4 +320,20 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
refs.add(ref);
}
}
@Override public AbstractSpan prepareForAsync() {
context = ContextManager.awaitFinishAsync(this);
isInAsyncMode = true;
return this;
}
@Override public AbstractSpan asyncFinish() {
if (!isInAsyncMode) {
throw new RuntimeException("Span is not in async mode, please use '#prepareForAsync' to active.");
}
this.endTime = System.currentTimeMillis();
context.asyncStop(this);
return this;
}
}
......@@ -115,4 +115,12 @@ public class NoopSpan implements AbstractSpan {
@Override public AbstractSpan setPeer(String remotePeer) {
return this;
}
@Override public AbstractSpan prepareForAsync() {
return this;
}
@Override public AbstractSpan asyncFinish() {
return this;
}
}
......@@ -69,16 +69,13 @@ public class ServiceManagerTest {
private void assertIgnoreTracingContextListener() throws Exception {
List<TracingContextListener> listeners = getFieldValue(IgnoredTracerContext.ListenerManager.class, "LISTENERS");
assertThat(listeners.size(), is(1));
assertThat(listeners.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true));
assertThat(listeners.size(), is(0));
}
private void assertTracingContextListener() throws Exception {
List<TracingContextListener> listeners = getFieldValue(TracingContext.ListenerManager.class, "LISTENERS");
assertThat(listeners.size(), is(2));
assertThat(listeners.size(), is(1));
assertThat(listeners.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true));
assertThat(listeners.contains(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)), is(true));
}
......
......@@ -30,6 +30,7 @@ For each official Apache release, there is a complete and independent source cod
* `grpc-java` and `java` folders in **apm-protocol/apm-network/target/generated-sources/protobuf**
* `grpc-java` and `java` folders in **oap-server/server-core/target/generated-sources/protobuf**
* `grpc-java` and `java` folders in **oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/target/generated-sources/protobuf**
* `grpc-java` and `java` folders in **oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/target/generated-sources/protobuf**
* `antlr4` folder in **oap-server/generate-tool-grammar/target/generated-sources**
* `oal` folder in **oap-server/generated-analysis/target/generated-sources**
......
......@@ -160,6 +160,42 @@ SpanLayer is the catalog of span. Here are 5 values:
Component IDs are defined and reserved by SkyWalking project.
For component name/ID extension, please follow [cComponent library definition and extension](Component-library-settings.md) document.
### Advanced APIs
#### Async Span APIs
There is a set of advanced APIs in Span, which work specific for async scenario. When tags, logs, attributes(including end time) of the span
needs to set in another thread, you should use these APIs.
```java
/**
* The span finish at current tracing context, but the current span is still alive, until {@link #asyncFinish}
* called.
*
* This method must be called<br/>
* 1. In original thread(tracing context).
* 2. Current span is active span.
*
* During alive, tags, logs and attributes of the span could be changed, in any thread.
*
* The execution times of {@link #prepareForAsync} and {@link #asyncFinish()} must match.
*
* @return the current span
*/
AbstractSpan prepareForAsync();
/**
* Notify the span, it could be finished.
*
* The execution times of {@link #prepareForAsync} and {@link #asyncFinish()} must match.
*
* @return the current span
*/
AbstractSpan asyncFinish();
```
1. Call `#prepareForAsync` in original context.
1. Propagate the span to any other thread.
1. After all set, call `#asyncFinish` in any thread.
1. Tracing context will be finished and report to backend when all spans's `#prepareForAsync` finished(Judged by count of API execution).
## Develop a plugin
### Abstract
The basic method to trace is intercepting a Java method, by using byte code manipulation tech and AOP concept.
......
......@@ -47,9 +47,9 @@ e.g. accessing DB by JDBC, reading Redis/Memcached are cataloged an ExitSpan.
3. Span parent info called Reference, which is included in span. Reference carries more fields besides
trace id, parent segment id, span id. Others are **entry service instance id**, **parent service instance id**,
**entry endpoint**, **parent endpoint** and **network address**. Follow [SkyWalking Trace Data Protocol v2](Trace-Data-Protocol-v2.md),
**entry endpoint**, **parent endpoint** and **network address**. Follow [Cross Process Propagation Headers Protocol v2](Skywalking-Cross-Process-Propagation-Headers-Protocol-v2.md),
you will know how to get all these fields.
### Step 3. Keep alive.
`ServiceInstancePing#doPing` should be called per several seconds. Make the backend know this instance is still
alive. Existed **service instance id** and **UUID** used in `doServiceInstanceRegister` are required.
\ No newline at end of file
alive. Existed **service instance id** and **UUID** used in `doServiceInstanceRegister` are required.
......@@ -48,7 +48,8 @@ or providing commercial products including Apache SkyWalking.
1. Mxnavi. 沈阳美行科技有限公司 http://www.mxnavi.com/
1. Moji 墨叽(深圳)科技有限公司 https://www.mojivip.com
1. Mypharma.com 北京融贯电子商务有限公司 https://www.mypharma.com
1. Primeton.com 普元信息技术股份有限公司 http://www.primeton.com .
1. Osacart in WeChat app 广州美克曼尼电子商务有限公司
1. Primeton.com 普元信息技术股份有限公司 http://www.primeton.com
1. qiniu.com 七牛云 http://qiniu.com
1. Qingyidai.com 轻易贷 https://www.qingyidai.com/
1. Qsdjf.com 浙江钱宝网络科技有限公司 https://www.qsdjf.com/index.html
......@@ -92,3 +93,5 @@ Provide a customized version SkyWalking agent. It could provide distributed trac
## Primeton
Integrated in Primeton EOS PLATFORM 8, which is a commercial micro-service platform.
## Oscart
Use multiple language agents from SkyWalking and its ecosystem, including SkyWalking Javaagent and [SkyAPM nodejs agent](https://github.com/SkyAPM/SkyAPM-nodejs). SkyWalking OAP platform acts as backend and visualization.
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>exporter</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
......@@ -28,8 +28,6 @@ import org.apache.skywalking.oap.server.core.Const;
<#break>
</#if>
</#list>
import org.apache.skywalking.oap.server.core.alarm.AlarmMeta;
import org.apache.skywalking.oap.server.core.alarm.AlarmSupported;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
......@@ -45,7 +43,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
@IndicatorType
@StreamData
@StorageEntity(name = "${tableName}", builder = ${metricName}Indicator.Builder.class, sourceScopeId = ${sourceScopeId})
public class ${metricName}Indicator extends ${indicatorClassName} implements AlarmSupported {
public class ${metricName}Indicator extends ${indicatorClassName} implements WithMetadata {
<#list fieldsFromSource as sourceField>
@Setter @Getter @Column(columnName = "${sourceField.columnName}") <#if sourceField.isID()>@IDColumn</#if> private ${sourceField.typeName} ${sourceField.fieldName};
......@@ -170,8 +168,8 @@ public class ${metricName}Indicator extends ${indicatorClassName} implements Ala
}
@Override public AlarmMeta getAlarmMeta() {
return new AlarmMeta("${varName}", ${sourceScopeId}<#if (fieldsFromSource?size>0) ><#list fieldsFromSource as field><#if field.isID()>, ${field.fieldName}</#if></#list></#if>);
@Override public IndicatorMetaInfo getMeta() {
return new IndicatorMetaInfo("${varName}", ${sourceScopeId}<#if (fieldsFromSource?size>0) ><#list fieldsFromSource as field><#if field.isID()>, ${field.fieldName}</#if></#list></#if>);
}
@Override
......
......@@ -21,8 +21,6 @@ package org.apache.skywalking.oap.server.core.analysis.generated.service.service
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.alarm.AlarmMeta;
import org.apache.skywalking.oap.server.core.alarm.AlarmSupported;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
......@@ -38,7 +36,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
@IndicatorType
@StreamData
@StorageEntity(name = "service_avg", builder = ServiceAvgIndicator.Builder.class, sourceScopeId = 1)
public class ServiceAvgIndicator extends LongAvgIndicator implements AlarmSupported {
public class ServiceAvgIndicator extends LongAvgIndicator implements WithMetadata {
@Setter @Getter @Column(columnName = "entity_id") @IDColumn private java.lang.String entityId;
......@@ -108,8 +106,8 @@ public class ServiceAvgIndicator extends LongAvgIndicator implements AlarmSuppor
}
@Override public AlarmMeta getAlarmMeta() {
return new AlarmMeta("generate_indicator", 1, entityId);
@Override public IndicatorMetaInfo getMeta() {
return new IndicatorMetaInfo("generate_indicator", 1, entityId);
}
@Override
......
......@@ -41,6 +41,7 @@
<module>generate-tool</module>
<module>server-telemetry</module>
<module>generate-tool-grammar</module>
<module>exporter</module>
</modules>
<properties>
......
......@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.alarm;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -50,33 +50,33 @@ public class AlarmEntrance {
init();
AlarmMeta alarmMeta = ((AlarmSupported)indicator).getAlarmMeta();
IndicatorMetaInfo indicatorMetaInfo = ((WithMetadata)indicator).getMeta();
MetaInAlarm metaInAlarm;
switch (alarmMeta.getScope()) {
switch (indicatorMetaInfo.getScope()) {
case SERVICE:
int serviceId = Integer.parseInt(alarmMeta.getId());
int serviceId = Integer.parseInt(indicatorMetaInfo.getId());
ServiceInventory serviceInventory = serviceInventoryCache.get(serviceId);
ServiceMetaInAlarm serviceMetaInAlarm = new ServiceMetaInAlarm();
serviceMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName());
serviceMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName());
serviceMetaInAlarm.setId(serviceId);
serviceMetaInAlarm.setName(serviceInventory.getName());
metaInAlarm = serviceMetaInAlarm;
break;
case SERVICE_INSTANCE:
int serviceInstanceId = Integer.parseInt(alarmMeta.getId());
int serviceInstanceId = Integer.parseInt(indicatorMetaInfo.getId());
ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
ServiceInstanceMetaInAlarm instanceMetaInAlarm = new ServiceInstanceMetaInAlarm();
instanceMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName());
instanceMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName());
instanceMetaInAlarm.setId(serviceInstanceId);
instanceMetaInAlarm.setName(serviceInstanceInventory.getName());
metaInAlarm = instanceMetaInAlarm;
break;
case ENDPOINT:
int endpointId = Integer.parseInt(alarmMeta.getId());
int endpointId = Integer.parseInt(indicatorMetaInfo.getId());
EndpointInventory endpointInventory = endpointInventoryCache.get(endpointId);
EndpointMetaInAlarm endpointMetaInAlarm = new EndpointMetaInAlarm();
endpointMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName());
endpointMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName());
endpointMetaInAlarm.setId(endpointId);
serviceId = endpointInventory.getServiceId();
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.core.alarm;
package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
......@@ -24,18 +24,18 @@ import org.apache.skywalking.oap.server.core.Const;
/**
* @author wusheng
*/
public class AlarmMeta {
public class IndicatorMetaInfo {
@Setter @Getter private String indicatorName;
@Setter @Getter private int scope;
@Setter @Getter private String id;
public AlarmMeta(String indicatorName, int scope) {
public IndicatorMetaInfo(String indicatorName, int scope) {
this.indicatorName = indicatorName;
this.scope = scope;
this.id = Const.EMPTY_STRING;
}
public AlarmMeta(String indicatorName, int scope, String id) {
public IndicatorMetaInfo(String indicatorName, int scope, String id) {
this.indicatorName = indicatorName;
this.scope = scope;
this.id = id;
......@@ -48,4 +48,12 @@ public class AlarmMeta {
public void setId(String id) {
this.id = id;
}
@Override public String toString() {
return "IndicatorMetaInfo{" +
"indicatorName='" + indicatorName + '\'' +
", scope=" + scope +
", id='" + id + '\'' +
'}';
}
}
......@@ -16,13 +16,13 @@
*
*/
package org.apache.skywalking.oap.server.core.alarm;
package org.apache.skywalking.oap.server.core.analysis.indicator;
/**
* Alarm supported interface implementor could return the {@link AlarmMeta}
* Indicator, which implement this interface, could provide {@link IndicatorMetaInfo}.
*
* @author wusheng
*/
public interface AlarmSupported {
AlarmMeta getAlarmMeta();
public interface WithMetadata {
IndicatorMetaInfo getMeta();
}
......@@ -18,9 +18,9 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.alarm.AlarmEntrance;
import org.apache.skywalking.oap.server.core.alarm.AlarmSupported;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.alarm.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.WithMetadata;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -40,7 +40,7 @@ public class AlarmNotifyWorker extends AbstractWorker<Indicator> {
}
@Override public void in(Indicator indicator) {
if (indicator instanceof AlarmSupported) {
if (indicator instanceof WithMetadata) {
entrance.forward(indicator);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author wusheng
*/
public class ExportWorker extends AbstractWorker<Indicator> {
private ModuleManager moduleManager;
private MetricValuesExportService exportService;
public ExportWorker(int workerId, ModuleManager moduleManager) {
super(workerId);
this.moduleManager = moduleManager;
}
@Override public void in(Indicator indicator) {
if (exportService != null || moduleManager.has(ExporterModule.NAME)) {
if (indicator instanceof WithMetadata) {
if (exportService == null) {
exportService = moduleManager.find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
}
exportService.export(((WithMetadata)indicator).getMeta(), indicator);
}
}
}
}
......@@ -18,21 +18,16 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
import static java.util.Objects.nonNull;
......@@ -46,16 +41,19 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
private final String modelName;
private final MergeDataCache<Indicator> mergeDataCache;
private final IIndicatorDAO indicatorDAO;
private final AbstractWorker<Indicator> nextWorker;
private final AbstractWorker<Indicator> nextAlarmWorker;
private final AbstractWorker<Indicator> nextExportWorker;
private final DataCarrier<Indicator> dataCarrier;
IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextWorker) {
IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextAlarmWorker,
AbstractWorker<Indicator> nextExportWorker) {
super(moduleManager, workerId, batchSize);
this.modelName = modelName;
this.mergeDataCache = new MergeDataCache<>();
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
this.nextAlarmWorker = nextAlarmWorker;
this.nextExportWorker = nextExportWorker;
String name = "INDICATOR_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
......@@ -117,8 +115,11 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
batchCollection.add(indicatorDAO.prepareBatchInsert(modelName, data));
}
if (Objects.nonNull(nextWorker)) {
nextWorker.in(data);
if (Objects.nonNull(nextAlarmWorker)) {
nextAlarmWorker.in(data);
}
if (Objects.nonNull(nextExportWorker)) {
nextExportWorker.in(data);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
......
......@@ -74,8 +74,11 @@ public enum IndicatorProcess {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager);
WorkerInstances.INSTANCES.put(alarmNotifyWorker.getWorkerId(), alarmNotifyWorker);
ExportWorker exportWorker = new ExportWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager);
WorkerInstances.INSTANCES.put(exportWorker.getWorkerId(), exportWorker);
IndicatorPersistentWorker minutePersistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
1000, moduleManager, indicatorDAO, alarmNotifyWorker);
1000, moduleManager, indicatorDAO, alarmNotifyWorker, exportWorker);
WorkerInstances.INSTANCES.put(minutePersistentWorker.getWorkerId(), minutePersistentWorker);
persistentWorkers.add(minutePersistentWorker);
......@@ -85,7 +88,7 @@ public enum IndicatorProcess {
private IndicatorPersistentWorker worker(ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, String modelName) {
IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
1000, moduleManager, indicatorDAO, null);
1000, moduleManager, indicatorDAO, null, null);
WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
persistentWorkers.add(persistentWorker);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.exporter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* @author wusheng
*/
public class ExporterModule extends ModuleDefine {
public static final String NAME = "exporter";
public ExporterModule() {
super(NAME);
}
@Override public Class[] services() {
return new Class[] {MetricValuesExportService.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.exporter;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* Export the metric value from indicators through this service, if provider exists.
*
* @author wusheng
*/
public interface MetricValuesExportService extends Service {
void export(IndicatorMetaInfo meta, Indicator indicator);
}
......@@ -20,4 +20,5 @@ org.apache.skywalking.oap.server.core.storage.StorageModule
org.apache.skywalking.oap.server.core.cluster.ClusterModule
org.apache.skywalking.oap.server.core.CoreModule
org.apache.skywalking.oap.server.core.query.QueryModule
org.apache.skywalking.oap.server.core.alarm.AlarmModule
\ No newline at end of file
org.apache.skywalking.oap.server.core.alarm.AlarmModule
org.apache.skywalking.oap.server.core.exporter.ExporterModule
\ No newline at end of file
......@@ -30,6 +30,7 @@ import graphql.GraphQLError;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
......@@ -51,6 +52,8 @@ public class GraphQLQueryHandler extends JettyJsonHandler {
private static final String MESSAGE = "message";
private final Gson gson = new Gson();
private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {
}.getType();
private final String path;
......@@ -75,8 +78,7 @@ public class GraphQLQueryHandler extends JettyJsonHandler {
JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);
return execute(requestJson.get(QUERY).getAsString(), gson.fromJson(requestJson.get(VARIABLES), new TypeToken<Map<String, Object>>() {
}.getType()));
return execute(requestJson.get(QUERY).getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
}
private JsonObject execute(String request, Map<String, Object> variables) {
......
......@@ -17,7 +17,8 @@
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.skywalking</groupId>
......@@ -82,7 +83,6 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.version>1.8</compiler.version>
<scala.compiler.version>2.11.7</scala.compiler.version>
<powermock.version>1.6.4</powermock.version>
<checkstyle.version>6.18</checkstyle.version>
<junit.version>4.12</junit.version>
......@@ -114,13 +114,11 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito-all.version}</version>
<scope>test</scope>
</dependency>
<dependency>
......@@ -140,13 +138,13 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<version>${mockito-all.version}</version>
<scope>test</scope>
</dependency>
<dependency>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册