提交 dedc158f 编写于 作者: P pengys5

Merge branch 'master' into feature/266

......@@ -19,16 +19,6 @@ Sky Walking | [中文](https://github.com/wu-sheng/sky-walking/wiki/sky-walking-
* High performance streaming analysis.
* The UI released on [wu-sheng/sky-walking-ui](https://github.com/wu-sheng/sky-walking-ui)
___
<a href="https://github.com/wu-sheng/sky-walking">
<img src="https://sky-walking.github.io/page-resources/3.0/oneapm-award.png" alt="OneAPM Open Source Achievement Award" height="110px" align="left" />
</a>
In October 2016, Sky Walking won `OneAPM Open Source Achievement Award`. The award appreciates sky walking for its "*contribution to popularization of APM technology*". <br/><br/><br/>
Thanks all users of sky walking project.
___
# Contributors
_In chronological order_
......
......@@ -68,16 +68,19 @@ public class SkywalkingSpan implements Span {
return SkywalkingContext.INSTANCE;
}
@NeedSnifferActivation(
"1. ContextManager#activeSpan()" +
"2. SkywalkingSpan#setTag(String, String)")
@Override public Span setTag(String key, String value) {
return null;
return this;
}
@Override public Span setTag(String key, boolean value) {
return null;
return setTag(key, String.valueOf(value));
}
@Override public Span setTag(String key, Number value) {
return null;
return setTag(key, String.valueOf(value));
}
@Override
......
......@@ -20,6 +20,7 @@ public class ConsumerPool<T> {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
consumerThreads[i].setDaemon(true);
}
}
......
......@@ -20,7 +20,8 @@ service InstanceDiscoveryService {
message ApplicationInstance {
int32 applicationId = 1;
int64 registerTime = 2;
string agentUUID = 2;
int64 registerTime = 3;
}
message ApplicationInstanceMapping {
......@@ -61,4 +62,4 @@ message ServiceNameMappingElement {
message ServiceNameElement {
string serviceName = 1;
int32 applicationId = 2;
}
\ No newline at end of file
}
......@@ -12,12 +12,16 @@ service TraceSegmentService {
}
message UpstreamSegment {
repeated string globalTraceIds = 1;
repeated UniqueId globalTraceIds = 1;
bytes segment = 2; // the byte array of TraceSegmentObject
}
message UniqueId {
repeated int64 idParts = 1;
}
message TraceSegmentObject {
string traceSegmentId = 1;
UniqueId traceSegmentId = 1;
repeated TraceSegmentReference refs = 2;
repeated SpanObject spans = 3;
int32 applicationId = 4;
......@@ -26,13 +30,15 @@ message TraceSegmentObject {
message TraceSegmentReference {
RefType refType = 1;
string parentTraceSegmentId = 2;
UniqueId parentTraceSegmentId = 2;
int32 parentSpanId = 3;
int32 parentApplicationInstanceId = 4;
string networkAddress = 5;
int32 networkAddressId = 6;
string entryServiceName = 7;
int32 entryServiceId = 8;
string parentServiceName = 9;
int32 parentServiceId = 10;
}
message SpanObject {
......
......@@ -13,4 +13,6 @@ public interface BootService {
void boot() throws Throwable;
void afterBoot() throws Throwable;
void shutdown() throws Throwable;
}
......@@ -27,6 +27,16 @@ public enum ServiceManager {
afterBoot();
}
public void shutdown() {
for (BootService service : bootedServices.values()) {
try {
service.shutdown();
} catch (Throwable e) {
logger.error(e, "ServiceManager try to shutdown [{}] fail.", service.getClass().getName());
}
}
}
private Map<Class, BootService> loadAllServices() {
HashMap<Class, BootService> bootedServices = new HashMap<Class, BootService>();
Iterator<BootService> serviceIterator = load().iterator();
......
......@@ -60,12 +60,28 @@ public class Config {
public static String DISCOVERY_SERVICE_NAME = "/grpc/addresses";
}
public static class Jvm {
/**
* The buffer size of collected JVM info.
*/
public static int BUFFER_SIZE = 60 * 10;
}
public static class Buffer {
public static int CHANNEL_SIZE = 5;
public static int BUFFER_SIZE = 300;
}
public static class Dictionary {
/**
* The buffer size of application codes and peer
*/
public static int APPLICATION_CODE_BUFFER_SIZE = 10 * 10000;
public static int OPERATION_NAME_BUFFER_SIZE = 1000 * 10000;
}
public static class Logging {
/**
* Log file name.
......
......@@ -55,7 +55,7 @@ public interface AbstractTracerContext {
* @return the string represents the id.
* @see {@link TracingContext} and {@link IgnoredTracerContext}
*/
String getGlobalTraceId();
String getReadableGlobalTraceId();
/**
* Create an entry span
......
package org.skywalking.apm.agent.core.context;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.context.ids.ID;
import org.skywalking.apm.agent.core.context.ids.PropagatedTraceId;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
......@@ -19,7 +19,7 @@ public class ContextCarrier implements Serializable {
/**
* {@link TraceSegment#traceSegmentId}
*/
private String traceSegmentId;
private ID traceSegmentId;
private int spanId = -1;
......@@ -29,10 +29,12 @@ public class ContextCarrier implements Serializable {
private String entryOperationName;
private String parentOperationName;
/**
* {@link DistributedTraceId}
*/
private List<DistributedTraceId> distributedTraceIds;
private DistributedTraceId primaryDistributedTraceId;
/**
* Serialize this {@link ContextCarrier} to a {@link String},
......@@ -43,12 +45,13 @@ public class ContextCarrier implements Serializable {
public String serialize() {
if (this.isValid()) {
return StringUtil.join('|',
this.getTraceSegmentId(),
this.getTraceSegmentId().toBase64(),
this.getSpanId() + "",
this.getApplicationInstanceId() + "",
this.getPeerHost(),
this.getEntryOperationName(),
this.serializeDistributedTraceIds());
this.getParentOperationName(),
this.getPrimaryDistributedTraceId());
} else {
return "";
}
......@@ -61,15 +64,16 @@ public class ContextCarrier implements Serializable {
*/
public ContextCarrier deserialize(String text) {
if (text != null) {
String[] parts = text.split("\\|", 6);
if (parts.length == 6) {
String[] parts = text.split("\\|", 7);
if (parts.length == 7) {
try {
this.traceSegmentId = parts[0];
this.traceSegmentId = new ID(parts[0]);
this.spanId = Integer.parseInt(parts[1]);
this.applicationInstanceId = Integer.parseInt(parts[2]);
this.peerHost = parts[3];
this.entryOperationName = parts[4];
this.distributedTraceIds = deserializeDistributedTraceIds(parts[5]);
this.parentOperationName = parts[5];
this.primaryDistributedTraceId = new PropagatedTraceId(parts[6]);
} catch (NumberFormatException e) {
}
......@@ -84,12 +88,13 @@ public class ContextCarrier implements Serializable {
* @return true for unbroken {@link ContextCarrier} or no-initialized. Otherwise, false;
*/
public boolean isValid() {
return !StringUtil.isEmpty(traceSegmentId)
return traceSegmentId != null
&& getSpanId() > -1
&& applicationInstanceId != DictionaryUtil.nullValue()
&& !StringUtil.isEmpty(peerHost)
&& !StringUtil.isEmpty(entryOperationName)
&& distributedTraceIds != null;
&& !StringUtil.isEmpty(parentOperationName)
&& primaryDistributedTraceId != null;
}
public String getEntryOperationName() {
......@@ -104,7 +109,15 @@ public class ContextCarrier implements Serializable {
this.entryOperationName = entryOperationId + "";
}
public String getTraceSegmentId() {
void setParentOperationName(String parentOperationName) {
this.parentOperationName = '#' + parentOperationName;
}
void setParentOperationId(int parentOperationId) {
this.parentOperationName = parentOperationId + "";
}
public ID getTraceSegmentId() {
return traceSegmentId;
}
......@@ -112,7 +125,7 @@ public class ContextCarrier implements Serializable {
return spanId;
}
void setTraceSegmentId(String traceSegmentId) {
void setTraceSegmentId(ID traceSegmentId) {
this.traceSegmentId = traceSegmentId;
}
......@@ -140,51 +153,19 @@ public class ContextCarrier implements Serializable {
this.peerHost = peerId + "";
}
public List<DistributedTraceId> getDistributedTraceIds() {
return distributedTraceIds;
public DistributedTraceId getDistributedTraceId() {
return primaryDistributedTraceId;
}
public void setDistributedTraceIds(List<DistributedTraceId> distributedTraceIds) {
this.distributedTraceIds = distributedTraceIds;
this.primaryDistributedTraceId = distributedTraceIds.get(0);
}
/**
* Serialize {@link #distributedTraceIds} to a string, with ',' split.
*
* @return string, represents all {@link DistributedTraceId}
*/
private String serializeDistributedTraceIds() {
StringBuilder traceIdString = new StringBuilder();
if (distributedTraceIds != null) {
boolean first = true;
for (DistributedTraceId distributedTraceId : distributedTraceIds) {
if (first) {
first = false;
} else {
traceIdString.append(",");
}
traceIdString.append(distributedTraceId.get());
}
}
return traceIdString.toString();
private String getPrimaryDistributedTraceId() {
return primaryDistributedTraceId.toBase64();
}
/**
* Deserialize {@link #distributedTraceIds} from a text, whith
*
* @param text
* @return
*/
private List<DistributedTraceId> deserializeDistributedTraceIds(String text) {
if (StringUtil.isEmpty(text)) {
return null;
}
String[] propagationTraceIdValues = text.split(",");
List<DistributedTraceId> traceIds = new LinkedList<DistributedTraceId>();
for (String propagationTraceIdValue : propagationTraceIdValues) {
traceIds.add(new PropagatedTraceId(propagationTraceIdValue));
}
return traceIds;
public String getParentOperationName() {
return parentOperationName;
}
}
......@@ -76,7 +76,7 @@ public class ContextManager implements TracingContextListener, BootService, Igno
if (segment == null) {
return "N/A";
} else {
return segment.getGlobalTraceId();
return segment.getReadableGlobalTraceId();
}
}
......@@ -162,6 +162,10 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
@Override public void shutdown() throws Throwable {
}
@Override
public void afterFinished(TraceSegment traceSegment) {
CONTEXT.remove();
......
......@@ -2,6 +2,8 @@ package org.skywalking.apm.agent.core.context;
import java.util.List;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.context.ids.ID;
import org.skywalking.apm.util.StringUtil;
/**
* The <code>ContextSnapshot</code> is a snapshot for current context. The snapshot carries the info for building
......@@ -13,30 +15,52 @@ public class ContextSnapshot {
/**
* trace segment id of the parent trace segment.
*/
private String traceSegmentId;
private ID traceSegmentId;
/**
* span id of the parent span, in parent trace segment.
*/
private int spanId = -1;
private String entryOperationName;
private String parentOperationName;
/**
* {@link DistributedTraceId}
*/
private List<DistributedTraceId> distributedTraceIds;
private DistributedTraceId primaryDistributedTraceId;
ContextSnapshot(String traceSegmentId, int spanId,
ContextSnapshot(ID traceSegmentId, int spanId,
List<DistributedTraceId> distributedTraceIds) {
this.traceSegmentId = traceSegmentId;
this.spanId = spanId;
this.distributedTraceIds = distributedTraceIds;
if (distributedTraceIds != null) {
this.primaryDistributedTraceId = distributedTraceIds.get(0);
}
}
public void setEntryOperationName(String entryOperationName) {
this.entryOperationName = "#" + entryOperationName;
}
public void setEntryOperationId(int entryOperationId) {
this.entryOperationName = entryOperationId + "";
}
public void setParentOperationName(String parentOperationName) {
this.parentOperationName = "#" + parentOperationName;
}
public void setParentOperationId(int parentOperationId) {
this.parentOperationName = parentOperationId + "";
}
public List<DistributedTraceId> getDistributedTraceIds() {
return distributedTraceIds;
public DistributedTraceId getDistributedTraceId() {
return primaryDistributedTraceId;
}
public String getTraceSegmentId() {
public ID getTraceSegmentId() {
return traceSegmentId;
}
......@@ -44,10 +68,19 @@ public class ContextSnapshot {
return spanId;
}
public String getParentOperationName() {
return parentOperationName;
}
public boolean isValid() {
return traceSegmentId != null
&& spanId > -1
&& distributedTraceIds != null
&& distributedTraceIds.size() > 0;
&& primaryDistributedTraceId != null
&& !StringUtil.isEmpty(entryOperationName)
&& !StringUtil.isEmpty(parentOperationName);
}
public String getEntryOperationName() {
return entryOperationName;
}
}
......@@ -41,7 +41,7 @@ public class IgnoredTracerContext implements AbstractTracerContext {
}
@Override
public String getGlobalTraceId() {
public String getReadableGlobalTraceId() {
return "[Ignored Trace]";
}
......
......@@ -96,8 +96,8 @@ public class TracingContext implements AbstractTracerContext {
String operationName;
if (refs != null && refs.size() > 0) {
TraceSegmentRef ref = refs.get(0);
operationId = ref.getOperationId();
operationName = ref.getOperationName();
operationId = ref.getEntryOperationId();
operationName = ref.getEntryOperationName();
} else {
AbstractTracingSpan firstSpan = first();
operationId = firstSpan.getOperationId();
......@@ -109,6 +109,13 @@ public class TracingContext implements AbstractTracerContext {
carrier.setEntryOperationId(operationId);
}
int parentOperationId = first().getOperationId();
if (parentOperationId == DictionaryUtil.nullValue()) {
carrier.setParentOperationName(first().getOperationName());
} else {
carrier.setParentOperationId(parentOperationId);
}
carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
}
......@@ -121,7 +128,7 @@ public class TracingContext implements AbstractTracerContext {
@Override
public void extract(ContextCarrier carrier) {
this.segment.ref(new TraceSegmentRef(carrier));
this.segment.relatedGlobalTraces(carrier.getDistributedTraceIds());
this.segment.relatedGlobalTraces(carrier.getDistributedTraceId());
}
/**
......@@ -132,10 +139,33 @@ public class TracingContext implements AbstractTracerContext {
*/
@Override
public ContextSnapshot capture() {
return new ContextSnapshot(segment.getTraceSegmentId(),
List<TraceSegmentRef> refs = this.segment.getRefs();
ContextSnapshot snapshot = new ContextSnapshot(segment.getTraceSegmentId(),
activeSpan().getSpanId(),
segment.getRelatedGlobalTraces()
);
segment.getRelatedGlobalTraces());
int entryOperationId;
String entryOperationName;
AbstractTracingSpan firstSpan = first();
if (refs != null && refs.size() > 0) {
TraceSegmentRef ref = refs.get(0);
entryOperationId = ref.getEntryOperationId();
entryOperationName = ref.getEntryOperationName();
} else {
entryOperationId = firstSpan.getOperationId();
entryOperationName = firstSpan.getOperationName();
}
if (entryOperationId == DictionaryUtil.nullValue()) {
snapshot.setEntryOperationName(entryOperationName);
} else {
snapshot.setEntryOperationId(entryOperationId);
}
if (firstSpan.getOperationId() == DictionaryUtil.nullValue()) {
snapshot.setParentOperationName(firstSpan.getOperationName());
} else {
snapshot.setParentOperationId(firstSpan.getOperationId());
}
return snapshot;
}
/**
......@@ -147,15 +177,15 @@ public class TracingContext implements AbstractTracerContext {
@Override
public void continued(ContextSnapshot snapshot) {
this.segment.ref(new TraceSegmentRef(snapshot));
this.segment.relatedGlobalTraces(snapshot.getDistributedTraceIds());
this.segment.relatedGlobalTraces(snapshot.getDistributedTraceId());
}
/**
* @return the first global trace id.
*/
@Override
public String getGlobalTraceId() {
return segment.getRelatedGlobalTraces().get(0).get();
public String getReadableGlobalTraceId() {
return segment.getRelatedGlobalTraces().get(0).toString();
}
/**
......@@ -256,13 +286,13 @@ public class TracingContext implements AbstractTracerContext {
.doInCondition(
new PossibleFound.FoundAndObtain() {
@Override
public Object doProcess(int peerId) {
return new ExitSpan(spanIdGenerator++, parentSpanId, applicationId, peerId);
public Object doProcess(int operationId) {
return new ExitSpan(spanIdGenerator++, parentSpanId, operationId, applicationId);
}
}, new PossibleFound.NotFoundAndObtain() {
@Override
public Object doProcess() {
return new ExitSpan(spanIdGenerator++, parentSpanId, applicationId, remotePeer);
return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, remotePeer);
}
});
}
......
package org.skywalking.apm.agent.core.context.ids;
import org.skywalking.apm.network.proto.UniqueId;
/**
* The <code>DistributedTraceId</code> presents a distributed call chain.
* <p>
......@@ -14,14 +16,27 @@ package org.skywalking.apm.agent.core.context.ids;
* @author wusheng
*/
public abstract class DistributedTraceId {
private String id;
private ID id;
public DistributedTraceId(String id) {
public DistributedTraceId(ID id) {
this.id = id;
}
public String get() {
return id;
public DistributedTraceId(String id) {
this.id = new ID(id);
}
public String toBase64() {
return id.toBase64();
}
@Override
public String toString() {
return id.toString();
}
public UniqueId toUniqueId() {
return id.transform();
}
/**
......@@ -38,7 +53,7 @@ public abstract class DistributedTraceId {
if (o == null || getClass() != o.getClass())
return false;
DistributedTraceId id1 = (DistributedTraceId) o;
DistributedTraceId id1 = (DistributedTraceId)o;
return id != null ? id.equals(id1.id) : id1.id == null;
}
......
package org.skywalking.apm.agent.core.context.ids;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -12,7 +7,6 @@ import java.util.List;
/**
* @author wusheng
*/
@JsonAdapter(DistributedTraceIds.Serializer.class)
public class DistributedTraceIds {
private LinkedList<DistributedTraceId> relatedGlobalTraces;
......@@ -32,35 +26,4 @@ public class DistributedTraceIds {
relatedGlobalTraces.add(distributedTraceId);
}
}
public static class Serializer extends TypeAdapter<DistributedTraceIds> {
@Override
public void write(JsonWriter out, DistributedTraceIds value) throws IOException {
List<DistributedTraceId> globalTraces = value.getRelatedGlobalTraces();
if (globalTraces.size() > 0) {
out.beginArray();
for (DistributedTraceId trace : globalTraces) {
out.value(trace.get());
}
out.endArray();
}
}
@Override
public DistributedTraceIds read(JsonReader in) throws IOException {
DistributedTraceIds distributedTraceIds = new DistributedTraceIds();
in.beginArray();
try {
while (in.hasNext()) {
PropagatedTraceId traceId = new PropagatedTraceId(in.nextString());
distributedTraceIds.append(traceId);
}
} finally {
in.endArray();
}
return distributedTraceIds;
}
}
}
package org.skywalking.apm.agent.core.context.ids;
import java.util.UUID;
import org.skywalking.apm.util.MachineInfo;
import org.skywalking.apm.util.StringUtil;
import java.util.Random;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
public final class GlobalIdGenerator {
private static final ThreadLocal<Integer> THREAD_ID_SEQUENCE = new ThreadLocal<Integer>() {
private static final ThreadLocal<IDContext> THREAD_ID_SEQUENCE = new ThreadLocal<IDContext>() {
@Override
protected Integer initialValue() {
return 0;
protected IDContext initialValue() {
return new IDContext(System.currentTimeMillis(), (short)0);
}
};
private static final int PROCESS_UUID;
static {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
PROCESS_UUID = uuid.substring(uuid.length() - 7).hashCode();
private GlobalIdGenerator() {
}
private GlobalIdGenerator() {
/**
* Generate a new id, combined by three long numbers.
*
* The first one represents application instance id. (most likely just an integer value, would be helpful in
* protobuf)
*
* The second one represents thread id. (most likely just an integer value, would be helpful in protobuf)
*
* The third one also has two parts,<br/>
* 1) a timestamp, measured in milliseconds<br/>
* 2) a seq, in current thread, between 0(included) and 9999(included)
*
* Notice, a long costs 8 bytes, three longs cost 24 bytes. And at the same time, a char costs 2 bytes. So
* sky-walking's old global and segment id like this: "S.1490097253214.-866187727.57515.1.1" which costs at least 72
* bytes.
*
* @return an array contains three long numbers, which represents a unique id.
*/
public static ID generate() {
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
throw new IllegalStateException();
}
IDContext context = THREAD_ID_SEQUENCE.get();
return new ID(
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID,
Thread.currentThread().getId(),
context.nextSeq()
);
}
public static String generate(String type) {
Integer seq = THREAD_ID_SEQUENCE.get();
seq++;
THREAD_ID_SEQUENCE.set(seq);
private static class IDContext {
private long lastTimestamp;
private short threadSeq;
// Just for considering time-shift-back only.
private long runRandomTimestamp;
private int lastRandomValue;
private Random random;
private IDContext(long lastTimestamp, short threadSeq) {
this.lastTimestamp = lastTimestamp;
this.threadSeq = threadSeq;
}
private long nextSeq() {
return timestamp() * 10000 + nextThreadSeq();
}
private long timestamp() {
long currentTimeMillis = System.currentTimeMillis();
return StringUtil.join('.',
type + "", System.currentTimeMillis() + "", PROCESS_UUID + "",
MachineInfo.getProcessNo() + "", Thread.currentThread().getId() + "", seq + "");
if (currentTimeMillis < lastTimestamp) {
// Just for considering time-shift-back by Ops or OS. @hanahmily 's suggestion.
if (random == null) {
random = new Random();
}
if (runRandomTimestamp != currentTimeMillis) {
lastRandomValue = random.nextInt();
runRandomTimestamp = currentTimeMillis;
}
return lastRandomValue;
} else {
lastTimestamp = currentTimeMillis;
return lastTimestamp;
}
}
private short nextThreadSeq() {
return threadSeq++;
}
}
}
package org.skywalking.apm.agent.core.context.ids;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.skywalking.apm.agent.core.context.ids.base64.Base64;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author wusheng
*/
public class ID {
private static final Base64.Encoder ENCODER = Base64.getEncoder();
private static final Base64.Decoder DECODER = Base64.getDecoder();
private long part1;
private long part2;
private long part3;
public ID(long part1, long part2, long part3) {
this.part1 = part1;
this.part2 = part2;
this.part3 = part3;
}
public ID(String base64String) {
int index = 0;
for (int part = 0; part < 3; part++) {
String encodedString;
char potentialTypeChar = base64String.charAt(index);
long value;
if (potentialTypeChar == '#') {
encodedString = base64String.substring(index + 1, index + 5);
index += 5;
value = ByteBuffer.wrap(DECODER.decode(encodedString)).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(0);
} else if (potentialTypeChar == '$') {
encodedString = base64String.substring(index + 1, index + 9);
index += 9;
value = ByteBuffer.wrap(DECODER.decode(encodedString)).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().get(0);
} else {
encodedString = base64String.substring(index, index + 12);
index += 12;
value = ByteBuffer.wrap(DECODER.decode(encodedString)).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().get(0);
}
if (part == 0) {
part1 = value;
} else if (part == 1) {
part2 = value;
} else {
part3 = value;
}
}
}
public String toBase64() {
return long2Base64(part1) + long2Base64(part2) + long2Base64(part3);
}
private String long2Base64(long partN) {
if (partN < 0) {
throw new IllegalArgumentException("negative value.");
}
if (partN < 32768) {
// 0 - 32767
// "#" as a prefix of a short value with base64 encoding.
byte[] data = new byte[2];
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put((short)partN);
return '#' + ENCODER.encodeToString(data);
} else if (partN <= 2147483647) {
// 32768 - 2147483647
// "$" as a prefix of an integer value (greater than a short) with base64 encoding.
byte[] data = new byte[4];
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put((int)partN);
return '$' + ENCODER.encodeToString(data);
} else {
// > 2147483647
// a long value (greater than an integer)
byte[] data = new byte[8];
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(partN);
return ENCODER.encodeToString(data);
}
}
@Override public String toString() {
return part1 + "." + part2 + '.' + part3;
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ID id = (ID)o;
if (part1 != id.part1)
return false;
if (part2 != id.part2)
return false;
return part3 == id.part3;
}
@Override public int hashCode() {
int result = (int)(part1 ^ (part1 >>> 32));
result = 31 * result + (int)(part2 ^ (part2 >>> 32));
result = 31 * result + (int)(part3 ^ (part3 >>> 32));
return result;
}
public UniqueId transform() {
return UniqueId.newBuilder().addIdParts(part1).addIdParts(part2).addIdParts(part3).build();
}
}
......@@ -6,9 +6,7 @@ package org.skywalking.apm.agent.core.context.ids;
* @author wusheng
*/
public class NewDistributedTraceId extends DistributedTraceId {
private static final String ID_TYPE = "T";
public NewDistributedTraceId() {
super(GlobalIdGenerator.generate(ID_TYPE));
super(GlobalIdGenerator.generate());
}
}
package org.skywalking.apm.agent.core.context.ids.base64;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
* This class consists of {@code static} utility methods for operating
* on objects. These utilities include {@code null}-safe or {@code
* null}-tolerant methods for computing the hash code of an object,
* returning a string for an object, and comparing two objects.
*
* @since 1.7
*/
public final class Objects {
private Objects() {
throw new AssertionError("No java.util.Objects instances for you!");
}
/**
* Returns {@code true} if the arguments are equal to each other
* and {@code false} otherwise.
* Consequently, if both arguments are {@code null}, {@code true}
* is returned and if exactly one argument is {@code null}, {@code
* false} is returned. Otherwise, equality is determined by using
* the {@link Object#equals equals} method of the first
* argument.
*
* @param a an object
* @param b an object to be compared with {@code a} for equality
* @return {@code true} if the arguments are equal to each other
* and {@code false} otherwise
* @see Object#equals(Object)
*/
public static boolean equals(Object a, Object b) {
return (a == b) || (a != null && a.equals(b));
}
/**
* Returns the hash code of a non-{@code null} argument and 0 for
* a {@code null} argument.
*
* @param o an object
* @return the hash code of a non-{@code null} argument and 0 for
* a {@code null} argument
* @see Object#hashCode
*/
public static int hashCode(Object o) {
return o != null ? o.hashCode() : 0;
}
/**
* Generates a hash code for a sequence of input values. The hash
* code is generated as if all the input values were placed into an
* array, and that array were hashed by calling {@link
* Arrays#hashCode(Object[])}.
*
* <p>This method is useful for implementing {@link
* Object#hashCode()} on objects containing multiple fields. For
* example, if an object that has three fields, {@code x}, {@code
* y}, and {@code z}, one could write:
*
* <blockquote><pre>
* &#064;Override public int hashCode() {
* return Objects.hash(x, y, z);
* }
* </pre></blockquote>
*
* <b>Warning: When a single object reference is supplied, the returned
* value does not equal the hash code of that object reference.</b> This
* value can be computed by calling {@link #hashCode(Object)}.
*
* @param values the values to be hashed
* @return a hash value of the sequence of input values
* @see Arrays#hashCode(Object[])
* @see List#hashCode
*/
public static int hash(Object... values) {
return Arrays.hashCode(values);
}
/**
* Returns the result of calling {@code toString} for a non-{@code
* null} argument and {@code "null"} for a {@code null} argument.
*
* @param o an object
* @return the result of calling {@code toString} for a non-{@code
* null} argument and {@code "null"} for a {@code null} argument
* @see Object#toString
* @see String#valueOf(Object)
*/
public static String toString(Object o) {
return String.valueOf(o);
}
/**
* Returns the result of calling {@code toString} on the first
* argument if the first argument is not {@code null} and returns
* the second argument otherwise.
*
* @param o an object
* @param nullDefault string to return if the first argument is
* {@code null}
* @return the result of calling {@code toString} on the first
* argument if it is not {@code null} and the second argument
* otherwise.
*/
public static String toString(Object o, String nullDefault) {
return (o != null) ? o.toString() : nullDefault;
}
/**
* Returns 0 if the arguments are identical and {@code
* c.compare(a, b)} otherwise.
* Consequently, if both arguments are {@code null} 0
* is returned.
*
* <p>Note that if one of the arguments is {@code null}, a {@code
* NullPointerException} may or may not be thrown depending on
* what ordering policy, if any, the {@link Comparator Comparator}
* chooses to have for {@code null} values.
*
* @param <T> the type of the objects being compared
* @param a an object
* @param b an object to be compared with {@code a}
* @param c the {@code Comparator} to compare the first two arguments
* @return 0 if the arguments are identical and {@code
* c.compare(a, b)} otherwise.
* @see Comparable
* @see Comparator
*/
public static <T> int compare(T a, T b, Comparator<? super T> c) {
return (a == b) ? 0 : c.compare(a, b);
}
/**
* Checks that the specified object reference is not {@code null}. This
* method is designed primarily for doing parameter validation in methods
* and constructors, as demonstrated below:
* <blockquote><pre>
* public Foo(Bar bar) {
* this.bar = Objects.requireNonNull(bar);
* }
* </pre></blockquote>
*
* @param obj the object reference to check for nullity
* @param <T> the type of the reference
* @return {@code obj} if not {@code null}
* @throws NullPointerException if {@code obj} is {@code null}
*/
public static <T> T requireNonNull(T obj) {
if (obj == null)
throw new NullPointerException();
return obj;
}
}
package org.skywalking.apm.agent.core.context.ids.base64;
import java.nio.charset.Charset;
/**
* Constant definitions for the standard {@link Charset Charsets}. These
* charsets are guaranteed to be available on every implementation of the Java
* platform.
*
* @see <a href="Charset#standard">Standard Charsets</a>
* @since 1.7
*/
public final class StandardCharsets {
private StandardCharsets() {
throw new AssertionError("No java.nio.charset.StandardCharsets instances for you!");
}
/**
* Seven-bit ASCII, a.k.a. ISO646-US, a.k.a. the Basic Latin block of the
* Unicode character set
*/
public static final Charset US_ASCII = Charset.forName("US-ASCII");
/**
* ISO Latin Alphabet No. 1, a.k.a. ISO-LATIN-1
*/
public static final Charset ISO_8859_1 = Charset.forName("ISO-8859-1");
/**
* Eight-bit UCS Transformation Format
*/
public static final Charset UTF_8 = Charset.forName("UTF-8");
/**
* Sixteen-bit UCS Transformation Format, big-endian byte order
*/
public static final Charset UTF_16BE = Charset.forName("UTF-16BE");
/**
* Sixteen-bit UCS Transformation Format, little-endian byte order
*/
public static final Charset UTF_16LE = Charset.forName("UTF-16LE");
/**
* Sixteen-bit UCS Transformation Format, byte order identified by an
* optional byte-order mark
*/
public static final Charset UTF_16 = Charset.forName("UTF-16");
}
......@@ -6,6 +6,7 @@ import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceIds;
import org.skywalking.apm.agent.core.context.ids.GlobalIdGenerator;
import org.skywalking.apm.agent.core.context.ids.ID;
import org.skywalking.apm.agent.core.context.ids.NewDistributedTraceId;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
......@@ -31,7 +32,7 @@ public class TraceSegment {
* The id of this trace segment.
* Every segment has its unique-global-id.
*/
private String traceSegmentId;
private ID traceSegmentId;
/**
* The refs of parent trace segments, except the primary one.
......@@ -71,7 +72,7 @@ public class TraceSegment {
* and generate a new segment id.
*/
public TraceSegment() {
this.traceSegmentId = GlobalIdGenerator.generate(ID_TYPE);
this.traceSegmentId = GlobalIdGenerator.generate();
this.spans = new LinkedList<AbstractTracingSpan>();
this.relatedGlobalTraces = new DistributedTraceIds();
this.relatedGlobalTraces.append(new NewDistributedTraceId());
......@@ -93,16 +94,9 @@ public class TraceSegment {
/**
* Establish the line between this segment and all relative global trace ids.
*
* @param distributedTraceIds multi global trace ids. @see {@link DistributedTraceId}
*/
public void relatedGlobalTraces(List<DistributedTraceId> distributedTraceIds) {
if (distributedTraceIds == null || distributedTraceIds.size() == 0) {
return;
}
for (DistributedTraceId distributedTraceId : distributedTraceIds) {
relatedGlobalTraces.append(distributedTraceId);
}
public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {
relatedGlobalTraces.append(distributedTraceId);
}
/**
......@@ -124,7 +118,7 @@ public class TraceSegment {
return this;
}
public String getTraceSegmentId() {
public ID getTraceSegmentId() {
return traceSegmentId;
}
......@@ -164,13 +158,13 @@ public class TraceSegment {
public UpstreamSegment transform() {
UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();
for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {
upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.get());
upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());
}
TraceSegmentObject.Builder traceSegmentBuilder = TraceSegmentObject.newBuilder();
/**
* Trace Segment
*/
traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId);
traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());
// TraceSegmentReference
if (this.refs != null) {
for (TraceSegmentRef ref : this.refs) {
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.agent.core.context.trace;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextSnapshot;
import org.skywalking.apm.agent.core.context.ids.ID;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.network.proto.RefType;
import org.skywalking.apm.network.proto.TraceSegmentReference;
......@@ -15,7 +16,7 @@ import org.skywalking.apm.network.proto.TraceSegmentReference;
public class TraceSegmentRef {
private SegmentRefType type;
private String traceSegmentId;
private ID traceSegmentId;
private int spanId = -1;
......@@ -25,9 +26,13 @@ public class TraceSegmentRef {
private int peerId = DictionaryUtil.nullValue();
private String operationName;
private String entryOperationName;
private int operationId = DictionaryUtil.nullValue();
private int entryOperationId = DictionaryUtil.nullValue();
private String parentOperationName;
private int parentOperationId = DictionaryUtil.nullValue();
/**
* Transform a {@link ContextCarrier} to the <code>TraceSegmentRef</code>
......@@ -47,9 +52,15 @@ public class TraceSegmentRef {
}
String entryOperationName = carrier.getEntryOperationName();
if (entryOperationName.charAt(0) == '#') {
this.operationName = entryOperationName.substring(1);
this.entryOperationName = entryOperationName.substring(1);
} else {
this.operationId = Integer.parseInt(entryOperationName);
this.entryOperationId = Integer.parseInt(entryOperationName);
}
String parentOperationName = carrier.getParentOperationName();
if (parentOperationName.charAt(0) == '#') {
this.parentOperationName = parentOperationName.substring(1);
} else {
this.parentOperationId = Integer.parseInt(parentOperationName);
}
}
......@@ -57,14 +68,26 @@ public class TraceSegmentRef {
this.type = SegmentRefType.CROSS_THREAD;
this.traceSegmentId = snapshot.getTraceSegmentId();
this.spanId = snapshot.getSpanId();
String entryOperationName = snapshot.getEntryOperationName();
if (entryOperationName.charAt(0) == '#') {
this.entryOperationName = entryOperationName.substring(1);
} else {
this.entryOperationId = Integer.parseInt(entryOperationName);
}
String parentOperationName = snapshot.getParentOperationName();
if (parentOperationName.charAt(0) == '#') {
this.parentOperationName = parentOperationName.substring(1);
} else {
this.parentOperationId = Integer.parseInt(parentOperationName);
}
}
public String getOperationName() {
return operationName;
public String getEntryOperationName() {
return entryOperationName;
}
public int getOperationId() {
return operationId;
public int getEntryOperationId() {
return entryOperationId;
}
public TraceSegmentReference transform() {
......@@ -77,17 +100,22 @@ public class TraceSegmentRef {
} else {
refBuilder.setNetworkAddressId(peerId);
}
if (operationId == DictionaryUtil.nullValue()) {
refBuilder.setEntryServiceName(operationName);
} else {
refBuilder.setEntryServiceId(operationId);
}
} else {
refBuilder.setRefType(RefType.CrossThread);
}
refBuilder.setParentTraceSegmentId(traceSegmentId);
refBuilder.setParentSpanId(spanId);
refBuilder.setParentTraceSegmentId(traceSegmentId.transform());
refBuilder.setParentSpanId(spanId);
if (entryOperationId == DictionaryUtil.nullValue()) {
refBuilder.setEntryServiceName(entryOperationName);
} else {
refBuilder.setEntryServiceId(entryOperationId);
}
if (parentOperationId == DictionaryUtil.nullValue()) {
refBuilder.setParentServiceName(parentOperationName);
} else {
refBuilder.setParentServiceId(parentOperationId);
}
return refBuilder.build();
}
......
......@@ -9,6 +9,8 @@ import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithIntegerValue;
import static org.skywalking.apm.agent.core.conf.Config.Dictionary.APPLICATION_CODE_BUFFER_SIZE;
/**
* Map of application id to application code, which is from the collector side.
*
......@@ -24,7 +26,9 @@ public enum ApplicationDictionary {
if (applicationId != null) {
return new Found(applicationId);
} else {
unRegisterApplications.add(applicationCode);
if (applicationDictionary.size() + unRegisterApplications.size() < APPLICATION_CODE_BUFFER_SIZE) {
unRegisterApplications.add(applicationCode);
}
return new NotFound();
}
}
......
......@@ -10,6 +10,8 @@ import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.skywalking.apm.network.proto.ServiceNameMappingElement;
import static org.skywalking.apm.agent.core.conf.Config.Dictionary.OPERATION_NAME_BUFFER_SIZE;
/**
* @author wusheng
*/
......@@ -19,12 +21,17 @@ public enum OperationNameDictionary {
private Set<OperationNameKey> unRegisterOperationNames = new ConcurrentSet<OperationNameKey>();
public PossibleFound find(int applicationId, String operationName) {
if (operationName == null || operationName.length() == 0) {
return new NotFound();
}
OperationNameKey key = new OperationNameKey(applicationId, operationName);
Integer operationId = operationNameDictionary.get(key);
if (operationId != null) {
return new Found(applicationId);
} else {
unRegisterOperationNames.add(key);
if (operationNameDictionary.size() + unRegisterOperationNames.size() < OPERATION_NAME_BUFFER_SIZE) {
unRegisterOperationNames.add(key);
}
return new NotFound();
}
}
......
package org.skywalking.apm.agent.core.jvm;
import io.grpc.ManagedChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.jvm.cpu.CPUProvider;
......@@ -36,16 +35,14 @@ import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
*/
public class JVMService implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(JVMService.class);
private ReentrantLock lock = new ReentrantLock();
private volatile LinkedList<JVMMetric> buffer = new LinkedList<JVMMetric>();
private SimpleDateFormat sdf = new SimpleDateFormat("ss");
private LinkedBlockingQueue<JVMMetric> queue;
private volatile ScheduledFuture<?> collectMetricFuture;
private volatile ScheduledFuture<?> sendMetricFuture;
private volatile int lastBlockIdx = -1;
private Sender sender;
@Override
public void beforeBoot() throws Throwable {
queue = new LinkedBlockingQueue(Config.Jvm.BUFFER_SIZE);
sender = new Sender();
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(sender);
}
......@@ -57,7 +54,7 @@ public class JVMService implements BootService, Runnable {
.scheduleAtFixedRate(this, 0, 1, TimeUnit.SECONDS);
sendMetricFuture = Executors
.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(sender, 0, 15, TimeUnit.SECONDS);
.scheduleAtFixedRate(sender, 0, 1, TimeUnit.SECONDS);
}
@Override
......@@ -65,38 +62,33 @@ public class JVMService implements BootService, Runnable {
}
@Override
public void shutdown() throws Throwable {
collectMetricFuture.cancel(true);
sendMetricFuture.cancel(true);
}
@Override
public void run() {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID != DictionaryUtil.nullValue()
&& RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID != DictionaryUtil.nullValue()
) {
long currentTimeMillis = System.currentTimeMillis();
Date day = new Date(currentTimeMillis);
String second = sdf.format(day);
int blockIndex = Integer.parseInt(second) / 15;
if (blockIndex != lastBlockIdx) {
lastBlockIdx = blockIndex;
try {
JVMMetric.Builder jvmBuilder = JVMMetric.newBuilder();
jvmBuilder.setTime(currentTimeMillis);
jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());
jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());
jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricList());
jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());
try {
JVMMetric.Builder jvmBuilder = JVMMetric.newBuilder();
jvmBuilder.setTime(currentTimeMillis);
jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());
jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());
jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricList());
jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());
JVMMetric jvmMetric = jvmBuilder.build();
lock.lock();
try {
buffer.add(jvmMetric);
while (buffer.size() > 4) {
buffer.removeFirst();
}
} finally {
lock.unlock();
}
} catch (Exception e) {
logger.error(e, "Collect JVM info fail.");
JVMMetric jvmMetric = jvmBuilder.build();
if (queue.offer(jvmMetric)) {
queue.poll();
queue.offer(jvmMetric);
}
} catch (Exception e) {
logger.error(e, "Collect JVM info fail.");
}
}
}
......@@ -113,13 +105,9 @@ public class JVMService implements BootService, Runnable {
if (status == GRPCChannelStatus.CONNECTED) {
try {
JVMMetrics.Builder builder = JVMMetrics.newBuilder();
lock.lock();
try {
builder.addAllMetrics(buffer);
buffer.clear();
} finally {
lock.unlock();
}
LinkedList<JVMMetric> buffer = new LinkedList<JVMMetric>();
queue.drainTo(buffer);
builder.addAllMetrics(buffer);
builder.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID);
stub.collect(builder.build());
......
package org.skywalking.apm.agent.core.remote;
import io.grpc.ManagedChannel;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
......@@ -33,6 +34,7 @@ import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
*/
public class AppAndServiceRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener {
private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
private static final String PROCESS_UUID = UUID.randomUUID().toString().replaceAll("-", "");
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
......@@ -77,6 +79,11 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
TracingContext.ListenerManager.add(this);
}
@Override
public void shutdown() throws Throwable {
applicationRegisterFuture.cancel(true);
}
@Override
public void run() {
if (CONNECTED.equals(status)) {
......@@ -95,6 +102,7 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.register(ApplicationInstance.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setAgentUUID(PROCESS_UUID)
.setRegisterTime(System.currentTimeMillis())
.build());
if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
......
package org.skywalking.apm.agent.core.remote;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.conf.Config;
......@@ -11,6 +12,8 @@ import org.skywalking.apm.agent.core.conf.Config;
* @author wusheng
*/
public class CollectorDiscoveryService implements BootService {
private ScheduledFuture<?> future;
@Override
public void beforeBoot() throws Throwable {
......@@ -18,7 +21,7 @@ public class CollectorDiscoveryService implements BootService {
@Override
public void boot() throws Throwable {
Executors.newSingleThreadScheduledExecutor()
future = Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(new DiscoveryRestServiceClient(), 0,
Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS);
}
......@@ -27,4 +30,9 @@ public class CollectorDiscoveryService implements BootService {
public void afterBoot() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
future.cancel(true);
}
}
......@@ -49,6 +49,12 @@ public class GRPCChannelManager implements BootService, Runnable {
}
@Override
public void shutdown() throws Throwable {
connectCheckFuture.cancel(true);
managedChannel.shutdownNow();
}
@Override
public void run() {
if (reconnect) {
......
......@@ -49,6 +49,11 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
TracingContext.ListenerManager.add(this);
}
@Override
public void shutdown() throws Throwable {
carrier.shutdownConsumers();
}
@Override
public void init() {
......
......@@ -61,6 +61,11 @@ public class SamplingService implements BootService {
}
@Override
public void shutdown() throws Throwable {
scheduledFuture.cancel(true);
}
/**
* @return true, if sampling mechanism is on, and get the sampling factor successfully.
*/
......
package org.skywalking.apm.agent.core.context;
import com.google.instrumentation.trace.Span;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.tag.Tags;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
......@@ -58,7 +55,7 @@ public class ContextManagerTest {
@Test
public void createSpanWithInvalidateContextCarrier() {
ContextCarrier contextCarrier = new ContextCarrier().deserialize("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8 :18002|#/portal/");
ContextCarrier contextCarrier = new ContextCarrier().deserialize("#AQA=#AQA=4WcWe0tQNQA=|1|#127.0.0.1:8080|#/testEntrySpan|#/testEntrySpan|#AQA=#AQA=Et0We0tQNQA=");
AbstractSpan firstEntrySpan = ContextManager.createEntrySpan("/testEntrySpan", contextCarrier);
firstEntrySpan.setComponent(ComponentsDefine.TOMCAT);
......@@ -82,7 +79,7 @@ public class ContextManagerTest {
@Test
public void createMultipleEntrySpan() {
ContextCarrier contextCarrier = new ContextCarrier().deserialize("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8 :18002|#/portal/|T.1499176688386.581928182.80935.69.2");
ContextCarrier contextCarrier = new ContextCarrier().deserialize("#AQA*#AQA*4WcWe0tQNQA*|1|1|#127.0.0.1:8080|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*");
assertTrue(contextCarrier.isValid());
AbstractSpan firstEntrySpan = ContextManager.createEntrySpan("/testFirstEntry", contextCarrier);
......@@ -114,9 +111,9 @@ public class ContextManagerTest {
assertThat(actualSegment.getRefs().size(), is(1));
TraceSegmentRef ref = actualSegment.getRefs().get(0);
assertThat(TraceSegmentRefHelper.getPeerHost(ref), is("192.168.1.8 :18002"));
assertThat(ref.getOperationName(), is("/portal/"));
assertThat(ref.getOperationId(), is(0));
assertThat(TraceSegmentRefHelper.getPeerHost(ref), is("127.0.0.1:8080"));
assertThat(ref.getEntryOperationName(), is("/portal/"));
assertThat(ref.getEntryOperationId(), is(0));
List<AbstractTracingSpan> spanList = SegmentHelper.getSpan(actualSegment);
assertThat(spanList.size(), is(2));
......@@ -205,7 +202,7 @@ public class ContextManagerTest {
@Test
public void testTransform() throws InvalidProtocolBufferException {
ContextCarrier contextCarrier = new ContextCarrier().deserialize("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8 :18002|#/portal/|T.1499176688386.581928182.80935.69.2");
ContextCarrier contextCarrier = new ContextCarrier().deserialize("#AQA*#AQA*4WcWe0tQNQA*|3|1|#127.0.0.1:8080|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*");
assertTrue(contextCarrier.isValid());
AbstractSpan firstEntrySpan = ContextManager.createEntrySpan("/testFirstEntry", contextCarrier);
......@@ -238,7 +235,7 @@ public class ContextManagerTest {
TraceSegmentReference reference = traceSegmentObject.getRefs(0);
assertThat(reference.getEntryServiceName(), is("/portal/"));
assertThat(reference.getNetworkAddress(), is("192.168.1.8 :18002"));
assertThat(reference.getNetworkAddress(), is("127.0.0.1:8080"));
assertThat(reference.getParentSpanId(), is(3));
assertThat(traceSegmentObject.getApplicationId(), is(1));
......
......@@ -44,6 +44,12 @@ public class SkyWalkingAgent {
ServiceManager.INSTANCE.boot();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override public void run() {
ServiceManager.INSTANCE.shutdown();
}
}, "skywalking service shutdown thread"));
new AgentBuilder.Default().type(pluginFinder.buildMatch()).transform(new AgentBuilder.Transformer() {
@Override
public DynamicType.Builder<?> transform(DynamicType.Builder<?> builder, TypeDescription typeDescription,
......
......@@ -148,7 +148,7 @@ public class DubboInterceptorTest {
@Test
public void testProviderWithAttachment() throws Throwable {
when(rpcContext.isConsumerSide()).thenReturn(false);
when(rpcContext.getAttachment(Config.Plugin.Propagation.HEADER_NAME)).thenReturn("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8 :18002|#/portal/|T.1499176688386.581928182.80935.69.2");
when(rpcContext.getAttachment(Config.Plugin.Propagation.HEADER_NAME)).thenReturn("#AQA*#AQA*4WcWe0tQNQA*|3|1|#192.168.1.8 :18002|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*");
dubboInterceptor.beforeMethod(enhancedInstance, "invoke", allArguments, argumentTypes, methodInterceptResult);
dubboInterceptor.afterMethod(enhancedInstance, "invoke", allArguments, argumentTypes, result);
......@@ -160,7 +160,7 @@ public class DubboInterceptorTest {
when(rpcContext.isConsumerSide()).thenReturn(false);
FieldSetter.setStaticValue(BugFixActive.class, "ACTIVE", true);
testParam.setTraceContext("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8 :18002|#/portal/|T.1499176688386.581928182.80935.69.2");
testParam.setTraceContext("#AQA*#AQA*4WcWe0tQNQA*|3|1|#192.168.1.8 :18002|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*");
dubboInterceptor.beforeMethod(enhancedInstance, "invoke", allArguments, argumentTypes, methodInterceptResult);
dubboInterceptor.afterMethod(enhancedInstance, "invoke", allArguments, argumentTypes, result);
......@@ -194,7 +194,7 @@ public class DubboInterceptorTest {
private void assertTraceSegmentRef(TraceSegmentRef actual) {
assertThat(SegmentRefHelper.getSpanId(actual), is(3));
assertThat(SegmentRefHelper.getTraceSegmentId(actual), is("S.1499176688384.581928182.80935.69.1"));
assertThat(SegmentRefHelper.getTraceSegmentId(actual).toString(), is("1.1.15006458883500001"));
}
private void assertProviderSpan(AbstractTracingSpan span) {
......
......@@ -20,7 +20,7 @@ import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
*/
public class AbstractHttpClientInstrumentation extends HttpClientInstrumentation {
private static final String ENHANCE_CLASS = "org.apache.http.impl.discovery.AbstractHttpClient";
private static final String ENHANCE_CLASS = "org.apache.http.impl.client.AbstractHttpClient";
@Override
public ClassMatch enhanceClass() {
......
......@@ -17,7 +17,7 @@ import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
*/
public class InternalHttpClientInstrumentation extends HttpClientInstrumentation {
private static final String ENHANCE_CLASS = "org.apache.http.impl.discovery.InternalHttpClient";
private static final String ENHANCE_CLASS = "org.apache.http.impl.client.InternalHttpClient";
@Override
public ClassMatch enhanceClass() {
......
......@@ -20,7 +20,7 @@ import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
*/
public class MinimalHttpClientInstrumentation extends HttpClientInstrumentation {
private static final String ENHANCE_CLASS = "org.apache.http.impl.discovery.MinimalHttpClient";
private static final String ENHANCE_CLASS = "org.apache.http.impl.client.MinimalHttpClient";
@Override
public ClassMatch enhanceClass() {
......
......@@ -91,7 +91,7 @@ public class MotanProviderInterceptorTest {
@Test
public void testInvokerWithRefSegment() throws Throwable {
HashMap attachments = new HashMap();
attachments.put(Config.Plugin.Propagation.HEADER_NAME, "S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8:18002|#/portal/|T.1499176688386.581928182.80935.69.2");
attachments.put(Config.Plugin.Propagation.HEADER_NAME, "#AQA*#AQA*4WcWe0tQNQA*|3|1|#192.168.1.8:18002|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*");
when(request.getAttachments()).thenReturn(attachments);
invokeInterceptor.beforeMethod(enhancedInstance, "execute", arguments, argumentType, null);
......@@ -135,7 +135,7 @@ public class MotanProviderInterceptorTest {
}
private void assertRefSegment(TraceSegmentRef primaryRef) {
assertThat(SegmentRefHelper.getTraceSegmentId(primaryRef), is("S.1499176688384.581928182.80935.69.1"));
assertThat(SegmentRefHelper.getTraceSegmentId(primaryRef).toString(), is("1.1.15006458883500001"));
assertThat(SegmentRefHelper.getSpanId(primaryRef), is(3));
assertThat(SegmentRefHelper.getPeerHost(primaryRef), is("192.168.1.8:18002"));
}
......
......@@ -90,7 +90,7 @@ public class ResinV3InterceptorTest {
@Test
public void testWithSerializedContextData() throws Throwable {
when(request.getHeader(Config.Plugin.Propagation.HEADER_NAME)).thenReturn("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8:18002|#/portal/|T.1499176688386.581928182.80935.69.2");
when(request.getHeader(Config.Plugin.Propagation.HEADER_NAME)).thenReturn("#AQA*#AQA*4WcWe0tQNQA*|3|1|#192.168.1.8:18002|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*");
interceptor.beforeMethod(enhancedInstance, "service", arguments, argumentType, methodInterceptResult);
interceptor.afterMethod(enhancedInstance, "service", arguments, argumentType, null);
......@@ -121,7 +121,7 @@ public class ResinV3InterceptorTest {
private void assertTraceSegmentRef(TraceSegmentRef ref) {
assertThat(SegmentRefHelper.getSpanId(ref), is(3));
assertThat(SegmentRefHelper.getTraceSegmentId(ref), is("S.1499176688384.581928182.80935.69.1"));
assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("1.1.15006458883500001"));
}
private void assertHttpSpan(AbstractTracingSpan span) {
......
......@@ -92,7 +92,7 @@ public class ResinV4InterceptorTest {
@Test
public void testWithSerializedContextData() throws Throwable {
when(request.getHeader(Config.Plugin.Propagation.HEADER_NAME)).thenReturn("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8:18002|#/portal/|T.1499176688386.581928182.80935.69.2");
when(request.getHeader(Config.Plugin.Propagation.HEADER_NAME)).thenReturn("#AQA*#AQA*4WcWe0tQNQA*|3|1|#192.168.1.8:18002|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*");
interceptor.beforeMethod(enhancedInstance, "service", arguments, argumentType, methodInterceptResult);
interceptor.afterMethod(enhancedInstance, "service", arguments, argumentType, null);
......@@ -123,7 +123,7 @@ public class ResinV4InterceptorTest {
private void assertTraceSegmentRef(TraceSegmentRef ref) {
assertThat(SegmentRefHelper.getSpanId(ref), is(3));
assertThat(SegmentRefHelper.getTraceSegmentId(ref), is("S.1499176688384.581928182.80935.69.1"));
assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("1.1.15006458883500001"));
}
private void assertHttpSpan(AbstractTracingSpan span) {
......
......@@ -82,7 +82,7 @@ public class TomcatInterceptorTest {
@Test
public void testWithSerializedContextData() throws Throwable {
when(request.getHeader(Config.Plugin.Propagation.HEADER_NAME)).thenReturn("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8:18002|#/portal/|T.1499176688386.581928182.80935.69.2");
when(request.getHeader(Config.Plugin.Propagation.HEADER_NAME)).thenReturn("#AQA*#AQA*4WcWe0tQNQA*|3|1|#192.168.1.8:18002|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*");
tomcatInterceptor.beforeMethod(enhancedInstance, "invoke", arguments, argumentType, methodInterceptResult);
tomcatInterceptor.afterMethod(enhancedInstance, "invoke", arguments, argumentType, null);
......@@ -113,7 +113,7 @@ public class TomcatInterceptorTest {
private void assertTraceSegmentRef(TraceSegmentRef ref) {
assertThat(SegmentRefHelper.getSpanId(ref), is(3));
assertThat(SegmentRefHelper.getTraceSegmentId(ref), is("S.1499176688384.581928182.80935.69.1"));
assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("1.1.15006458883500001"));
}
private void assertHttpSpan(AbstractTracingSpan span) {
......
package org.skywalking.apm.agent.test.helper;
import org.skywalking.apm.agent.core.context.ids.ID;
import org.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
public class SegmentRefHelper {
......@@ -12,7 +13,7 @@ public class SegmentRefHelper {
return null;
}
public static String getTraceSegmentId(TraceSegmentRef ref) {
public static ID getTraceSegmentId(TraceSegmentRef ref) {
try {
return FieldGetter.getValue(ref, "traceSegmentId");
} catch (Exception e) {
......
......@@ -8,7 +8,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class SegmentRefAssert {
public static void assertSegmentId(TraceSegmentRef ref, String segmentId) {
assertThat(SegmentRefHelper.getTraceSegmentId(ref), is(segmentId));
assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is(segmentId));
}
public static void assertSpanId(TraceSegmentRef ref, int spanId) {
......
......@@ -48,6 +48,7 @@ public class SkywalkingSpanActivation extends ClassInstanceMethodsEnhancePluginD
private static final String FINISH_METHOD_INTERCEPTOR = "org.skywalking.apm.toolkit.activation.opentracing.span.SpanFinishInterceptor";
private static final String LOG_INTERCEPTOR = "org.skywalking.apm.toolkit.activation.opentracing.span.SpanLogInterceptor";
private static final String SET_OPERATION_NAME_INTERCEPTOR = "org.skywalking.apm.toolkit.activation.opentracing.span.SpanSetOperationNameInterceptor";
private static final String SET_TAG_INTERCEPTOR = "org.skywalking.apm.toolkit.activation.opentracing.span.SpanSetTagInterceptor";
@Override
protected ClassMatch enhanceClass() {
......@@ -132,6 +133,19 @@ public class SkywalkingSpanActivation extends ClassInstanceMethodsEnhancePluginD
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("setTag").and(takesArgument(0, String.class)).and(takesArgument(1, String.class));
}
@Override public String getMethodsInterceptor() {
return SET_TAG_INTERCEPTOR;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
......
package org.skywalking.apm.toolkit.activation.opentracing.span;
import io.opentracing.tag.Tags;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
public class SpanSetTagInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, String methodName, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, String methodName, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
AbstractSpan activeSpan = ContextManager.activeSpan();
String tagKey = String.valueOf(allArguments[0]);
String tagValue = String.valueOf(allArguments[1]);
if (Tags.COMPONENT.getKey().equals(tagKey)) {
activeSpan.setComponent(tagValue);
} else if (Tags.PEER_SERVICE.getKey().equals(tagKey)) {
activeSpan.setOperationName(tagValue);
} else {
activeSpan.tag(tagKey, tagValue);
}
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, String methodName, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
......@@ -181,15 +181,15 @@ public class SkywalkingSpanActivationTest {
.withTag(Tags.PEER_HOST_IPV4.getKey(), "127.0.0.1").withTag(Tags.PEER_PORT.getKey(), 8080);
startSpan();
extractInterceptor.afterMethod(enhancedInstance, "extract",
new Object[] {"S.1499746282749.1100157028.88023.1.1|0|1|#127.0.0.1:8080|#testOperationName|T.1499746282768.1100157028.88023.1.2"}, new Class[] {String.class}, null);
new Object[] {"#AQA*#AQA*4WcWe0tQNQA*|3|1|#127.0.0.1:8080|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*"}, new Class[] {String.class}, null);
stopSpan();
TraceSegment tracingSegment = assertTraceSemgnets();
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(tracingSegment);
assertThat(tracingSegment.getRefs().size(), is(1));
TraceSegmentRef ref = tracingSegment.getRefs().get(0);
assertSegmentId(ref, "S.1499746282749.1100157028.88023.1.1");
assertSpanId(ref, 0);
assertSegmentId(ref, "1.1.15006458883500001");
assertSpanId(ref, 3);
assertPeerHost(ref, "127.0.0.1:8080");
assertThat(spans.size(), is(1));
assertSpanCommonsAttribute(spans.get(0));
......@@ -201,7 +201,7 @@ public class SkywalkingSpanActivationTest {
.withTag(Tags.PEER_HOST_IPV4.getKey(), "127.0.0.1").withTag(Tags.PEER_PORT.getKey(), 8080);
startSpan();
extractInterceptor.afterMethod(enhancedInstance, "extract",
new Object[] {"S.1499746282749.1100157028.88023.1.1|0|1|#127.0.0.1:8080|#testOperationName"}, new Class[] {String.class}, null);
new Object[] {"#AQA*#AQA*4WcWe0tQNQA*|3|#192.168.1.8:18002|#/portal/|#/testEntrySpan|#AQA*#AQA*Et0We0tQNQA*"}, new Class[] {String.class}, null);
stopSpan();
TraceSegment tracingSegment = assertTraceSemgnets();
......
......@@ -172,7 +172,9 @@
</formats>
<instrumentation>
<excludes>
<exclude>org/skywalking/apm/network/trace/proto/*.class</exclude>
<exclude>org/skywalking/apm/network/**/*.class</exclude>
<exclude>org/skywalking/apm/collector/remote/grpc/**/*.class</exclude>
<exclude>org/skywalking/apm/agent/core/context/ids/base64/*.class</exclude>
</excludes>
</instrumentation>
</configuration>
......@@ -198,7 +200,9 @@
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<excludes>org/skywalking/apm/network/**/*.java,
org/skywalking/apm/collector/remote/grpc/**/*.java</excludes>
org/skywalking/apm/collector/remote/grpc/**/*.java,
org/skywalking/apm/agent/core/context/ids/base64/*.java
</excludes>
</configuration>
<goals>
<goal>check</goal>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册