提交 8b22a52d 编写于 作者: P pengys5

Merge branch 'feature/high-performance-agent' of...

Merge branch 'feature/high-performance-agent' of https://github.com/wu-sheng/sky-walking into feature/266
......@@ -32,13 +32,15 @@ ___
# Contributors
_In chronological order_
* 吴晟 [**Project Maintainer**] [@wu-sheng](https://github.com/wu-sheng) Principle Engineer, 2012 Lab, Huawei.
* 张鑫 [**Project Maintainer**] [@ascrutae](https://github.com/ascrutae)
* 吴晟 [**PMC Member**] [@wu-sheng](https://github.com/wu-sheng) Principle Engineer, 2012 Lab, Huawei.
* 张鑫 [**PMC Member**] [@ascrutae](https://github.com/ascrutae)
* 谭真 [@mircoteam](https://github.com/mircoteam) Advanced R&D Engineers, Creative & Interactive Group.
* 徐妍 [@TastySummer](https://github.com/TastySummer)
* 彭勇升 [**Project Maintainer**] [@pengys5](https://github.com/pengys5) Technical Specialist, OneAPM.
* 彭勇升 [**PMC Member**] [@pengys5](https://github.com/pengys5) Technical Specialist, OneAPM.
* 戴文
* 柏杨 [@bai-yang](https://github.com/bai-yang) Senior Engineer, Alibaba Group.
* 陈凤 [@trey03](https://github.com/trey03)
* [More contributors](https://github.com/wu-sheng/sky-walking/graphs/contributors)
This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to wu.sheng@foxmail.com.
......
......@@ -15,12 +15,12 @@
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
<version>0.20.4</version>
<version>0.30.0</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-noop</artifactId>
<version>0.20.4</version>
<version>0.30.0</version>
</dependency>
</dependencies>
......
package org.skywalking.apm.toolkit.opentracing;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* The <code>NeedSnifferActivation</code> annotation is flag for reader and maintainers,
* which represents this method should be activated/intercepted in sniffer.
*
* @author wusheng
*/
@Target({ElementType.METHOD, ElementType.CONSTRUCTOR})
@Retention(RetentionPolicy.SOURCE)
public @interface NeedSnifferActivation {
String value() default "What should interceptor do?";
}
package org.skywalking.apm.toolkit.opentracing;
import io.opentracing.References;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* All source code in SkyWalkingSpanBuilder acts like an NoopSpanBuilder.
* Actually, it is NOT.
* The whole logic will be added after toolkit-activation.
* <p>
* Created by wusheng on 2016/12/20.
*/
public class SkyWalkingSpanBuilder implements Tracer.SpanBuilder {
private String operationName;
private long startTime = 0L;
private final Map<String, String> tags;
private SpanContext parentContext;
SkyWalkingSpanBuilder(String operationName) {
this.operationName = operationName;
this.tags = new HashMap<String, String>();
}
/**
* In SkyWalkingTracer, SpanContext will not be used. Tracer will build reference by itself.
*
* @param spanContext
* @return
*/
@Override
public Tracer.SpanBuilder asChildOf(SpanContext spanContext) {
this.parentContext = spanContext;
return this;
}
/**
* In SkyWalkingTracer, Parent Span will not be used. Tracer will build reference by itself.
*
* @param span
* @return
*/
@Override
public Tracer.SpanBuilder asChildOf(Span span) {
asChildOf(span.context());
return this;
}
@Override
public Tracer.SpanBuilder addReference(String referenceType, SpanContext referencedContext) {
if (referenceType.equals(References.CHILD_OF)) {
return asChildOf(referencedContext);
} else {
return this;
}
}
@Override
public Tracer.SpanBuilder withTag(String key, String value) {
if (key != null && value != null) {
tags.put(key, value);
}
return this;
}
@Override
public Tracer.SpanBuilder withTag(String key, boolean value) {
if (key != null) {
tags.put(key, Boolean.toString(value));
}
return this;
}
@Override
public Tracer.SpanBuilder withTag(String key, Number value) {
if (key != null && value != null) {
tags.put(key, value.toString());
}
return this;
}
@Override
public Tracer.SpanBuilder withStartTimestamp(long startTime) {
this.startTime = startTime;
return this;
}
@Override
public Span start() {
if (startTime == 0) {
startTime = System.currentTimeMillis();
}
return new SkyWalkingSpan(this.operationName, this.startTime, this.tags);
}
@Override
public Iterable<Map.Entry<String, String>> baggageItems() {
return parentContext == null
? Collections.<String, String>emptyMap().entrySet()
: parentContext.baggageItems();
}
}
package org.skywalking.apm.toolkit.opentracing;
import io.opentracing.ActiveSpan;
import io.opentracing.SpanContext;
import java.util.Map;
/**
* The <code>SkywalkingActiveSpan</code> is an extension of {@link SkywalkingSpan},
* but because of Java inheritance restrict, only can do with a facade mode.
*
* @author wusheng
*/
public class SkywalkingActiveSpan implements ActiveSpan {
private SkywalkingSpan span;
public SkywalkingActiveSpan(SkywalkingSpan span) {
this.span = span;
}
@Override
public void deactivate() {
span.finish();
}
@Override
public void close() {
this.deactivate();
}
@Override
public Continuation capture() {
return new SkywalkingContinuation();
}
@Override
public SpanContext context() {
return span.context();
}
@Override
public ActiveSpan setTag(String key, String value) {
span.setTag(key, value);
return this;
}
@Override
public ActiveSpan setTag(String key, boolean value) {
span.setTag(key, value);
return this;
}
@Override
public ActiveSpan setTag(String key, Number value) {
span.setTag(key, value);
return this;
}
@Override
public ActiveSpan log(Map<String, ?> fields) {
span.log(fields);
return this;
}
@Override
public ActiveSpan log(long timestampMicroseconds, Map<String, ?> fields) {
span.log(timestampMicroseconds, fields);
return this;
}
@Override
public ActiveSpan log(String event) {
span.log(event);
return this;
}
@Override
public ActiveSpan log(long timestampMicroseconds, String event) {
span.log(timestampMicroseconds, event);
return this;
}
/**
* Don't support baggage item.
*/
@Override
public ActiveSpan setBaggageItem(String key, String value) {
return this;
}
/**
* Don't support baggage item.
*
* @return null, always.
*/
@Override
public String getBaggageItem(String key) {
return null;
}
@Override
public ActiveSpan setOperationName(String operationName) {
span.setOperationName(operationName);
return this;
}
/**
* Don't support logging with payload.
*/
@Deprecated
@Override
public ActiveSpan log(String eventName, Object payload) {
return this;
}
/**
* Don't support logging with payload.
*/
@Deprecated
@Override
public ActiveSpan log(long timestampMicroseconds, String eventName, Object payload) {
return this;
}
}
package org.skywalking.apm.toolkit.opentracing;
import io.opentracing.SpanContext;
import java.util.Map;
/**
* Skywalking tracer context based on {@link ThreadLocal} auto mechanism.
*
* @author wusheng
*/
public class SkywalkingContext implements SpanContext {
public static final SkywalkingContext INSTANCE = new SkywalkingContext();
private SkywalkingContext() {
}
@Override
public Iterable<Map.Entry<String, String>> baggageItems() {
return null;
}
}
package org.skywalking.apm.toolkit.opentracing;
import io.opentracing.ActiveSpan;
/**
* @author wusheng
*/
public class SkywalkingContinuation implements ActiveSpan.Continuation {
@NeedSnifferActivation("1. ContextManager#capture" +
"2. set ContextSnapshot to the dynamic field")
public SkywalkingContinuation() {
}
@NeedSnifferActivation("1. get ContextSnapshot from the dynamic field" +
"2. ContextManager#continued")
@Override
public ActiveSpan activate() {
SkywalkingSpanBuilder builder = new SkywalkingSpanBuilder("Thread/" + Thread.currentThread().getName());
return builder.startActive();
}
}
......@@ -2,112 +2,127 @@ package org.skywalking.apm.toolkit.opentracing;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import java.util.HashMap;
import java.util.Map;
/**
* Created by wusheng on 2016/12/20.
* @author wusheng
*/
public class SkyWalkingSpan implements Span, SpanContext {
private String operationName;
private long startTime;
private Map<String, String> tags;
public class SkywalkingSpan implements Span {
@NeedSnifferActivation(
"1.ContextManager#createSpan (Entry,Exit,Local based on builder)." +
"2.set the span reference to the dynamic field of enhanced SkywalkingSpan")
SkywalkingSpan(SkywalkingSpanBuilder builder) {
}
private final Map<String, String> baggageItems;
/**
* Create a shell span for {@link SkywalkingTracer#activeSpan()}
*
* @param tracer
*/
@NeedSnifferActivation(
"1. set the span reference to the dynamic field of enhanced SkywalkingSpan")
public SkywalkingSpan(SkywalkingTracer tracer) {
SkyWalkingSpan(String operationName, long startTime, Map<String, String> tags) {
this.operationName = operationName;
this.startTime = startTime;
this.tags = tags;
baggageItems = new HashMap<String, String>();
}
@NeedSnifferActivation("Override span's operationName, which has been given at ")
@Override
public SpanContext context() {
public Span setOperationName(String operationName) {
return this;
}
@NeedSnifferActivation("AbstractTracingSpan#log(long timestampMicroseconds, Map<String, ?> fields)")
@Override
public void finish() {
public Span log(long timestampMicroseconds, Map<String, ?> fields) {
return this;
}
/**
* Stop the active span
*
* @param finishMicros
*/
@NeedSnifferActivation(
"1.ContextManager#stopSpan(AbstractSpan span)" +
"2. The parameter of stop methed is from the dynamic field of enhanced SkywalkingSpan")
@Override
public void finish(long finishMicros) {
}
@Override
public void close() {
public Span log(long timestampMicroseconds, String event) {
Map<String, String> eventMap = new HashMap<String, String>(1);
eventMap.put("event", event);
return log(timestampMicroseconds, eventMap);
}
@Override
public Span setTag(String key, String value) {
return this;
public void finish() {
this.finish(System.currentTimeMillis());
}
@Override
public Span setTag(String key, boolean value) {
return this;
public SpanContext context() {
return SkywalkingContext.INSTANCE;
}
@Override
public Span setTag(String key, Number value) {
return this;
@Override public Span setTag(String key, String value) {
return null;
}
@Override
public Span log(Map<String, ?> fields) {
return this;
@Override public Span setTag(String key, boolean value) {
return null;
}
@Override
public Span log(long timestampMicroseconds, Map<String, ?> fields) {
return this;
@Override public Span setTag(String key, Number value) {
return null;
}
@Override
public Span log(String event) {
return this;
public Span log(Map<String, ?> fields) {
return log(System.currentTimeMillis(), fields);
}
@Override
public Span log(long timestampMicroseconds, String event) {
return this;
public Span log(String event) {
return log(System.currentTimeMillis(), event);
}
/**
* Don't support baggage item.
*/
@Override
public Span setBaggageItem(String key, String value) {
baggageItems.put(key, value);
return this;
}
/**
* Don't support baggage item.
*
* @return null, always.
*/
@Override
public String getBaggageItem(String key) {
return baggageItems.get(key);
}
@Override
public Span setOperationName(String operationName) {
return this;
return null;
}
/**
* Don't support logging with payload.
*/
@Deprecated
@Override
public Span log(String eventName, Object payload) {
return this;
}
/**
* Don't support logging with payload.
*/
@Deprecated
@Override
public Span log(long timestampMicroseconds, String eventName, Object payload) {
return this;
}
@Override
public Iterable<Map.Entry<String, String>> baggageItems() {
return baggageItems.entrySet();
}
}
package org.skywalking.apm.toolkit.opentracing;
import io.opentracing.ActiveSpan;
import io.opentracing.BaseSpan;
import io.opentracing.References;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import java.util.LinkedList;
import java.util.List;
/**
* @author wusheng
*/
public class SkywalkingSpanBuilder implements Tracer.SpanBuilder {
private List<Tag> tags = new LinkedList<Tag>();
private String operationName;
private boolean isEntry = false;
private boolean isExit = false;
private int port;
private String peer;
private String componentName;
private boolean isError = false;
private long startTime;
public SkywalkingSpanBuilder(String operationName) {
this.operationName = operationName;
}
@Override
public Tracer.SpanBuilder asChildOf(SpanContext parent) {
if (parent instanceof SkywalkingContext) {
return this;
}
throw new IllegalArgumentException("parent must be type of SpanContext");
}
@Override
public Tracer.SpanBuilder asChildOf(BaseSpan<?> parent) {
if (parent instanceof SkywalkingSpan || parent instanceof SkywalkingActiveSpan) {
return this;
}
throw new IllegalArgumentException("parent must be type of SkywalkingSpan");
}
/**
* Ignore the reference type. the span always the entry or has a parent span.
*
* @param referenceType
* @param referencedContext
* @return
*/
@Override
public Tracer.SpanBuilder addReference(String referenceType, SpanContext referencedContext) {
if (References.FOLLOWS_FROM.equals(referenceType)) {
throw new IllegalArgumentException("only support CHILD_OF reference");
}
return asChildOf(referencedContext);
}
@Override
public Tracer.SpanBuilder withTag(String key, String value) {
if (Tags.COMPONENT.getKey().equals(key)) {
componentName = value;
} else if (Tags.SPAN_KIND.getKey().equals(key)) {
if (Tags.SPAN_KIND_CLIENT.equals(value) || Tags.SPAN_KIND_PRODUCER.equals(value)) {
isEntry = false;
isExit = true;
} else if (Tags.SPAN_KIND_SERVER.equals(value) || Tags.SPAN_KIND_CONSUMER.equals(value)) {
isEntry = true;
isExit = false;
} else {
isEntry = false;
isExit = false;
}
} else if (Tags.PEER_HOST_IPV4.getKey().equals(key) || Tags.PEER_HOST_IPV6.getKey().equals(key)
|| Tags.PEER_HOSTNAME.getKey().equals(key)) {
peer = value;
} else if (Tags.PEER_SERVICE.getKey().equals(key)) {
operationName = value;
} else {
tags.add(new Tag(key, value));
}
return this;
}
@Override
public Tracer.SpanBuilder withTag(String key, boolean value) {
if (Tags.ERROR.equals(key)) {
isError = value;
} else {
tags.add(new Tag(key, value ? "true" : "false"));
}
return this;
}
@Override
public Tracer.SpanBuilder withTag(String key, Number value) {
if (Tags.PEER_PORT.getKey().equals(key)) {
port = value.intValue();
} else {
tags.add(new Tag(key, value.toString()));
}
return this;
}
@Override
public Tracer.SpanBuilder withStartTimestamp(long microseconds) {
startTime = microseconds;
return this;
}
@Override
public ActiveSpan startActive() {
return new SkywalkingActiveSpan(new SkywalkingSpan(this));
}
@Override
public Span startManual() {
return new SkywalkingSpan(this);
}
@Override
@Deprecated
public Span start() {
return startManual();
}
/**
* All the get methods are for accessing data from activation
*/
public List<Tag> getTags() {
return tags;
}
public String getOperationName() {
return operationName;
}
public boolean isEntry() {
return isEntry;
}
public boolean isExit() {
return isExit;
}
public int getPort() {
return port;
}
public String getPeer() {
return peer;
}
public String getComponentName() {
return componentName;
}
public boolean isError() {
return isError;
}
public long getStartTime() {
return startTime;
}
/**
* All the following methods are needed for activation.
*/
@Override
@NeedSnifferActivation("Stop the active span.")
public Tracer.SpanBuilder ignoreActiveSpan() {
return this;
}
}
package org.skywalking.apm.toolkit.opentracing;
import io.opentracing.ActiveSpan;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
/**
* All source code in SkyWalkingTracer acts like an NoopTracer.
* Actually, it is NOT.
* The whole logic will be added after toolkit-activation.
* <p>
* Created by wusheng on 2016/12/20.
* @author wusheng
*/
public class SkyWalkingTracer implements Tracer {
public class SkywalkingTracer implements Tracer {
private static String TRACE_HEAD_NAME = "sw3";
public static Tracer INSTANCE = new SkyWalkingTracer();
@NeedSnifferActivation("1. ContextManager#inject" +
"2. ContextCarrier#serialize")
private String inject() {
return null;
}
@NeedSnifferActivation("1. ContextCarrier#deserialize" +
"2. ContextManager#extract")
private void extract(String carrier) {
}
@Override
public SpanBuilder buildSpan(String operationName) {
return new SkyWalkingSpanBuilder(operationName);
return new SkywalkingSpanBuilder(operationName);
}
@Override
public <C> void inject(SpanContext spanContext, Format<C> format, C carrier) {
if (Format.Builtin.TEXT_MAP.equals(format) || Format.Builtin.HTTP_HEADERS.equals(format)) {
((TextMap) carrier).put(TRACE_HEAD_NAME, formatInjectCrossProcessPropagationContextData());
((TextMap)carrier).put(TRACE_HEAD_NAME, inject());
} else if (Format.Builtin.BINARY.equals(format)) {
byte[] key = TRACE_HEAD_NAME.getBytes(ByteBufferContext.CHARSET);
byte[] value = formatInjectCrossProcessPropagationContextData().getBytes(ByteBufferContext.CHARSET);
((ByteBuffer) carrier).put(key);
((ByteBuffer) carrier).putInt(value.length);
((ByteBuffer) carrier).put(value);
byte[] value = inject().getBytes(ByteBufferContext.CHARSET);
((ByteBuffer)carrier).put(key);
((ByteBuffer)carrier).putInt(value.length);
((ByteBuffer)carrier).put(value);
} else {
throw new IllegalArgumentException("Unsupported format: " + format);
}
}
@Override
public <C> SpanContext extract(Format<C> format, C carrier) {
if (Format.Builtin.TEXT_MAP.equals(format) || Format.Builtin.HTTP_HEADERS.equals(format)) {
TextMap textMapCarrier = (TextMap) carrier;
formatExtractCrossProcessPropagationContextData(fetchContextData(textMapCarrier));
TextMap textMapCarrier = (TextMap)carrier;
extract(fetchContextData(textMapCarrier));
return new TextMapContext(textMapCarrier);
} else if (Format.Builtin.BINARY.equals(format)) {
ByteBuffer byteBufferCarrier = (ByteBuffer) carrier;
formatExtractCrossProcessPropagationContextData(fetchContextData(byteBufferCarrier));
return new ByteBufferContext((ByteBuffer) carrier);
ByteBuffer byteBufferCarrier = (ByteBuffer)carrier;
extract(fetchContextData(byteBufferCarrier));
return new ByteBufferContext((ByteBuffer)carrier);
} else {
throw new IllegalArgumentException("Unsupported format: " + format);
}
}
/**
* set context data in toolkit-opentracing-activation
*/
private String formatInjectCrossProcessPropagationContextData() {
return "";
@Override
public ActiveSpan activeSpan() {
return new SkywalkingActiveSpan(new SkywalkingSpan(this));
}
/**
* read context data in toolkit-opentracing-activation
*/
private void formatExtractCrossProcessPropagationContextData(String contextData) {
@Override
public ActiveSpan makeActive(Span span) {
if (span instanceof SkywalkingSpan) {
return new SkywalkingActiveSpan((SkywalkingSpan)span);
} else {
throw new IllegalArgumentException("span must be a type of SkywalkingSpan");
}
}
private String fetchContextData(TextMap textMap) {
......@@ -99,4 +107,5 @@ public class SkyWalkingTracer implements Tracer {
return null;
}
}
}
package org.skywalking.apm.toolkit.opentracing;
/**
* @author wusheng
*/
public class Tag {
private String key;
private String value;
public Tag(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public String getValue() {
return value;
}
}
package org.skywalking.apm.toolkit.opentracing;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.TextMap;
import org.junit.Test;
import java.util.Iterator;
import java.util.Map;
/**
* Created by wusheng on 2016/12/21.
*/
public class SkyWalkingTracerTest {
@Test
public void testBuildSpan() {
Tracer tracer = SkyWalkingTracer.INSTANCE;
Tracer.SpanBuilder spanBuilder = tracer.buildSpan("/http/serviceName");
SpanContext context = new TextMapContext(new TextMap() {
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"TextMapInjectAdapter should only be used with Tracer.inject()");
}
@Override
public void put(String key, String value) {
}
});
spanBuilder.asChildOf(context).withTag("example.tag", "testtag");
Span span = spanBuilder.start();
span.finish();
}
}
......@@ -9,23 +9,6 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-logging-log4j2</artifactId>
<artifactId>apm-datacarrier</artifactId>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-logging-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
</dependencies>
</project>
package org.skywalking.apm.agent.core.datacarrier;
package org.skywalking.apm.commons.datacarrier;
import org.skywalking.apm.agent.core.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.agent.core.datacarrier.buffer.Channels;
import org.skywalking.apm.agent.core.datacarrier.consumer.ConsumerPool;
import org.skywalking.apm.agent.core.datacarrier.consumer.IConsumer;
import org.skywalking.apm.agent.core.datacarrier.partition.IDataPartitioner;
import org.skywalking.apm.agent.core.datacarrier.partition.SimpleRollingPartitioner;
import org.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.skywalking.apm.commons.datacarrier.consumer.ConsumerPool;
import org.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
import org.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
/**
* DataCarrier main class.
......
package org.skywalking.apm.agent.core.datacarrier.buffer;
package org.skywalking.apm.commons.datacarrier.buffer;
import java.util.LinkedList;
import org.skywalking.apm.agent.core.datacarrier.common.AtomicRangeInteger;
import org.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
/**
* Created by wusheng on 2016/10/25.
......
package org.skywalking.apm.agent.core.datacarrier.buffer;
package org.skywalking.apm.commons.datacarrier.buffer;
import org.skywalking.apm.agent.core.datacarrier.partition.IDataPartitioner;
import org.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
/**
* Channels of Buffer
......
package org.skywalking.apm.agent.core.datacarrier.common;
package org.skywalking.apm.commons.datacarrier.common;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
......
package org.skywalking.apm.agent.core.datacarrier.consumer;
package org.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
import org.skywalking.apm.agent.core.datacarrier.buffer.Buffer;
import org.skywalking.apm.agent.core.datacarrier.buffer.Channels;
import org.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
* Pool of consumers
......@@ -28,6 +28,7 @@ public class ConsumerPool<T> {
prototype.init();
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", prototype);
consumerThreads[i].setDaemon(true);
}
}
......
package org.skywalking.apm.agent.core.datacarrier.consumer;
package org.skywalking.apm.commons.datacarrier.consumer;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.datacarrier.buffer.Buffer;
import org.skywalking.apm.commons.datacarrier.buffer.Buffer;
/**
* Created by wusheng on 2016/10/25.
......
package org.skywalking.apm.commons.datacarrier;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import org.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.skywalking.apm.commons.datacarrier.partition.ProducerThreadPartitioner;
import org.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
/**
* Created by wusheng on 2016/10/25.
*/
public class DataCarrierTest {
@Test
public void testCreateDataCarrier() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(5, 100);
Assert.assertEquals(((Integer) (MemberModifier.field(DataCarrier.class, "bufferSize").get(carrier))).intValue(), 100);
Assert.assertEquals(((Integer) (MemberModifier.field(DataCarrier.class, "channelSize").get(carrier))).intValue(), 5);
Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Assert.assertEquals(channels.getChannelSize(), 5);
Buffer<SampleData> buffer = channels.getBuffer(0);
Assert.assertEquals(buffer.getBufferSize(), 100);
Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.BLOCKING);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(Channels.class, "dataPartitioner").get(channels).getClass(), SimpleRollingPartitioner.class);
carrier.setPartitioner(new ProducerThreadPartitioner<SampleData>());
Assert.assertEquals(MemberModifier.field(Channels.class, "dataPartitioner").get(channels).getClass(), ProducerThreadPartitioner.class);
}
@Test
public void testProduce() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
Assert.assertTrue(carrier.produce(new SampleData().setName("a")));
Assert.assertTrue(carrier.produce(new SampleData().setName("b")));
Assert.assertTrue(carrier.produce(new SampleData().setName("c")));
Assert.assertTrue(carrier.produce(new SampleData().setName("d")));
Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
List result1 = buffer1.obtain(0, 100);
Buffer<SampleData> buffer2 = channels.getBuffer(1);
List result2 = buffer2.obtain(0, 100);
Assert.assertEquals(2, result1.size());
Assert.assertEquals(4, result1.size() + result2.size());
}
@Test
public void testOverrideProduce() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
carrier.setBufferStrategy(BufferStrategy.OVERRIDE);
for (int i = 0; i < 500; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
}
Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
List result1 = buffer1.obtain(0, 100);
Buffer<SampleData> buffer2 = channels.getBuffer(1);
List result2 = buffer2.obtain(0, 100);
Assert.assertEquals(200, result1.size() + result2.size());
}
@Test
public void testIfPossibleProduce() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
for (int i = 0; i < 200; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
}
for (int i = 0; i < 200; i++) {
Assert.assertFalse(carrier.produce(new SampleData().setName("d" + i + "_2")));
}
Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
List result1 = buffer1.obtain(0, 100);
Buffer<SampleData> buffer2 = channels.getBuffer(1);
List result2 = buffer2.obtain(0, 100);
Assert.assertEquals(200, result1.size() + result2.size());
}
@Test
public void testBlockingProduce() throws IllegalAccessException {
final DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
for (int i = 0; i < 200; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
}
long time1 = System.currentTimeMillis();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
IConsumer<SampleData> consumer = new IConsumer<SampleData>() {
int i = 0;
@Override
public void init() {
}
@Override
public void consume(List<SampleData> data) {
}
@Override
public void onError(List<SampleData> data, Throwable t) {
}
@Override
public void onExit() {
}
};
carrier.consume(consumer, 1);
}
}).start();
carrier.produce(new SampleData().setName("blocking-data"));
long time2 = System.currentTimeMillis();
Assert.assertTrue(time2 - time1 > 2000);
}
}
package org.skywalking.apm.commons.datacarrier;
/**
* Created by wusheng on 2016/10/25.
*/
public class SampleData {
private int intValue;
private String name;
public int getIntValue() {
return intValue;
}
public String getName() {
return name;
}
public SampleData setIntValue(int intValue) {
this.intValue = intValue;
return this;
}
public SampleData setName(String name) {
this.name = name;
return this;
}
}
package org.skywalking.apm.commons.datacarrier.common;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Created by xin on 2017/7/14.
*/
public class AtomicRangeIntegerTest {
@Test
public void testGetAndIncrement() {
AtomicRangeInteger atomicI = new AtomicRangeInteger(0, 10);
for (int i = 0; i < 10; i++) {
Assert.assertEquals(i, atomicI.getAndIncrement());
}
Assert.assertEquals(0, atomicI.getAndIncrement());
Assert.assertEquals(1, atomicI.get());
Assert.assertEquals(1, atomicI.intValue());
Assert.assertEquals(1, atomicI.longValue());
Assert.assertEquals(1, (int)atomicI.floatValue());
Assert.assertEquals(1, (int)atomicI.doubleValue());
}
}
package org.skywalking.apm.commons.datacarrier.consumer;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import org.skywalking.apm.commons.datacarrier.SampleData;
import org.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
/**
* Created by wusheng on 2016/10/26.
*/
public class ConsumerPoolTest {
@Test
public void testBeginConsumerPool() throws IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2);
pool.begin();
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
Assert.assertEquals(2, threads.length);
Assert.assertTrue(threads[0].isAlive());
Assert.assertTrue(threads[1].isAlive());
}
@Test
public void testCloseConsumerPool() throws InterruptedException, IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2);
pool.begin();
Thread.sleep(5000);
pool.close();
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
Assert.assertEquals(2, threads.length);
Assert.assertFalse((Boolean)MemberModifier.field(ConsumerThread.class, "running").get(threads[0]));
Assert.assertFalse((Boolean)MemberModifier.field(ConsumerThread.class, "running").get(threads[1]));
}
}
package org.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import org.skywalking.apm.commons.datacarrier.DataCarrier;
import org.skywalking.apm.commons.datacarrier.SampleData;
/**
* Created by wusheng on 2016/10/26.
*/
public class ConsumerTest {
public static LinkedBlockingQueue<SampleData> buffer = new LinkedBlockingQueue<SampleData>();
public static boolean isOccurError = false;
@Test
public void testConsumerLessThanChannel() throws IllegalAccessException {
final DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
for (int i = 0; i < 100; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("data" + i)));
}
SampleConsumer consumer = new SampleConsumer();
consumer.i = 100;
carrier.consume(SampleConsumer.class, 1);
Assert.assertEquals(1, ((SampleConsumer)getConsumer(carrier)).i);
SampleConsumer2 consumer2 = new SampleConsumer2();
consumer2.i = 100;
carrier.consume(consumer2, 1);
Assert.assertEquals(100, ((SampleConsumer2)getConsumer(carrier)).i);
carrier.shutdownConsumers();
}
@Test
public void testConsumerMoreThanChannel() throws IllegalAccessException, InterruptedException {
final DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
for (int i = 0; i < 200; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("data" + i)));
}
SampleConsumer consumer = new SampleConsumer();
carrier.consume(SampleConsumer.class, 5);
Thread.sleep(2000);
List<SampleData> result = new ArrayList<SampleData>();
buffer.drainTo(result);
Assert.assertEquals(200, result.size());
HashSet<Integer> consumerCounter = new HashSet<Integer>();
for (SampleData data : result) {
consumerCounter.add(data.getIntValue());
}
Assert.assertEquals(5, consumerCounter.size());
}
@Test
public void testConsumerOnError() {
final DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
for (int i = 0; i < 200; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("data" + i)));
}
SampleConsumer2 consumer = new SampleConsumer2();
consumer.onError = true;
carrier.consume(consumer, 5);
Assert.assertTrue(isOccurError);
}
class SampleConsumer2 implements IConsumer<SampleData> {
public int i = 1;
public boolean onError = false;
@Override
public void init() {
}
@Override
public void consume(List<SampleData> data) {
if (onError) {
throw new RuntimeException("consume exception");
}
}
@Override
public void onError(List<SampleData> data, Throwable t) {
isOccurError = true;
}
@Override
public void onExit() {
}
}
private IConsumer getConsumer(DataCarrier<SampleData> carrier) throws IllegalAccessException {
ConsumerPool pool = ((ConsumerPool)MemberModifier.field(DataCarrier.class, "consumerPool").get(carrier));
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
return (IConsumer)MemberModifier.field(ConsumerThread.class, "consumer").get(threads[0]);
}
}
package org.skywalking.apm.commons.datacarrier.consumer;
import java.util.List;
import org.skywalking.apm.commons.datacarrier.SampleData;
/**
* Created by wusheng on 2016/10/26.
*/
public class SampleConsumer implements IConsumer<SampleData> {
public int i = 1;
@Override
public void init() {
}
@Override
public void consume(List<SampleData> data) {
for(SampleData one : data) {
one.setIntValue(this.hashCode());
ConsumerTest.buffer.offer(one);
}
}
@Override
public void onError(List<SampleData> data, Throwable t) {
}
@Override
public void onExit() {
}
}
package org.skywalking.apm.commons.datacarrier.partition;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.commons.datacarrier.SampleData;
/**
* Created by wusheng on 2016/10/25.
*/
public class ProducerThreadPartitionerTest {
@Test
public void testPartition() {
int partitionNum = (int)Thread.currentThread().getId() % 10;
ProducerThreadPartitioner<SampleData> partitioner = new ProducerThreadPartitioner<SampleData>();
Assert.assertEquals(partitioner.partition(10, new SampleData()), partitionNum);
Assert.assertEquals(partitioner.partition(10, new SampleData()), partitionNum);
Assert.assertEquals(partitioner.partition(10, new SampleData()), partitionNum);
}
}
package org.skywalking.apm.commons.datacarrier.partition;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.commons.datacarrier.SampleData;
/**
* Created by wusheng on 2016/10/25.
*/
public class SimpleRollingPartitionerTest {
@Test
public void testPartition() {
SimpleRollingPartitioner<SampleData> partitioner = new SimpleRollingPartitioner<SampleData>();
Assert.assertEquals(partitioner.partition(10, new SampleData()), 0);
Assert.assertEquals(partitioner.partition(10, new SampleData()), 1);
Assert.assertEquals(partitioner.partition(10, new SampleData()), 2);
}
}
package org.skywalking.apm.logging.log4j2;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.logging.ILog;
/**
* @author wusheng
*/
public class Log4j2Logger implements ILog {
private Logger delegateLogger;
Log4j2Logger(Class<?> targetClass) {
delegateLogger = LogManager.getFormatterLogger(targetClass);
}
@Override
public void info(String format) {
delegateLogger.info(format);
}
@Override
public void info(String format, Object... arguments) {
delegateLogger.info(format, arguments);
}
@Override
public void warn(String format, Object... arguments) {
delegateLogger.warn(format, arguments);
}
@Override
public void error(String format, Throwable e) {
delegateLogger.error(format, e);
}
@Override
public void error(Throwable e, String format, Object... arguments) {
delegateLogger.error(format, e, arguments);
}
@Override
public boolean isDebugEnable() {
return delegateLogger.isDebugEnabled();
}
@Override
public boolean isInfoEnable() {
return delegateLogger.isInfoEnabled();
}
@Override
public boolean isWarnEnable() {
return delegateLogger.isWarnEnabled();
}
@Override
public boolean isErrorEnable() {
return delegateLogger.isErrorEnabled();
}
@Override
public void debug(String format) {
delegateLogger.debug(format);
}
@Override
public void debug(String format, Object... arguments) {
delegateLogger.debug(format, arguments);
}
@Override
public void error(String format) {
delegateLogger.error(format);
}
}
package org.skywalking.apm.logging.log4j2;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogResolver;
/**
* The <code>LogResolver</code> is an implementation of {@link LogResolver},
*
* @author wusheng
*/
public class Log4j2Resolver implements LogResolver {
@Override
public ILog getLogger(Class<?> clazz) {
return new Log4j2Logger(clazz);
}
}
......@@ -13,7 +13,7 @@
<modules>
<module>apm-util</module>
<module>apm-logging-api</module>
<module>apm-logging-log4j2</module>
<module>apm-datacarrier</module>
</modules>
<name>apm-commons</name>
......
package org.skywalking.apm.network.trace.component;
/**
* The <code>Component</code> represents component library,
* which has been supported by skywalking sniffer.
*
* The supported list is in {@link ComponentsDefine}.
*
* @author wusheng
*/
public interface Component {
......
package org.skywalking.apm.network.trace.component;
/**
* The supported list of skywalking java sniffer.
*
* @author wusheng
*/
public class ComponentsDefine {
......
......@@ -25,13 +25,14 @@ message TraceSegmentObject {
}
message TraceSegmentReference {
string parentTraceSegmentId = 1;
int32 parentSpanId = 2;
int32 parentApplicationId = 3;
string networkAddress = 4;
int32 networkAddressId = 5;
string entryServiceName = 6;
int32 entryServiceId = 7;
RefType refType = 1;
string parentTraceSegmentId = 2;
int32 parentSpanId = 3;
int32 parentApplicationInstanceId = 4;
string networkAddress = 5;
int32 networkAddressId = 6;
string entryServiceName = 7;
int32 entryServiceId = 8;
}
message SpanObject {
......@@ -52,6 +53,11 @@ message SpanObject {
repeated LogMessage logs = 15;
}
enum RefType {
CrossProcess = 0;
CrossThread = 1;
}
enum SpanType {
Entry = 0;
Exit = 1;
......
......@@ -77,6 +77,29 @@
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>1.4.0</version>
<exclusions>
<exclusion>
<artifactId>mockito-core</artifactId>
<groupId>org.mockito</groupId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-datacarrier</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
......
......@@ -34,6 +34,18 @@ public class Config {
}
public static class Collector {
/**
* grpc channel status check interval
*/
public static long GRPC_CHANNEL_CHECK_INTERVAL = 30;
/**
* application and service registry check interval
*/
public static long APP_AND_SERVICE_REGISTER_CHECK_INTERVAL = 10;
/**
* discovery rest check interval
*/
public static long DISCOVERY_CHECK_INTERVAL = 60;
/**
* Collector REST-Service address.
* e.g.
......@@ -45,7 +57,7 @@ public class Config {
/**
* Collector service discovery REST service name
*/
public static String DISCOVERY_SERVICE_NAME = "grpc/addresses";
public static String DISCOVERY_SERVICE_NAME = "/grpc/addresses";
}
public static class Buffer {
......
......@@ -5,11 +5,16 @@ import java.util.List;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
/**
* The <code>RemoteDownstreamConfig</code> includes configurations from collector side.
* All of them initialized null, Null-Value or empty collection.
*
* @author wusheng
*/
public class RemoteDownstreamConfig {
public static class Agent {
public volatile static int APPLICATION_ID = DictionaryUtil.nullValue();
public volatile static int APPLICATION_INSTANCE_ID = DictionaryUtil.nullValue();
}
public static class Collector {
......
......@@ -83,7 +83,7 @@ public class SnifferConfigInitializer {
/**
* Load the config file by the path, which is provided by system property, usually with a "-DconfigPath=" arg.
*
* @return the config file {@link InputStream}, or null if not exist.
* @return the config file {@link InputStream}, or null if not needEnhance.
*/
private static InputStream loadConfigBySystemProperty() {
String configPath = System.getProperty("configPath");
......@@ -107,7 +107,7 @@ public class SnifferConfigInitializer {
/**
* Load the config file, where the agent jar is.
*
* @return the config file {@link InputStream}, or null if not exist.
* @return the config file {@link InputStream}, or null if not needEnhance.
*/
private static InputStream loadConfigFromAgentFolder() {
String agentBasePath = initAgentBasePath();
......
package org.skywalking.apm.agent.core.context;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.EntrySpan;
import org.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.skywalking.apm.agent.core.context.trace.LocalSpan;
/**
* The <code>AbstractTracerContext</code> represents the tracer context manager.
......@@ -8,21 +11,89 @@ import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
* @author wusheng
*/
public interface AbstractTracerContext {
/**
* Prepare for the cross-process propagation.
* How to initialize the carrier, depends on the implementation.
*
* @param carrier to carry the context for crossing process.
* @see {@link TracingContext} and {@link IgnoredTracerContext}
*/
void inject(ContextCarrier carrier);
/**
* Build the reference between this segment and a cross-process segment.
* How to build, depends on the implementation.
*
* @param carrier carried the context from a cross-process segment.
* @see {@link TracingContext} and {@link IgnoredTracerContext}
*/
void extract(ContextCarrier carrier);
/**
* Capture a snapshot for cross-thread propagation.
* It's a similar concept with ActiveSpan.Continuation in OpenTracing-java
* How to build, depends on the implementation.
*
* @return the {@link ContextSnapshot} , which includes the reference context.
* @see {@link TracingContext} and {@link IgnoredTracerContext}
*/
ContextSnapshot capture();
/**
* Build the reference between this segment and a cross-thread segment.
* How to build, depends on the implementation.
*
* @param snapshot from {@link #capture()} in the parent thread.
* @see {@link TracingContext} and {@link IgnoredTracerContext}
*/
void continued(ContextSnapshot snapshot);
/**
* Get the global trace id, if needEnhance.
* How to build, depends on the implementation.
*
* @return the string represents the id.
* @see {@link TracingContext} and {@link IgnoredTracerContext}
*/
String getGlobalTraceId();
/**
* Create an entry span
*
* @param operationName most likely a service name
* @return the span represents an entry point of this segment.
* @see {@link EntrySpan} if the implementation is {@link TracingContext}
*/
AbstractSpan createEntrySpan(String operationName);
/**
* Create a local span
*
* @param operationName most likely a local method signature, or business name.
* @return the span represents a local logic block.
* @see {@link LocalSpan} if the implementation is {@link TracingContext}
*/
AbstractSpan createLocalSpan(String operationName);
/**
* Create an exit span
*
* @param operationName most likely a service name of remote
* @param remotePeer the network id(ip:port, hostname:port or ip1:port1,ip2,port, etc.)
* @return the span represent an exit point of this segment.
* @see {@link ExitSpan} if the implementation is {@link TracingContext}
*/
AbstractSpan createExitSpan(String operationName, String remotePeer);
/**
* @return the active span of current tracing context(stack)
*/
AbstractSpan activeSpan();
/**
* Finish the given span, and the given span should be the active span of current tracing context(stack)
*
* @param span to finish
*/
void stopSpan(AbstractSpan span);
void dispose();
}
......@@ -23,7 +23,7 @@ public class ContextCarrier implements Serializable {
private int spanId = -1;
private int applicationId = DictionaryUtil.nullValue();
private int applicationInstanceId = DictionaryUtil.nullValue();
private String peerHost;
......@@ -45,7 +45,7 @@ public class ContextCarrier implements Serializable {
return StringUtil.join('|',
this.getTraceSegmentId(),
this.getSpanId() + "",
this.getApplicationId() + "",
this.getApplicationInstanceId() + "",
this.getPeerHost(),
this.getEntryOperationName(),
this.serializeDistributedTraceIds());
......@@ -66,7 +66,7 @@ public class ContextCarrier implements Serializable {
try {
this.traceSegmentId = parts[0];
this.spanId = Integer.parseInt(parts[1]);
this.applicationId = Integer.parseInt(parts[2]);
this.applicationInstanceId = Integer.parseInt(parts[2]);
this.peerHost = parts[3];
this.entryOperationName = parts[4];
this.distributedTraceIds = deserializeDistributedTraceIds(parts[5]);
......@@ -86,7 +86,7 @@ public class ContextCarrier implements Serializable {
public boolean isValid() {
return !StringUtil.isEmpty(traceSegmentId)
&& getSpanId() > -1
&& applicationId != DictionaryUtil.nullValue()
&& applicationInstanceId != DictionaryUtil.nullValue()
&& !StringUtil.isEmpty(peerHost)
&& !StringUtil.isEmpty(entryOperationName)
&& distributedTraceIds != null;
......@@ -120,12 +120,12 @@ public class ContextCarrier implements Serializable {
this.spanId = spanId;
}
public int getApplicationId() {
return applicationId;
public int getApplicationInstanceId() {
return applicationInstanceId;
}
void setApplicationId(int applicationId) {
this.applicationId = applicationId;
void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public String getPeerHost() {
......
......@@ -15,7 +15,9 @@ import org.skywalking.apm.util.StringUtil;
/**
* {@link ContextManager} controls the whole context of {@link TraceSegment}. Any {@link TraceSegment} relates to
* single-thread, so this context use {@link ThreadLocal} to maintain the context, and make sure, since a {@link
* TraceSegment} starts, all ChildOf spans are in the same context. <p> What is 'ChildOf'? {@see
* TraceSegment} starts, all ChildOf spans are in the same context.
* <p>
* What is 'ChildOf'? {@see
* https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans}
*
* <p> Also, {@link ContextManager} delegates to all {@link AbstractTracerContext}'s major methods.
......@@ -36,12 +38,9 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
context = new IgnoredTracerContext();
} else {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
/**
* Can't register to collector, no need to trace anything.
*/
context = new IgnoredTracerContext();
} else {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID != DictionaryUtil.nullValue()
|| RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID != DictionaryUtil.nullValue()
) {
int suffixIdx = operationName.lastIndexOf(".");
if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
context = new IgnoredTracerContext();
......@@ -53,6 +52,11 @@ public class ContextManager implements TracingContextListener, BootService, Igno
context = new IgnoredTracerContext();
}
}
} else {
/**
* Can't register to collector, no need to trace anything.
*/
context = new IgnoredTracerContext();
}
}
CONTEXT.set(context);
......@@ -65,7 +69,7 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
/**
* @return the first global trace id if exist. Otherwise, "N/A".
* @return the first global trace id if needEnhance. Otherwise, "N/A".
*/
public static String getGlobalTraceId() {
AbstractTracerContext segment = CONTEXT.get();
......@@ -104,12 +108,42 @@ public class ContextManager implements TracingContextListener, BootService, Igno
return span;
}
public static void inject(ContextCarrier carrier) {
get().inject(carrier);
}
public static void extract(ContextCarrier carrier) {
if (carrier == null) {
throw new IllegalArgumentException("ContextCarrier can't be null.");
}
if (carrier.isValid()) {
get().extract(carrier);
}
}
public static ContextSnapshot capture() {
return get().capture();
}
public static void continued(ContextSnapshot snapshot) {
if (snapshot == null) {
throw new IllegalArgumentException("ContextSnapshot can't be null.");
}
if (snapshot.isValid()) {
get().continued(snapshot);
}
}
public static AbstractSpan activeSpan() {
return get().activeSpan();
}
public static void stopSpan() {
get().stopSpan(activeSpan());
stopSpan(activeSpan());
}
public static void stopSpan(AbstractSpan span) {
get().stopSpan(span);
}
@Override
......
package org.skywalking.apm.agent.core.context;
import java.util.List;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
/**
* The <code>ContextSnapshot</code> is a snapshot for current context. The snapshot carries the info for building
* reference between two segments in two thread, but have a causal relationship.
*
* @author wusheng
*/
public class ContextSnapshot {
/**
* trace segment id of the parent trace segment.
*/
private String traceSegmentId;
/**
* span id of the parent span, in parent trace segment.
*/
private int spanId = -1;
/**
* {@link DistributedTraceId}
*/
private List<DistributedTraceId> distributedTraceIds;
ContextSnapshot(String traceSegmentId, int spanId,
List<DistributedTraceId> distributedTraceIds) {
this.traceSegmentId = traceSegmentId;
this.spanId = spanId;
this.distributedTraceIds = distributedTraceIds;
}
public List<DistributedTraceId> getDistributedTraceIds() {
return distributedTraceIds;
}
public String getTraceSegmentId() {
return traceSegmentId;
}
public int getSpanId() {
return spanId;
}
public boolean isValid() {
return traceSegmentId != null
&& spanId > -1
&& distributedTraceIds != null
&& distributedTraceIds.size() > 0;
}
}
......@@ -7,8 +7,9 @@ import org.skywalking.apm.agent.core.context.trace.NoopSpan;
/**
* The <code>IgnoredTracerContext</code> represent a context should be ignored.
* So it just maintains the stack with integer depth.
* All operations through this <code>IgnoredTracerContext</code> will be ignored, with low gc cost.
* So it just maintains the stack with an integer depth field.
*
* All operations through this will be ignored, and keep the memory and gc cost as low as possible.
*
* @author wusheng
*/
......@@ -31,6 +32,14 @@ public class IgnoredTracerContext implements AbstractTracerContext {
}
@Override public ContextSnapshot capture() {
return new ContextSnapshot(null, -1, null);
}
@Override public void continued(ContextSnapshot snapshot) {
}
@Override
public String getGlobalTraceId() {
return "[Ignored Trace]";
......@@ -67,11 +76,6 @@ public class IgnoredTracerContext implements AbstractTracerContext {
}
}
@Override
public void dispose() {
}
public static class ListenerManager {
private static List<IgnoreTracerContextListener> LISTENERS = new LinkedList<IgnoreTracerContextListener>();
......
......@@ -16,11 +16,29 @@ import org.skywalking.apm.agent.core.dictionary.PossibleFound;
import org.skywalking.apm.agent.core.sampling.SamplingService;
/**
* The <code>TracingContext</code> represents a core tracing logic controller.
* It build the final {@link TracingContext}, by the stack mechanism,
* which is similar with the codes work.
*
* In opentracing concept, it means, all spans in a segment tracing context(thread)
* are CHILD_OF relationship, but no FOLLOW_OF.
*
* In skywalking core concept, FOLLOW_OF is an abstract concept
* when cross-process MQ or cross-thread async/batch tasks happen,
* we used {@link TraceSegmentRef} for these scenarios.
* Check {@link TraceSegmentRef} which is from {@link ContextCarrier} or {@link ContextSnapshot}.
*
* @author wusheng
*/
public class TracingContext implements AbstractTracerContext {
/**
* @see {@link SamplingService}
*/
private SamplingService samplingService;
/**
* The final {@link TraceSegment}, which includes all finished spans.
*/
private TraceSegment segment;
/**
......@@ -32,8 +50,14 @@ public class TracingContext implements AbstractTracerContext {
*/
private LinkedList<AbstractTracingSpan> activeSpanStack = new LinkedList<AbstractTracingSpan>();
/**
* A counter for the next span.
*/
private int spanIdGenerator;
/**
* Initialize all fields with default value.
*/
TracingContext() {
this.segment = new TraceSegment();
this.spanIdGenerator = 0;
......@@ -42,6 +66,13 @@ public class TracingContext implements AbstractTracerContext {
}
}
/**
* Inject the context into the given carrier, only when the active span is an exit one.
*
* @param carrier to carry the context for crossing process.
* @throws IllegalStateException, if the active span isn't an exit one.
* @see {@link AbstractTracerContext#inject(ContextCarrier)}
*/
@Override
public void inject(ContextCarrier carrier) {
AbstractTracingSpan span = this.activeSpan();
......@@ -53,7 +84,7 @@ public class TracingContext implements AbstractTracerContext {
carrier.setTraceSegmentId(this.segment.getTraceSegmentId());
carrier.setSpanId(span.getSpanId());
carrier.setApplicationId(segment.getApplicationId());
carrier.setApplicationInstanceId(segment.getApplicationId());
if (DictionaryUtil.isNull(exitSpan.getPeerId())) {
carrier.setPeerHost(exitSpan.getPeer());
......@@ -81,17 +112,59 @@ public class TracingContext implements AbstractTracerContext {
carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
}
/**
* Extract the carrier to build the reference for the pre segment.
*
* @param carrier carried the context from a cross-process segment.
* @see {@link AbstractTracerContext#extract(ContextCarrier)}
*/
@Override
public void extract(ContextCarrier carrier) {
this.segment.ref(new TraceSegmentRef(carrier));
this.segment.relatedGlobalTraces(carrier.getDistributedTraceIds());
}
/**
* Capture the snapshot of current context.
*
* @return the snapshot of context for cross-thread propagation
* @see {@link AbstractTracerContext#capture()}
*/
@Override
public ContextSnapshot capture() {
return new ContextSnapshot(segment.getTraceSegmentId(),
activeSpan().getSpanId(),
segment.getRelatedGlobalTraces()
);
}
/**
* Continue the context from the given snapshot of parent thread.
*
* @param snapshot from {@link #capture()} in the parent thread.
* @see {@link AbstractTracerContext#continued(ContextSnapshot)}
*/
@Override
public void continued(ContextSnapshot snapshot) {
this.segment.ref(new TraceSegmentRef(snapshot));
this.segment.relatedGlobalTraces(snapshot.getDistributedTraceIds());
}
/**
* @return the first global trace id.
*/
@Override
public String getGlobalTraceId() {
return segment.getRelatedGlobalTraces().get(0).get();
}
/**
* Create an entry span
*
* @param operationName most likely a service name
* @return span instance.
* @see {@link EntrySpan}
*/
@Override
public AbstractSpan createEntrySpan(final String operationName) {
AbstractTracingSpan entrySpan;
......@@ -129,6 +202,13 @@ public class TracingContext implements AbstractTracerContext {
}
}
/**
* Create a local span
*
* @param operationName most likely a local method signature, or business name.
* @return the span represents a local logic block.
* @see {@link LocalSpan}
*/
@Override
public AbstractSpan createLocalSpan(final String operationName) {
AbstractTracingSpan parentSpan = peek();
......@@ -150,6 +230,14 @@ public class TracingContext implements AbstractTracerContext {
return push(span);
}
/**
* Create an exit span
*
* @param operationName most likely a service name of remote
* @param remotePeer the network id(ip:port, hostname:port or ip1:port1,ip2,port, etc.)
* @return the span represent an exit point of this segment.
* @see {@link ExitSpan}
*/
@Override
public AbstractSpan createExitSpan(final String operationName, final String remotePeer) {
AbstractTracingSpan exitSpan;
......@@ -191,6 +279,9 @@ public class TracingContext implements AbstractTracerContext {
return exitSpan;
}
/**
* @return the active span of current context, the top element of {@link #activeSpanStack}
*/
@Override
public AbstractTracingSpan activeSpan() {
AbstractTracingSpan span = peek();
......@@ -200,6 +291,12 @@ public class TracingContext implements AbstractTracerContext {
return span;
}
/**
* Stop the given span, if and only if this one is the top element of {@link #activeSpanStack}.
* Because the tracing core must make sure the span must match in a stack module, like any program did.
*
* @param span to finish
*/
@Override
public void stopSpan(AbstractSpan span) {
AbstractTracingSpan lastSpan = peek();
......@@ -236,12 +333,10 @@ public class TracingContext implements AbstractTracerContext {
TracingContext.ListenerManager.notifyFinish(finishedSegment);
}
@Override
public void dispose() {
this.segment = null;
this.activeSpanStack = null;
}
/**
* The <code>ListenerManager</code> represents an event notify for every registered listener, which are notified
* when the <cdoe>TracingContext</cdoe> finished, and {@link #segment} is ready for further process.
*/
public static class ListenerManager {
private static List<TracingContextListener> LISTENERS = new LinkedList<TracingContextListener>();
......
......@@ -2,19 +2,6 @@ package org.skywalking.apm.agent.core.context;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
/**
* {@link TracingContextListener} is a status change listener of {@link TracerContext}.
* Add a {@link TracingContextListener} implementation through {@link TracerContext}
* <p>
* All this class's methods will be called concurrently. Make sure all implementations are thread-safe.
* <p>
* Created by wusheng on 2017/2/17.
*/
public interface TracingContextListener {
/**
* This method will be called, after the {@link TracerContext#finish()}
*
* @param traceSegment finished {@link TraceSegment}
*/
void afterFinished(TraceSegment traceSegment);
}
package org.skywalking.apm.agent.core.context.trace;
import java.util.Map;
import org.skywalking.apm.network.trace.component.Component;
/**
......@@ -11,10 +12,19 @@ import org.skywalking.apm.network.trace.component.Component;
public interface AbstractSpan {
/**
* Set the component id, which defines in {@link org.skywalking.apm.network.trace.component.ComponentsDefine}
*
* @param component
* @return the span for chaining.
*/
AbstractSpan setComponent(Component component);
/**
* Only use this method in explicit instrumentation, like opentracing-skywalking-bridge.
* It it higher recommend don't use this for performance consideration.
*
* @param componentName
* @return the span for chaining.
*/
AbstractSpan setComponent(String componentName);
AbstractSpan setLayer(SpanLayer layer);
......@@ -42,12 +52,23 @@ public interface AbstractSpan {
boolean isEntry();
/**
* @return true if the actual span is a local span.
* @return true if the actual span is an exit span.
*/
boolean isLocal();
boolean isExit();
/**
* @return true if the actual span is an exit span.
* Record an event at a specific timestamp.
*
* @param timestamp The explicit timestamp for the log record.
* @param event the events
* @return the Span, for chaining
*/
boolean isExit();
AbstractSpan log(long timestamp, Map<String, ?> event);
/**
* Sets the string name for the logical operation this span represents.
*
* @return this Span instance, for chaining
*/
AbstractSpan setOperationName(String operationName);
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.agent.core.context.trace;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.agent.core.context.util.KeyValuePair;
import org.skywalking.apm.agent.core.context.util.ThrowableTransformer;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
......@@ -86,7 +87,7 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
return true;
}
public AbstractSpan start() {
public AbstractTracingSpan start() {
this.startTime = System.currentTimeMillis();
return this;
}
......@@ -97,7 +98,8 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
* @param t any subclass of {@link Throwable}, which occurs in this span.
* @return the Span, for chaining
*/
public AbstractSpan log(Throwable t) {
@Override
public AbstractTracingSpan log(Throwable t) {
if (logs == null) {
logs = new LinkedList<LogDataEntity>();
}
......@@ -106,22 +108,64 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
.add(new KeyValuePair("error.kind", t.getClass().getName()))
.add(new KeyValuePair("message", t.getMessage()))
.add(new KeyValuePair("stack", ThrowableTransformer.INSTANCE.convert2String(t, 4000)))
.build());
.build(System.currentTimeMillis()));
return this;
}
/**
* Record a common log with multi fields, for supporting opentracing-java
*
* @param fields
* @return the Span, for chaining
*/
@Override
public AbstractTracingSpan log(long timestampMicroseconds, Map<String, ?> fields) {
if (logs == null) {
logs = new LinkedList<LogDataEntity>();
}
LogDataEntity.Builder builder = new LogDataEntity.Builder();
for (Map.Entry<String, ?> entry : fields.entrySet()) {
builder.add(new KeyValuePair(entry.getKey(), entry.getValue().toString()));
}
logs.add(builder.build(timestampMicroseconds));
return this;
}
public AbstractSpan errorOccurred() {
/**
* In the scope of this span tracing context, error occurred,
* in auto-instrumentation mechanism, almost means throw an exception.
*
* @return span instance, for chaining.
*/
@Override
public AbstractTracingSpan errorOccurred() {
this.errorOccurred = true;
return this;
}
/**
* Set the operation name, just because these is not compress dictionary value for this name.
* Use the entire string temporarily, the agent will compress this name in async mode.
*
* @param operationName
* @return span instance, for chaining.
*/
@Override
public AbstractTracingSpan setOperationName(String operationName) {
this.operationName = operationName;
this.operationId = DictionaryUtil.nullValue();
return this;
}
/**
* Set the operation id, which compress by the name.
*
* @param operationId
* @return span instance, for chaining.
*/
public AbstractTracingSpan setOperationId(int operationId) {
this.operationId = operationId;
this.operationName = null;
return this;
}
......@@ -138,19 +182,33 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
}
@Override
public AbstractSpan setLayer(SpanLayer layer) {
public AbstractTracingSpan setLayer(SpanLayer layer) {
this.layer = layer;
return this;
}
/**
* Set the component of this span, with internal supported.
* Highly recommend to use this way.
*
* @param component
* @return span instance, for chaining.
*/
@Override
public AbstractSpan setComponent(Component component) {
public AbstractTracingSpan setComponent(Component component) {
this.componentId = component.getId();
return this;
}
/**
* Set the component name.
* By using this, cost more memory and network.
*
* @param componentName
* @return span instance, for chaining.
*/
@Override
public AbstractSpan setComponent(String componentName) {
public AbstractTracingSpan setComponent(String componentName) {
this.componentName = componentName;
return this;
}
......@@ -162,17 +220,27 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
spanBuilder.setParentSpanId(parentSpanId);
spanBuilder.setStartTime(startTime);
spanBuilder.setEndTime(endTime);
if (operationId == DictionaryUtil.nullValue()) {
if (operationId != DictionaryUtil.nullValue()) {
spanBuilder.setOperationNameId(operationId);
} else {
spanBuilder.setOperationName(operationName);
}
spanBuilder.setSpanType(SpanType.Entry);
spanBuilder.setSpanLayerValue(this.layer.getCode());
if (componentId == DictionaryUtil.nullValue()) {
if (isEntry()) {
spanBuilder.setSpanType(SpanType.Entry);
} else if (isExit()) {
spanBuilder.setSpanType(SpanType.Exit);
} else {
spanBuilder.setSpanType(SpanType.Local);
}
if (this.layer != null) {
spanBuilder.setSpanLayerValue(this.layer.getCode());
}
if (componentId != DictionaryUtil.nullValue()) {
spanBuilder.setComponentId(componentId);
} else {
spanBuilder.setComponent(componentName);
if (componentName != null) {
spanBuilder.setComponent(componentName);
}
}
spanBuilder.setIsError(errorOccurred);
if (this.tags != null) {
......
......@@ -6,11 +6,14 @@ import org.skywalking.apm.network.trace.component.Component;
/**
* The <code>EntrySpan</code> represents a service provider point, such as Tomcat server entrance.
*
* It is a start point of {@link TraceSegment}, even in a complex application, there maybe have multi entry point,
* It is a start point of {@link TraceSegment}, even in a complex application, there maybe have multi-layer entry point,
* the <code>EntrySpan</code> only represents the first one.
*
* But with the last <code>EntrySpan</code>'s tags and logs, which have more details about a service provider.
*
* Such as: Tomcat Embed -> Dubbox
* The <code>EntrySpan</code> represents the Dubbox span.
*
* @author wusheng
*/
public class EntrySpan extends AbstractTracingSpan {
......@@ -50,7 +53,7 @@ public class EntrySpan extends AbstractTracingSpan {
}
@Override
public AbstractSpan setLayer(SpanLayer layer) {
public AbstractTracingSpan setLayer(SpanLayer layer) {
if (stackDepth == currentMaxDepth) {
return super.setLayer(layer);
} else {
......@@ -59,7 +62,7 @@ public class EntrySpan extends AbstractTracingSpan {
}
@Override
public AbstractSpan setComponent(Component component) {
public AbstractTracingSpan setComponent(Component component) {
if (stackDepth == currentMaxDepth) {
return super.setComponent(component);
} else {
......@@ -68,7 +71,7 @@ public class EntrySpan extends AbstractTracingSpan {
}
@Override
public AbstractSpan setComponent(String componentName) {
public AbstractTracingSpan setComponent(String componentName) {
if (stackDepth == currentMaxDepth) {
return super.setComponent(componentName);
} else {
......@@ -85,6 +88,24 @@ public class EntrySpan extends AbstractTracingSpan {
}
}
@Override
public AbstractTracingSpan setOperationName(String operationName) {
if (stackDepth == currentMaxDepth) {
return super.setOperationName(operationName);
} else {
return this;
}
}
@Override
public AbstractTracingSpan setOperationId(int operationId) {
if (stackDepth == currentMaxDepth) {
return super.setOperationId(operationId);
} else {
return this;
}
}
@Override
public EntrySpan log(Throwable t) {
super.log(t);
......@@ -95,10 +116,6 @@ public class EntrySpan extends AbstractTracingSpan {
return true;
}
@Override public boolean isLocal() {
return false;
}
@Override public boolean isExit() {
return false;
}
......
......@@ -5,13 +5,16 @@ import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.trace.component.Component;
/**
* The <code>ExitSpan</code> represents a service consumer point, such as Feign, Okhttp discovery for a Http service.
* The <code>ExitSpan</code> represents a service consumer point, such as Feign, Okhttp client for a Http service.
*
* It is an exit point or a leaf span(our old name) of trace tree.
* In a single rpc call, because of a combination of discovery libs, there maybe contain multi exit point.
* In a single rpc call, because of a combination of discovery libs, there maybe contain multi-layer exit point:
*
* The <code>ExitSpan</code> only presents the first one.
*
* Such as: Dubbox -> Apache Httpcomponent -> ....(Remote)
* The <code>ExitSpan</code> represents the Dubbox span, and ignore the httpcomponent span's info.
*
* @author wusheng
*/
public class ExitSpan extends AbstractTracingSpan {
......@@ -69,7 +72,7 @@ public class ExitSpan extends AbstractTracingSpan {
}
@Override
public AbstractSpan setLayer(SpanLayer layer) {
public AbstractTracingSpan setLayer(SpanLayer layer) {
if (stackDepth == 1) {
return super.setLayer(layer);
} else {
......@@ -78,7 +81,7 @@ public class ExitSpan extends AbstractTracingSpan {
}
@Override
public AbstractSpan setComponent(Component component) {
public AbstractTracingSpan setComponent(Component component) {
if (stackDepth == 1) {
return super.setComponent(component);
} else {
......@@ -87,7 +90,7 @@ public class ExitSpan extends AbstractTracingSpan {
}
@Override
public AbstractSpan setComponent(String componentName) {
public AbstractTracingSpan setComponent(String componentName) {
if (stackDepth == 1) {
return super.setComponent(componentName);
} else {
......@@ -105,14 +108,34 @@ public class ExitSpan extends AbstractTracingSpan {
@Override public SpanObject.Builder transform() {
SpanObject.Builder spanBuilder = super.transform();
if (peerId == DictionaryUtil.nullValue()) {
if (peerId != DictionaryUtil.nullValue()) {
spanBuilder.setPeerId(peerId);
} else {
spanBuilder.setPeer(peer);
if (peer != null) {
spanBuilder.setPeer(peer);
}
}
return spanBuilder;
}
@Override
public AbstractTracingSpan setOperationName(String operationName) {
if (stackDepth == 1) {
return super.setOperationName(operationName);
} else {
return this;
}
}
@Override
public AbstractTracingSpan setOperationId(int operationId) {
if (stackDepth == 1) {
return super.setOperationId(operationId);
} else {
return this;
}
}
public int getPeerId() {
return peerId;
}
......@@ -125,10 +148,6 @@ public class ExitSpan extends AbstractTracingSpan {
return false;
}
@Override public boolean isLocal() {
return false;
}
@Override public boolean isExit() {
return true;
}
......
package org.skywalking.apm.agent.core.context.trace;
import org.skywalking.apm.network.proto.SpanObject;
/**
* The <code>LocalSpan</code> represents a normal tracing point, such as a local method.
*
......@@ -29,18 +27,10 @@ public class LocalSpan extends AbstractTracingSpan {
return this;
}
@Override public SpanObject.Builder transform() {
return null;
}
@Override public boolean isEntry() {
return false;
}
@Override public boolean isLocal() {
return true;
}
@Override public boolean isExit() {
return false;
}
......
......@@ -12,9 +12,11 @@ import org.skywalking.apm.network.proto.LogMessage;
* @author wusheng
*/
public class LogDataEntity {
protected List<KeyValuePair> logs;
private long timestamp = 0;
private List<KeyValuePair> logs;
private LogDataEntity(List<KeyValuePair> logs) {
private LogDataEntity(long timestamp, List<KeyValuePair> logs) {
this.timestamp = timestamp;
this.logs = logs;
}
......@@ -36,8 +38,8 @@ public class LogDataEntity {
return this;
}
public LogDataEntity build() {
return new LogDataEntity(logs);
public LogDataEntity build(long timestamp) {
return new LogDataEntity(timestamp, logs);
}
}
......@@ -46,6 +48,7 @@ public class LogDataEntity {
for (KeyValuePair log : logs) {
logMessageBuilder.addData(log.transform());
}
logMessageBuilder.setTime(timestamp);
return logMessageBuilder.build();
}
}
package org.skywalking.apm.agent.core.context.trace;
import java.util.Map;
import org.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.skywalking.apm.network.trace.component.Component;
/**
* The <code>NoopSpan</code> represents a span implementation without any actual operation.
* This span implementation is for {@link IgnoredTracerContext}.
* This span implementation is for {@link IgnoredTracerContext},
* for keeping the memory and gc cost as low as possible.
*
* @author wusheng
*/
......@@ -13,7 +15,6 @@ public class NoopSpan implements AbstractSpan {
public NoopSpan() {
}
@Override
public AbstractSpan log(Throwable t) {
return this;
......@@ -23,7 +24,7 @@ public class NoopSpan implements AbstractSpan {
return null;
}
public void finish(){
public void finish() {
}
......@@ -48,11 +49,15 @@ public class NoopSpan implements AbstractSpan {
return false;
}
@Override public boolean isLocal() {
@Override public boolean isExit() {
return false;
}
@Override public boolean isExit() {
return false;
@Override public AbstractSpan log(long timestamp, Map<String, ?> event) {
return this;
}
@Override public AbstractSpan setOperationName(String operationName) {
return this;
}
}
......@@ -2,14 +2,11 @@ package org.skywalking.apm.agent.core.context.trace;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceIds;
import org.skywalking.apm.agent.core.context.ids.GlobalIdGenerator;
import org.skywalking.apm.agent.core.context.ids.NewDistributedTraceId;
import org.skywalking.apm.agent.core.dictionary.DictionaryManager;
import org.skywalking.apm.agent.core.dictionary.PossibleFound;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import org.skywalking.apm.network.proto.TraceSegmentObject;
......@@ -51,14 +48,6 @@ public class TraceSegment {
*/
private List<AbstractTracingSpan> spans;
/**
* The <code>applicationId</code> represents a name of current application/JVM and indicates which is business
* role in the cluster.
* <p>
* e.g. account_app, billing_app
*/
private int applicationId;
/**
* The <code>relatedGlobalTraces</code> represent a set of all related trace. Most time it contains only one
* element, because only one parent {@link TraceSegment} exists, but, in batch scenario, the num becomes greater
......@@ -82,20 +71,6 @@ public class TraceSegment {
* and generate a new segment id.
*/
public TraceSegment() {
this.applicationId = (Integer)DictionaryManager.findApplicationCodeSection()
.find(Config.Agent.APPLICATION_CODE)
.doInCondition(
new PossibleFound.FoundAndObtain() {
@Override public Object doProcess(int applicationId) {
return applicationId;
}
},
new PossibleFound.NotFoundAndObtain() {
@Override public Object doProcess() {
throw new IllegalStateException("Application id must not NULL.");
}
}
);
this.traceSegmentId = GlobalIdGenerator.generate(ID_TYPE);
this.spans = new LinkedList<AbstractTracingSpan>();
this.relatedGlobalTraces = new DistributedTraceIds();
......@@ -154,7 +129,7 @@ public class TraceSegment {
}
public int getApplicationId() {
return applicationId;
return RemoteDownstreamConfig.Agent.APPLICATION_ID;
}
public boolean hasRef() {
......@@ -206,8 +181,8 @@ public class TraceSegment {
for (AbstractTracingSpan span : this.spans) {
traceSegmentBuilder.addSpans(span.transform());
}
traceSegmentBuilder.setApplicationId(this.applicationId);
traceSegmentBuilder.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_ID);
traceSegmentBuilder.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID);
traceSegmentBuilder.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID);
upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());
return upstreamBuilder.build();
......@@ -219,7 +194,6 @@ public class TraceSegment {
"traceSegmentId='" + traceSegmentId + '\'' +
", refs=" + refs +
", spans=" + spans +
", applicationId='" + applicationId + '\'' +
", relatedGlobalTraces=" + relatedGlobalTraces +
'}';
}
......
package org.skywalking.apm.agent.core.context.trace;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextSnapshot;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.network.proto.RefType;
import org.skywalking.apm.network.proto.TraceSegmentReference;
/**
......@@ -11,11 +13,13 @@ import org.skywalking.apm.network.proto.TraceSegmentReference;
* Created by wusheng on 2017/2/17.
*/
public class TraceSegmentRef {
private SegmentRefType type;
private String traceSegmentId;
private int spanId = -1;
private int applicationId;
private int applicationInstanceId;
private String peerHost;
......@@ -31,9 +35,10 @@ public class TraceSegmentRef {
* @param carrier the valid cross-process propagation format.
*/
public TraceSegmentRef(ContextCarrier carrier) {
this.type = SegmentRefType.CROSS_PROCESS;
this.traceSegmentId = carrier.getTraceSegmentId();
this.spanId = carrier.getSpanId();
this.applicationId = carrier.getApplicationId();
this.applicationInstanceId = carrier.getApplicationInstanceId();
String host = carrier.getPeerHost();
if (host.charAt(0) == '#') {
this.peerHost = host.substring(1);
......@@ -48,6 +53,12 @@ public class TraceSegmentRef {
}
}
public TraceSegmentRef(ContextSnapshot snapshot) {
this.type = SegmentRefType.CROSS_THREAD;
this.traceSegmentId = snapshot.getTraceSegmentId();
this.spanId = snapshot.getSpanId();
}
public String getOperationName() {
return operationName;
}
......@@ -58,19 +69,25 @@ public class TraceSegmentRef {
public TraceSegmentReference transform() {
TraceSegmentReference.Builder refBuilder = TraceSegmentReference.newBuilder();
refBuilder.setParentTraceSegmentId(traceSegmentId);
refBuilder.setParentSpanId(spanId);
refBuilder.setParentApplicationId(applicationId);
if (peerId == DictionaryUtil.nullValue()) {
refBuilder.setNetworkAddress(peerHost);
} else {
refBuilder.setNetworkAddressId(peerId);
}
if (operationId == DictionaryUtil.nullValue()) {
refBuilder.setEntryServiceName(operationName);
if (SegmentRefType.CROSS_PROCESS.equals(type)) {
refBuilder.setRefType(RefType.CrossProcess);
refBuilder.setParentApplicationInstanceId(applicationInstanceId);
if (peerId == DictionaryUtil.nullValue()) {
refBuilder.setNetworkAddress(peerHost);
} else {
refBuilder.setNetworkAddressId(peerId);
}
if (operationId == DictionaryUtil.nullValue()) {
refBuilder.setEntryServiceName(operationName);
} else {
refBuilder.setEntryServiceId(operationId);
}
} else {
refBuilder.setEntryServiceId(operationId);
refBuilder.setRefType(RefType.CrossThread);
}
refBuilder.setParentTraceSegmentId(traceSegmentId);
refBuilder.setParentSpanId(spanId);
return refBuilder.build();
}
......@@ -94,4 +111,9 @@ public class TraceSegmentRef {
result = 31 * result + spanId;
return result;
}
public enum SegmentRefType {
CROSS_PROCESS,
CROSS_THREAD
}
}
......@@ -4,6 +4,10 @@ import io.netty.util.internal.ConcurrentSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithIntegerValue;
/**
* Map of application id to application code, which is from the collector side.
......@@ -13,15 +17,29 @@ import java.util.concurrent.ConcurrentHashMap;
public enum ApplicationDictionary {
INSTANCE;
private Map<String, Integer> applicationDictionary = new ConcurrentHashMap<String, Integer>();
private Set<String> unRegisterApplication = new ConcurrentSet<String>();
private Set<String> unRegisterApplications = new ConcurrentSet<String>();
public PossibleFound find(String applicationCode) {
Integer applicationId = applicationDictionary.get(applicationCode);
if (applicationId != null) {
return new Found(applicationId);
} else {
unRegisterApplication.add(applicationCode);
unRegisterApplications.add(applicationCode);
return new NotFound();
}
}
public void syncRemoteDictionary(
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub) {
if (unRegisterApplications.size() > 0) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addAllApplicationCode(unRegisterApplications).build());
if (applicationMapping.getApplicationCount() > 0) {
for (KeyWithIntegerValue keyWithIntegerValue : applicationMapping.getApplicationList()) {
unRegisterApplications.remove(keyWithIntegerValue.getKey());
applicationDictionary.put(keyWithIntegerValue.getKey(), keyWithIntegerValue.getValue());
}
}
}
}
}
......@@ -4,6 +4,11 @@ import io.netty.util.internal.ConcurrentSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.skywalking.apm.network.proto.ServiceNameMappingElement;
/**
* @author wusheng
......@@ -11,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap;
public enum OperationNameDictionary {
INSTANCE;
private Map<OperationNameKey, Integer> operationNameDictionary = new ConcurrentHashMap<OperationNameKey, Integer>();
private Set<OperationNameKey> unRegisterOperationName = new ConcurrentSet<OperationNameKey>();
private Set<OperationNameKey> unRegisterOperationNames = new ConcurrentSet<OperationNameKey>();
public PossibleFound find(int applicationId, String operationName) {
OperationNameKey key = new OperationNameKey(applicationId, operationName);
......@@ -19,11 +24,35 @@ public enum OperationNameDictionary {
if (operationId != null) {
return new Found(applicationId);
} else {
unRegisterOperationName.add(key);
unRegisterOperationNames.add(key);
return new NotFound();
}
}
public void syncRemoteDictionary(
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub) {
if (unRegisterOperationNames.size() > 0) {
ServiceNameCollection.Builder builder = ServiceNameCollection.newBuilder();
for (OperationNameKey operationNameKey : unRegisterOperationNames) {
ServiceNameElement serviceNameElement = ServiceNameElement.newBuilder()
.setApplicationId(operationNameKey.getApplicationId())
.setServiceName(operationNameKey.getOperationName())
.build();
builder.addElements(serviceNameElement);
}
ServiceNameMappingCollection serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.discovery(builder.build());
if (serviceNameMappingCollection.getElementsCount() > 0) {
for (ServiceNameMappingElement serviceNameMappingElement : serviceNameMappingCollection.getElementsList()) {
OperationNameKey key = new OperationNameKey(
serviceNameMappingElement.getElement().getApplicationId(),
serviceNameMappingElement.getElement().getServiceName());
unRegisterOperationNames.remove(key);
operationNameDictionary.put(key, serviceNameMappingElement.getServiceId());
}
}
}
}
private class OperationNameKey {
private int applicationId;
private String operationName;
......@@ -33,6 +62,14 @@ public enum OperationNameDictionary {
this.operationName = operationName;
}
public int getApplicationId() {
return applicationId;
}
public String getOperationName() {
return operationName;
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
......
package org.skywalking.apm.agent.core.dictionary;
/**
* The <code>PossibleFound</code> represents a value, which may exist or not.
* The <code>PossibleFound</code> represents a value, which may needEnhance or not.
*
* @author wusheng
*/
......
package org.skywalking.apm.agent.core.jvm;
import io.grpc.ManagedChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.jvm.cpu.CPUProvider;
import org.skywalking.apm.agent.core.jvm.gc.GCProvider;
import org.skywalking.apm.agent.core.jvm.memory.MemoryProvider;
import org.skywalking.apm.agent.core.jvm.memorypool.MemoryPoolProvider;
import org.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import org.skywalking.apm.network.proto.JVMMetric;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
/**
* The <code>JVMService</code> represents a timer,
* which collectors JVM cpu, memory, memorypool and gc info,
* and send the collected info to Collector through the channel provided by {@link GRPCChannelManager}
*
* @author wusheng
*/
public class JVMService implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(JVMService.class);
private ReentrantLock lock = new ReentrantLock();
private volatile LinkedList<JVMMetric> buffer = new LinkedList<JVMMetric>();
private SimpleDateFormat sdf = new SimpleDateFormat("ss");
private volatile ScheduledFuture<?> scheduledFuture;
private volatile ScheduledFuture<?> collectMetricFuture;
private volatile ScheduledFuture<?> sendMetricFuture;
private volatile int lastBlockIdx = -1;
private Sender sender;
@Override
public void beforeBoot() throws Throwable {
sender = new Sender();
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(sender);
}
@Override
public void boot() throws Throwable {
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor();
scheduledFuture = service.scheduleAtFixedRate(this, 0, 1, TimeUnit.SECONDS);
collectMetricFuture = Executors
.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this, 0, 1, TimeUnit.SECONDS);
sendMetricFuture = Executors
.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
}
@Override
......@@ -38,16 +67,76 @@ public class JVMService implements BootService, Runnable {
@Override
public void run() {
long currentTimeMillis = System.currentTimeMillis();
Date day = new Date(currentTimeMillis);
String second = sdf.format(day);
if (Integer.parseInt(second) % 15 == 0) {
JVMMetric.Builder JVMBuilder = JVMMetric.newBuilder();
JVMBuilder.setTime(currentTimeMillis);
JVMBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());
JVMBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());
JVMBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricList());
if (RemoteDownstreamConfig.Agent.APPLICATION_ID != DictionaryUtil.nullValue()
&& RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID != DictionaryUtil.nullValue()
) {
long currentTimeMillis = System.currentTimeMillis();
Date day = new Date(currentTimeMillis);
String second = sdf.format(day);
int blockIndex = Integer.parseInt(second) / 15;
if (blockIndex != lastBlockIdx) {
lastBlockIdx = blockIndex;
try {
JVMMetric.Builder jvmBuilder = JVMMetric.newBuilder();
jvmBuilder.setTime(currentTimeMillis);
jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());
jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());
jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricList());
jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());
JVMMetric jvmMetric = jvmBuilder.build();
lock.lock();
try {
buffer.add(jvmMetric);
while (buffer.size() > 4) {
buffer.removeFirst();
}
} finally {
lock.unlock();
}
} catch (Exception e) {
logger.error(e, "Collect JVM info fail.");
}
}
}
}
private class Sender implements Runnable, GRPCChannelListener {
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile JVMMetricsServiceGrpc.JVMMetricsServiceBlockingStub stub = null;
@Override
public void run() {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID != DictionaryUtil.nullValue()
&& RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID != DictionaryUtil.nullValue()
) {
if (status == GRPCChannelStatus.CONNECTED) {
try {
JVMMetrics.Builder builder = JVMMetrics.newBuilder();
lock.lock();
try {
builder.addAllMetrics(buffer);
buffer.clear();
} finally {
lock.unlock();
}
builder.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID);
stub.collect(builder.build());
} catch (Throwable t) {
logger.error(t, "send JVM metrics to Collector fail.");
}
}
}
}
@Override
public void statusChanged(GRPCChannelStatus status) {
if (CONNECTED.equals(status)) {
ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
stub = JVMMetricsServiceGrpc.newBlockingStub(channel);
}
this.status = status;
}
}
}
package org.skywalking.apm.agent.core.jvm.gc;
import java.lang.management.GarbageCollectorMXBean;
import java.util.List;
/**
* @author wusheng
*/
public class CMSGCModule extends GCModule {
public CMSGCModule(List<GarbageCollectorMXBean> beans) {
super(beans);
}
@Override protected String getOldGCName() {
return "ConcurrentMarkSweep";
}
@Override protected String getNewGCName() {
return "ParNew";
}
}
package org.skywalking.apm.agent.core.jvm.gc;
import java.lang.management.GarbageCollectorMXBean;
import java.util.List;
/**
* @author wusheng
*/
public class G1GCModule extends GCModule {
public G1GCModule(List<GarbageCollectorMXBean> beans) {
super(beans);
}
@Override protected String getOldGCName() {
return "G1 Old Generation";
}
@Override protected String getNewGCName() {
return "G1 Young Generation";
}
}
package org.skywalking.apm.agent.core.jvm.gc;
import java.util.List;
import org.skywalking.apm.network.proto.GC;
/**
* @author wusheng
*/
public interface GCMetricAccessor {
List<GC> getGCList();
}
package org.skywalking.apm.agent.core.jvm.gc;
import java.lang.management.GarbageCollectorMXBean;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.GCPhrase;
/**
* @author wusheng
*/
public abstract class GCModule implements GCMetricAccessor {
private List<GarbageCollectorMXBean> beans;
public GCModule(List<GarbageCollectorMXBean> beans) {
this.beans = beans;
}
@Override
public List<GC> getGCList() {
List<GC> gcList = new LinkedList<GC>();
for (GarbageCollectorMXBean bean : beans) {
String name = bean.getName();
GCPhrase phrase;
if (name.equals(getNewGCName())) {
phrase = GCPhrase.NEW;
} else if (name.equals(getOldGCName())) {
phrase = GCPhrase.OLD;
} else {
continue;
}
gcList.add(
GC.newBuilder().setPhrase(phrase)
.setCount(bean.getCollectionCount())
.setTime(bean.getCollectionTime())
.build()
);
}
return gcList;
}
protected abstract String getOldGCName();
protected abstract String getNewGCName();
}
package org.skywalking.apm.agent.core.jvm.gc;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import org.skywalking.apm.network.proto.GC;
/**
* @author wusheng
*/
public enum GCProvider {
INSTANCE;
private GCMetricAccessor metricAccessor;
private List<GarbageCollectorMXBean> beans;
GCProvider() {
beans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean bean : beans) {
String name = bean.getName();
GCMetricAccessor accessor = findByBeanName(name);
if (accessor != null) {
metricAccessor = accessor;
break;
}
}
this.metricAccessor = new UnknowGC();
}
public List<GC> getGCList() {
return metricAccessor.getGCList();
}
private GCMetricAccessor findByBeanName(String name) {
if (name.indexOf("PS") > -1) {
//Parallel (Old) collector ( -XX:+UseParallelOldGC )
return new ParallelGCModule(beans);
} else if (name.indexOf("ConcurrentMarkSweep") > -1) {
// CMS collector ( -XX:+UseConcMarkSweepGC )
return new CMSGCModule(beans);
} else if (name.indexOf("G1") > -1) {
// G1 collector ( -XX:+UseG1GC )
return new G1GCModule(beans);
} else if (name.equals("MarkSweepCompact")) {
// Serial collector ( -XX:+UseSerialGC )
return new SerialGCModule(beans);
} else {
// Unknown
return null;
}
}
}
package org.skywalking.apm.agent.core.jvm.gc;
import java.lang.management.GarbageCollectorMXBean;
import java.util.List;
/**
* @author wusheng
*/
public class ParallelGCModule extends GCModule {
public ParallelGCModule(List<GarbageCollectorMXBean> beans) {
super(beans);
}
@Override protected String getOldGCName() {
return "PS MarkSweep";
}
@Override protected String getNewGCName() {
return "PS Scavenge";
}
}
package org.skywalking.apm.agent.core.jvm.gc;
import java.lang.management.GarbageCollectorMXBean;
import java.util.List;
/**
* @author wusheng
*/
public class SerialGCModule extends GCModule {
public SerialGCModule(List<GarbageCollectorMXBean> beans) {
super(beans);
}
@Override protected String getOldGCName() {
return "MarkSweepCompact";
}
@Override protected String getNewGCName() {
return "Copy";
}
}
package org.skywalking.apm.agent.core.jvm.gc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.GCPhrase;
/**
* @author wusheng
*/
public class UnknowGC implements GCMetricAccessor {
@Override
public List<GC> getGCList() {
List<GC> gcList = new LinkedList<GC>();
gcList.add(GC.newBuilder().setPhrase(GCPhrase.NEW).build());
gcList.add(GC.newBuilder().setPhrase(GCPhrase.OLD).build());
return gcList;
}
}
package org.skywalking.apm.agent.core.jvm.memorypool;
import java.lang.management.MemoryPoolMXBean;
import java.util.List;
/**
* @author wusheng
*/
public class CMSCollectorModule extends MemoryPoolModule {
public CMSCollectorModule(List<MemoryPoolMXBean> beans) {
super(beans);
}
@Override protected String getPermName() {
return "CMS Perm Gen";
}
@Override protected String getCodeCacheName() {
return "Code Cache";
}
@Override protected String getEdenName() {
return "Par Eden Space";
}
@Override protected String getOldName() {
return "CMS Old Gen";
}
@Override protected String getSurvivorName() {
return "Par Survivor Space";
}
@Override protected String getMetaspaceName() {
return "Metaspace";
}
}
package org.skywalking.apm.agent.core.jvm.memorypool;
import java.lang.management.MemoryPoolMXBean;
import java.util.List;
/**
* @author wusheng
*/
public class G1CollectorModule extends MemoryPoolModule {
public G1CollectorModule(List<MemoryPoolMXBean> beans) {
super(beans);
}
@Override protected String getPermName() {
return "G1 Perm Gen";
}
@Override protected String getCodeCacheName() {
return "Code Cache";
}
@Override protected String getEdenName() {
return "G1 Eden Space";
}
@Override protected String getOldName() {
return "G1 Old Gen";
}
@Override protected String getSurvivorName() {
return "G1 Survivor Space";
}
@Override protected String getMetaspaceName() {
return "Metaspace";
}
}
......@@ -35,17 +35,17 @@ public abstract class MemoryPoolModule implements MemoryPoolMetricAccessor {
type = PoolType.METASPACE_USAGE;
} else if (name.equals(getPermName())) {
type = PoolType.PERMGEN_USAGE;
} else {
continue;
}
if (type != null) {
MemoryUsage usage = bean.getUsage();
poolList.add(MemoryPool.newBuilder().setType(type)
.setInit(usage.getInit())
.setMax(usage.getMax())
.setCommited(usage.getCommitted())
.setUsed(usage.getUsed())
.build());
}
MemoryUsage usage = bean.getUsage();
poolList.add(MemoryPool.newBuilder().setType(type)
.setInit(usage.getInit())
.setMax(usage.getMax())
.setCommited(usage.getCommitted())
.setUsed(usage.getUsed())
.build());
}
return poolList;
}
......
......@@ -39,10 +39,10 @@ public enum MemoryPoolProvider {
return new ParallelCollectorModule(beans);
} else if (name.indexOf("CMS") > -1) {
// CMS collector ( -XX:+UseConcMarkSweepGC )
return null;
return new CMSCollectorModule(beans);
} else if (name.indexOf("G1") > -1) {
// G1 collector ( -XX:+UseG1GC )
return null;
return new G1CollectorModule(beans);
} else if (name.equals("Survivor Space")) {
// Serial collector ( -XX:+UseSerialGC )
return new SerialCollectorModule(beans);
......
......@@ -6,7 +6,7 @@ import java.util.List;
/**
* @author wusheng
*/
public class ParallelCollectorModule extends MemoryPoolModule{
public class ParallelCollectorModule extends MemoryPoolModule {
public ParallelCollectorModule(List<MemoryPoolMXBean> beans) {
super(beans);
......
......@@ -18,6 +18,6 @@ public class UnknownMemoryPool implements MemoryPoolMetricAccessor {
poolList.add(MemoryPool.newBuilder().setType(PoolType.SURVIVOR_USAGE).build());
poolList.add(MemoryPool.newBuilder().setType(PoolType.PERMGEN_USAGE).build());
poolList.add(MemoryPool.newBuilder().setType(PoolType.METASPACE_USAGE).build());
return poolList;
return new LinkedList<MemoryPool>();
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.agent.core.plugin;
import net.bytebuddy.dynamic.DynamicType;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.skywalking.apm.util.StringUtil;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
......@@ -63,11 +64,11 @@ public abstract class AbstractClassEnhancePluginDefine {
DynamicType.Builder<?> newClassBuilder, ClassLoader classLoader) throws PluginException;
/**
* Define the classname of target class.
* Define the {@link ClassMatch} for filtering class.
*
* @return class full name.
* @return {@link ClassMatch}
*/
protected abstract String enhanceClassName();
protected abstract ClassMatch enhanceClass();
/**
* Witness classname list. Why need witness classname? Let's see like this: A library existed two released versions
......
......@@ -27,7 +27,7 @@ public class PluginBootstrap {
List<URL> resources = resolver.getResources();
if (resources == null || resources.size() == 0) {
logger.info("no plugin files (skywalking-plugin.properties) found, continue to start application.");
logger.info("no plugin files (skywalking-plugin.def) found, continue to start application.");
return new ArrayList<AbstractClassEnhancePluginDefine>();
}
......@@ -35,7 +35,7 @@ public class PluginBootstrap {
try {
PluginCfg.INSTANCE.load(pluginUrl.openStream());
} catch (Throwable t) {
logger.error(t, "plugin [{}] init failure.", pluginUrl);
logger.error(t, "plugin file [{}] init failure.", pluginUrl);
}
}
......
package org.skywalking.apm.agent.core.plugin;
import org.skywalking.apm.agent.core.plugin.exception.IllegalPluginDefineException;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.agent.core.plugin.exception.IllegalPluginDefineException;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
public enum PluginCfg {
INSTANCE;
......@@ -31,7 +32,7 @@ public enum PluginCfg {
pluginClassList.add(plugin);
}
} catch (IllegalPluginDefineException e) {
logger.error("Failed to format plugin define.", e);
logger.error(e,"Failed to format plugin({}) define.", pluginDefine);
}
}
} finally {
......
......@@ -4,43 +4,75 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import net.bytebuddy.description.NamedElement;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.skywalking.apm.agent.core.plugin.bytebuddy.AbstractJunction;
import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.skywalking.apm.agent.core.plugin.match.IndirectMatch;
import org.skywalking.apm.agent.core.plugin.match.NameMatch;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.not;
/**
* The <code>PluginFinder</code> represents a finder , which assist to find the one
* from the given {@link AbstractClassEnhancePluginDefine} list, by name match.
* from the given {@link AbstractClassEnhancePluginDefine} list.
*
* @author wusheng
*/
public class PluginFinder {
private final Map<String, LinkedList<AbstractClassEnhancePluginDefine>> pluginDefineMap = new HashMap<String, LinkedList<AbstractClassEnhancePluginDefine>>();
private final Map<String, AbstractClassEnhancePluginDefine> nameMatchDefine = new HashMap<String, AbstractClassEnhancePluginDefine>();
private final List<AbstractClassEnhancePluginDefine> signatureMatchDefine = new LinkedList<AbstractClassEnhancePluginDefine>();
public PluginFinder(List<AbstractClassEnhancePluginDefine> plugins) {
for (AbstractClassEnhancePluginDefine plugin : plugins) {
String enhanceClassName = plugin.enhanceClassName();
ClassMatch match = plugin.enhanceClass();
if (enhanceClassName == null) {
if (match == null) {
continue;
}
LinkedList<AbstractClassEnhancePluginDefine> pluginDefinesWithSameTarget = pluginDefineMap.get(enhanceClassName);
if (pluginDefinesWithSameTarget == null) {
pluginDefinesWithSameTarget = new LinkedList<AbstractClassEnhancePluginDefine>();
pluginDefineMap.put(enhanceClassName, pluginDefinesWithSameTarget);
if (match instanceof NameMatch) {
NameMatch nameMatch = (NameMatch)match;
nameMatchDefine.put(nameMatch.getClassName(), plugin);
} else {
signatureMatchDefine.add(plugin);
}
pluginDefinesWithSameTarget.add(plugin);
}
}
public List<AbstractClassEnhancePluginDefine> find(String enhanceClassName) {
if (pluginDefineMap.containsKey(enhanceClassName)) {
return pluginDefineMap.get(enhanceClassName);
public AbstractClassEnhancePluginDefine find(TypeDescription typeDescription,
ClassLoader classLoader) {
String typeName = typeDescription.getTypeName();
if (nameMatchDefine.containsKey(typeName)) {
return nameMatchDefine.get(typeName);
}
for (AbstractClassEnhancePluginDefine pluginDefine : signatureMatchDefine) {
IndirectMatch match = (IndirectMatch)pluginDefine.enhanceClass();
if (match.isMatch(typeDescription)) {
return pluginDefine;
}
}
throw new PluginException("Can not find plugin:" + enhanceClassName);
return null;
}
public boolean exist(String enhanceClassName) {
return pluginDefineMap.containsKey(enhanceClassName);
public ElementMatcher<? super TypeDescription> buildMatch() {
ElementMatcher.Junction judge = new AbstractJunction<NamedElement>() {
@Override
public boolean matches(NamedElement target) {
return nameMatchDefine.containsKey(target.getActualName());
}
};
judge = judge.and(not(isInterface()));
for (AbstractClassEnhancePluginDefine define : signatureMatchDefine) {
ClassMatch match = define.enhanceClass();
if (match instanceof IndirectMatch) {
judge = judge.or(((IndirectMatch)match).buildJunction());
}
}
return judge;
}
}
......@@ -24,10 +24,6 @@ public class PluginResourcesResolver {
try {
urls = getDefaultClassLoader().getResources("skywalking-plugin.def");
if (!urls.hasMoreElements()) {
logger.info("no plugin files (skywalking-plugin.def) found");
}
while (urls.hasMoreElements()) {
URL pluginUrl = urls.nextElement();
cfgUrlPaths.add(pluginUrl);
......
package org.skywalking.apm.agent.core.plugin;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.ClassFileLocator;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.dynamic.TypeResolutionStrategy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.pool.TypePool;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
/**
* A test entrance for enhancing class.
* This should be used only in bytecode-manipulate test.
* And make sure, all classes which need to be enhanced, must not be loaded.
*
* @author wusheng
*/
public class TracingBootstrap {
private static final ILog logger = LogManager.getLogger(TracingBootstrap.class);
private TracingBootstrap() {
}
/**
* Main entrance for testing.
*
* @param args includes target classname ( which exists "public static void main(String[] args)" ) and arguments
* list.
* @throws PluginException
* @throws ClassNotFoundException
* @throws NoSuchMethodException
* @throws InvocationTargetException
* @throws IllegalAccessException
*/
public static void main(String[] args)
throws PluginException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
if (args.length == 0) {
throw new RuntimeException("bootstrap failure. need args[0] to be main class.");
}
List<AbstractClassEnhancePluginDefine> plugins = null;
try {
PluginBootstrap bootstrap = new PluginBootstrap();
plugins = bootstrap.loadPlugins();
} catch (Throwable t) {
logger.error("PluginBootstrap start failure.", t);
}
for (AbstractClassEnhancePluginDefine plugin : plugins) {
String enhanceClassName = plugin.enhanceClassName();
TypePool.Resolution resolution = TypePool.Default.ofClassPath().describe(enhanceClassName);
if (!resolution.isResolved()) {
logger.error("Failed to resolve the class " + enhanceClassName, null);
continue;
}
DynamicType.Builder<?> newClassBuilder =
new ByteBuddy().rebase(resolution.resolve(), ClassFileLocator.ForClassLoader.ofClassPath());
newClassBuilder = ((AbstractClassEnhancePluginDefine)plugin).define(enhanceClassName, newClassBuilder, TracingBootstrap.class.getClassLoader());
newClassBuilder.make(new TypeResolutionStrategy.Active()).load(ClassLoader.getSystemClassLoader(), ClassLoadingStrategy.Default.INJECTION)
.getLoaded();
}
String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
Class.forName(args[0]).getMethod("main", String[].class).invoke(null, new Object[] {newArgs});
}
}
package org.skywalking.apm.agent.core.plugin.bytebuddy;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import net.bytebuddy.matcher.ElementMatchers;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
/**
* match all methods, which inherits from {@link Object}
* <p>
* Created by wusheng on 2017/1/3.
*/
public enum AllObjectDefaultMethodsMatch implements ElementMatcher<MethodDescription> {
/**
* Stay in singleton.
*/
INSTANCE;
/**
* The matcher will be init in constructor and stay permanent.
*/
private final ElementMatcher.Junction<MethodDescription> matcher;
/**
* Init the match, include {@link Object}'s methods.
*/
AllObjectDefaultMethodsMatch() {
ElementMatcher.Junction<MethodDescription>[] allDefaultMethods = new ElementMatcher.Junction[] {
named("finalize").and(takesArguments(0)).and(ElementMatchers.<MethodDescription>isPublic()),
named("wait").and(takesArguments(0)).and(ElementMatchers.<MethodDescription>isPublic()),
named("wait").and(takesArguments(long.class, int.class)).and(ElementMatchers.<MethodDescription>isPublic()),
named("wait").and(takesArguments(long.class)).and(ElementMatchers.<MethodDescription>isPublic()),
named("equals").and(takesArguments(Object.class)).and(ElementMatchers.<MethodDescription>isPublic()),
named("toString").and(takesArguments(0)).and(ElementMatchers.<MethodDescription>isPublic()),
named("hashCode").and(takesArguments(0)).and(ElementMatchers.<MethodDescription>isPublic()),
named("getClass").and(takesArguments(0)).and(ElementMatchers.<MethodDescription>isPublic()),
named("clone").and(takesArguments(0)).and(ElementMatchers.<MethodDescription>isPublic()),
named("notify").and(takesArguments(0)).and(ElementMatchers.<MethodDescription>isPublic()),
named("notifyAll").and(takesArguments(0)).and(ElementMatchers.<MethodDescription>isPublic())};
ElementMatcher.Junction<MethodDescription> newMatcher = null;
for (int i = 0; i < allDefaultMethods.length; i++) {
if (i == 0) {
newMatcher = allDefaultMethods[i];
} else {
newMatcher = newMatcher.or(allDefaultMethods[i]);
}
}
matcher = newMatcher;
}
/**
* @param target method description.
* @return true, if the method inherit from {@link Object}'s methods.
*/
@Override
public boolean matches(MethodDescription target) {
return matcher.matches(target);
}
}
package org.skywalking.apm.agent.core.plugin.match;
import java.util.Arrays;
import java.util.List;
import net.bytebuddy.description.annotation.AnnotationDescription;
import net.bytebuddy.description.annotation.AnnotationList;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.isAnnotatedWith;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
/**
* Match the class by the given annotations in class.
*
* @author wusheng
*/
public class ClassAnnotationMatch implements IndirectMatch {
private String[] annotations;
private ClassAnnotationMatch(String[] annotations) {
if (annotations == null || annotations.length == 0) {
throw new IllegalArgumentException("annotations is null");
}
this.annotations = annotations;
}
@Override
public ElementMatcher.Junction buildJunction() {
ElementMatcher.Junction junction = null;
for (String annotation : annotations) {
if (junction == null) {
junction = buildEachAnnotation(annotation);
} else {
junction = junction.and(buildEachAnnotation(annotation));
}
}
junction = junction.and(not(isInterface()));
return junction;
}
@Override
public boolean isMatch(TypeDescription typeDescription) {
List<String> annotationList = Arrays.asList(annotations);
AnnotationList declaredAnnotations = typeDescription.getDeclaredAnnotations();
for (AnnotationDescription annotation : declaredAnnotations) {
annotationList.remove(annotation.getAnnotationType().getActualName());
}
if (annotationList.isEmpty()) {
return true;
}
return false;
}
private ElementMatcher.Junction buildEachAnnotation(String annotationName) {
return isAnnotatedWith(named(annotationName));
}
public static ClassMatch byClassAnnotationMatch(String[] annotations) {
return new ClassAnnotationMatch(annotations);
}
}
package org.skywalking.apm.agent.core.plugin.match;
/**
* @author wusheng
*/
public interface ClassMatch {
}
package org.skywalking.apm.agent.core.plugin.match;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
/**
* Match the class by the given super class or interfaces.
*
* @author wusheng
*/
public class HierarchyMatch implements IndirectMatch {
private String[] parentTypes;
private HierarchyMatch(String[] parentTypes) {
if (parentTypes == null || parentTypes.length == 0) {
throw new IllegalArgumentException("parentTypes is null");
}
this.parentTypes = parentTypes;
}
@Override
public ElementMatcher.Junction buildJunction() {
ElementMatcher.Junction junction = null;
for (String superTypeName : parentTypes) {
if (junction == null) {
junction = buildEachAnnotation(superTypeName);
} else {
junction = junction.and(buildEachAnnotation(superTypeName));
}
}
junction = junction.and(not(isInterface()));
return junction;
}
private ElementMatcher.Junction buildEachAnnotation(String superTypeName) {
return hasSuperType(named(superTypeName));
}
@Override
public boolean isMatch(TypeDescription typeDescription) {
return false;
}
public static ClassMatch byHierarchyMatch(String[] parentTypes) {
return new HierarchyMatch(parentTypes);
}
}
package org.skywalking.apm.agent.core.plugin.match;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
/**
* All implementations can't direct match the class like {@link NameMatch} did.
*
* @author wusheng
*/
public interface IndirectMatch extends ClassMatch {
ElementMatcher.Junction buildJunction();
boolean isMatch(TypeDescription typeDescription);
}
package org.skywalking.apm.agent.core.plugin.match;
import java.util.Arrays;
import java.util.List;
import net.bytebuddy.description.annotation.AnnotationDescription;
import net.bytebuddy.description.annotation.AnnotationList;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.declaresMethod;
import static net.bytebuddy.matcher.ElementMatchers.isAnnotatedWith;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
/**
* Match the class, which has methods with the certain annotations.
* This is a very complex match.
*
* @author wusheng
*/
public class MethodAnnotationMatch implements IndirectMatch {
private String[] annotations;
private MethodAnnotationMatch(String[] annotations) {
if (annotations == null || annotations.length == 0) {
throw new IllegalArgumentException("annotations is null");
}
this.annotations = annotations;
}
@Override
public ElementMatcher.Junction buildJunction() {
ElementMatcher.Junction junction = null;
for (String annotation : annotations) {
if (junction == null) {
junction = buildEachAnnotation(annotation);
} else {
junction = junction.and(buildEachAnnotation(annotation));
}
}
junction = declaresMethod(junction).and(not(isInterface()));
return junction;
}
@Override
public boolean isMatch(TypeDescription typeDescription) {
for (MethodDescription.InDefinedShape methodDescription : typeDescription.getDeclaredMethods()) {
List<String> annotationList = Arrays.asList(annotations);
AnnotationList declaredAnnotations = methodDescription.getDeclaredAnnotations();
for (AnnotationDescription annotation : declaredAnnotations) {
annotationList.remove(annotation.getAnnotationType().getActualName());
}
if (annotationList.isEmpty()) {
return true;
}
}
return false;
}
private ElementMatcher.Junction buildEachAnnotation(String annotationName) {
return isAnnotatedWith(named(annotationName));
}
public static ClassMatch byMethodAnnotationMatch(String[] annotations) {
return new MethodAnnotationMatch(annotations);
}
}
package org.skywalking.apm.agent.core.plugin.match;
/**
* Match the class with an explicit class name.
*
* @author wusheng
*/
public class NameMatch implements ClassMatch {
private String className;
private NameMatch(String className) {
this.className = className;
}
public String getClassName() {
return className;
}
public static NameMatch byName(String className) {
return new NameMatch(className);
}
}
package org.skywalking.apm.agent.core.remote;
import io.grpc.ManagedChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.TracingContext;
import org.skywalking.apm.agent.core.context.TracingContextListener;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.dictionary.ApplicationDictionary;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
/**
* @author wusheng
*/
public class AppAndServiceRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener {
private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
private volatile InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
private volatile ScheduledFuture<?> applicationRegisterFuture;
private volatile boolean needRegisterRecover = false;
private volatile long lastSegmentTime = -1;
@Override
public void statusChanged(GRPCChannelStatus status) {
if (CONNECTED.equals(status)) {
ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID != DictionaryUtil.nullValue()) {
needRegisterRecover = true;
}
serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
} else {
applicationRegisterServiceBlockingStub = null;
instanceDiscoveryServiceBlockingStub = null;
serviceNameDiscoveryServiceBlockingStub = null;
}
this.status = status;
}
@Override
public void beforeBoot() throws Throwable {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}
@Override
public void boot() throws Throwable {
applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this, 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
public void afterBoot() throws Throwable {
TracingContext.ListenerManager.add(this);
}
@Override
public void run() {
if (CONNECTED.equals(status)) {
try {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
if (applicationRegisterServiceBlockingStub != null) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addApplicationCode(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping.getApplicationCount() > 0) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication(0).getValue();
}
}
} else {
if (instanceDiscoveryServiceBlockingStub != null) {
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.register(ApplicationInstance.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setRegisterTime(System.currentTimeMillis())
.build());
if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
= instanceMapping.getApplicationInstanceId();
}
} else {
if (needRegisterRecover) {
instanceDiscoveryServiceBlockingStub.registerRecover(ApplicationInstanceRecover.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setRegisterTime(System.currentTimeMillis())
.build());
} else {
if (lastSegmentTime - System.currentTimeMillis() > 60 * 1000) {
instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setHeartbeatTime(System.currentTimeMillis())
.build());
}
}
ApplicationDictionary.INSTANCE.syncRemoteDictionary(applicationRegisterServiceBlockingStub);
OperationNameDictionary.INSTANCE.syncRemoteDictionary(serviceNameDiscoveryServiceBlockingStub);
}
}
}
} catch (Throwable t) {
logger.error(t, "AppAndServiceRegisterClient execute fail.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}
}
}
@Override
public void afterFinished(TraceSegment traceSegment) {
lastSegmentTime = System.currentTimeMillis();
}
}
package org.skywalking.apm.agent.core.remote;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.conf.Config;
/**
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
......@@ -15,9 +18,9 @@ public class CollectorDiscoveryService implements BootService {
@Override
public void boot() throws Throwable {
Thread collectorClientThread = new Thread(new DiscoveryRestServiceClient(), "collectorClientThread");
collectorClientThread.setDaemon(true);
collectorClientThread.start();
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(new DiscoveryRestServiceClient(), 0,
Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
......
......@@ -5,6 +5,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
......@@ -30,22 +31,25 @@ public class DiscoveryRestServiceClient implements Runnable {
private volatile int selectedServer = -1;
public DiscoveryRestServiceClient() {
if (Config.Collector.SERVERS == null || Config.Collector.SERVERS.trim().length() == 0) {
logger.warn("Collector server not configured.");
return;
}
serverList = Config.Collector.SERVERS.split(",");
Random r = new Random();
if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length);
}
}
@Override
public void run() {
while (true) {
try {
try2Sleep(60 * 1000);
findServerList();
} catch (Throwable t) {
logger.error(t, "Find server list fail.");
}
try {
findServerList();
} catch (Throwable t) {
logger.error(t, "Find server list fail.");
}
}
......@@ -66,11 +70,14 @@ public class DiscoveryRestServiceClient implements Runnable {
for (JsonElement element : serverList) {
newServerList.add(element.getAsString());
}
if (!newServerList.equals(GRPC_SERVERS)) {
if (!isListEquals(newServerList, GRPC_SERVERS)) {
GRPC_SERVERS = newServerList;
logger.debug("Refresh GRPC server list: {}", GRPC_SERVERS);
} else {
logger.debug("GRPC server list remain unchanged: {}", GRPC_SERVERS);
}
}
}
}
......@@ -82,6 +89,20 @@ public class DiscoveryRestServiceClient implements Runnable {
}
}
private boolean isListEquals(List<String> list1, List<String> list2) {
if (list1.size() != list2.size()) {
return false;
}
for (String ip1 : list1) {
if (!list2.contains(ip1)) {
return false;
}
}
return true;
}
/**
* Prepare the given message for HTTP Post service.
*
......
......@@ -10,20 +10,25 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import static org.skywalking.apm.agent.core.conf.Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL;
/**
* @author wusheng
*/
public class GRPCChannelManager implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(DiscoveryRestServiceClient.class);
private volatile Thread channelManagerThread = null;
private volatile ManagedChannel managedChannel = null;
private volatile long nextStartTime = 0;
private volatile ScheduledFuture<?> connectCheckFuture;
private volatile boolean reconnect = true;
private Random random = new Random();
private List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<GRPCChannelListener>());
......@@ -34,7 +39,9 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void boot() throws Throwable {
this.connectInBackground(false);
connectCheckFuture = Executors
.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this, 0, GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
......@@ -42,37 +49,9 @@ public class GRPCChannelManager implements BootService, Runnable {
}
private void connectInBackground(boolean forceStart) {
if (channelManagerThread == null || !channelManagerThread.isAlive()) {
synchronized (this) {
if (forceStart) {
/**
* The startup has invoked in 30 seconds before, don't invoke again.
*/
if (System.currentTimeMillis() < nextStartTime) {
return;
}
}
resetNextStartTime();
if (channelManagerThread == null || !channelManagerThread.isAlive()) {
if (forceStart || managedChannel == null || managedChannel.isTerminated() || managedChannel.isShutdown()) {
if (managedChannel != null) {
managedChannel.shutdownNow();
}
Thread channelManagerThread = new Thread(this, "ChannelManagerThread");
channelManagerThread.setDaemon(true);
channelManagerThread.start();
}
}
}
}
}
@Override
public void run() {
while (true) {
resetNextStartTime();
if (reconnect) {
if (RemoteDownstreamConfig.Collector.GRPC_SERVERS.size() > 0) {
int index = random.nextInt() % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size();
String server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index);
......@@ -84,19 +63,20 @@ public class GRPCChannelManager implements BootService, Runnable {
.maxInboundMessageSize(1024 * 1024 * 50)
.usePlaintext(true);
managedChannel = channelBuilder.build();
for (GRPCChannelListener listener : listeners) {
listener.statusChanged(GRPCChannelStatus.CONNECTED);
if (!managedChannel.isShutdown() && !managedChannel.isTerminated()) {
reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
} else {
notify(GRPCChannelStatus.DISCONNECT);
}
break;
return;
} catch (Throwable t) {
logger.error(t, "Create channel to {} fail.", server);
notify(GRPCChannelStatus.DISCONNECT);
}
}
resetNextStartTime();
int waitTime = 5 * 1000;
logger.debug("Selected collector grpc service is not available. Wait {} millis to try", waitTime);
try2Sleep(waitTime);
logger.debug("Selected collector grpc service is not available. Wait {} seconds to retry", GRPC_CHANNEL_CHECK_INTERVAL);
}
}
......@@ -110,11 +90,22 @@ public class GRPCChannelManager implements BootService, Runnable {
/**
* If the given expcetion is triggered by network problem, connect in background.
*
* @param throwable
*/
public void reportError(Throwable throwable) {
if (isNetworkError(throwable)) {
this.connectInBackground(true);
reconnect = true;
}
}
private void notify(GRPCChannelStatus status) {
for (GRPCChannelListener listener : listeners) {
try {
listener.statusChanged(status);
} catch (Throwable t) {
logger.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
}
}
}
......@@ -140,21 +131,4 @@ public class GRPCChannelManager implements BootService, Runnable {
}
return false;
}
private void resetNextStartTime() {
nextStartTime = System.currentTimeMillis() + 20 * 1000;
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}
}
......@@ -4,5 +4,6 @@ package org.skywalking.apm.agent.core.remote;
* @author wusheng
*/
public enum GRPCChannelStatus {
CONNECTED
CONNECTED,
DISCONNECT
}
......@@ -8,9 +8,9 @@ import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.context.TracingContext;
import org.skywalking.apm.agent.core.context.TracingContextListener;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.datacarrier.DataCarrier;
import org.skywalking.apm.agent.core.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.agent.core.datacarrier.consumer.IConsumer;
import org.skywalking.apm.commons.datacarrier.DataCarrier;
import org.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import org.skywalking.apm.network.proto.Downstream;
......@@ -26,10 +26,11 @@ import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
*/
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);
private static final int TIMEOUT = 30 * 1000;
private volatile DataCarrier<TraceSegment> carrier;
private volatile TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub;
private volatile GRPCChannelStatus status = null;
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
@Override
public void beforeBoot() throws Throwable {
......@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
}
upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish(30 * 1000);
status.wait4Finish(TIMEOUT);
if (logger.isDebugEnable()) {
logger.debug("{} trace segments have been sent to collector.", data.size());
......@@ -127,8 +128,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
if (CONNECTED.equals(status)) {
ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
serviceStub = TraceSegmentServiceGrpc.newStub(channel);
} else {
}
this.status = status;
}
}
......@@ -4,3 +4,4 @@ org.skywalking.apm.agent.core.remote.CollectorDiscoveryService
org.skywalking.apm.agent.core.sampling.SamplingService
org.skywalking.apm.agent.core.remote.GRPCChannelManager
org.skywalking.apm.agent.core.jvm.JVMService
org.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient
package org.skywalking.apm.agent.core.boot;
import org.junit.Assert;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import org.junit.Rule;
import org.junit.Test;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.skywalking.apm.agent.core.context.TracingContext;
import org.skywalking.apm.agent.core.context.TracingContextListener;
import org.skywalking.apm.agent.core.jvm.JVMService;
import org.skywalking.apm.agent.core.remote.CollectorDiscoveryService;
import org.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
import org.skywalking.apm.agent.core.sampling.SamplingService;
import org.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* @author wusheng
*/
public class ServiceManagerTest {
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
@Test
public void testBoot() {
ServiceManager.INSTANCE.boot();
ContextManager manager = ServiceManager.INSTANCE.findService(ContextManager.class);
Assert.assertNotNull(manager);
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
assertThat(registryService.size(), is(7));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
assertCollectorDiscoveryService(ServiceManager.INSTANCE.findService(CollectorDiscoveryService.class));
assertGRPCChannelManager(ServiceManager.INSTANCE.findService(GRPCChannelManager.class));
assertSamplingService(ServiceManager.INSTANCE.findService(SamplingService.class));
assertJVMService(ServiceManager.INSTANCE.findService(JVMService.class));
assertTracingContextListener();
assertIgnoreTracingContextListener();
}
private void assertIgnoreTracingContextListener() throws Exception {
List<TracingContextListener> LISTENERS = getFieldValue(IgnoredTracerContext.ListenerManager.class, "LISTENERS");
assertThat(LISTENERS.size(), is(1));
assertThat(LISTENERS.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true));
}
private void assertTracingContextListener() throws Exception {
List<TracingContextListener> LISTENERS = getFieldValue(TracingContext.ListenerManager.class, "LISTENERS");
assertThat(LISTENERS.size(), is(3));
assertThat(LISTENERS.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true));
assertThat(LISTENERS.contains(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)), is(true));
}
private void assertJVMService(JVMService service) {
assertNotNull(service);
}
private void assertGRPCChannelManager(GRPCChannelManager service) throws Exception {
assertNotNull(service);
List<GRPCChannelListener> listeners = getFieldValue(service, "listeners");
assertEquals(listeners.size(), 3);
}
private void assertSamplingService(SamplingService service) {
assertNotNull(service);
}
private void assertCollectorDiscoveryService(CollectorDiscoveryService service) {
assertNotNull(service);
}
private void assertContextManager(ContextManager service) {
assertNotNull(service);
}
private void assertTraceSegmentServiceClient(TraceSegmentServiceClient service) {
assertNotNull(service);
}
private <T> T getFieldValue(Object instance, String fieldName) throws Exception {
Field field = instance.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return (T)field.get(instance);
}
private <T> T getFieldValue(Class clazz, String fieldName) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return (T)field.get(clazz);
}
}
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册