提交 c0650800 编写于 作者: P pengys5

Merge remote-tracking branch 'origin/feature/3.0' into feature/collector

......@@ -6,6 +6,7 @@ Sky Walking
SkyWalking: Large-Scale Distributed Systems Tracing Infrastructure, also known Distributed Tracer.
[![Build Status](https://travis-ci.org/wu-sheng/sky-walking.svg?branch=master)](https://travis-ci.org/wu-sheng/sky-walking)
[![Coverage Status](https://coveralls.io/repos/github/wu-sheng/sky-walking/badge.svg?branch=master)](https://coveralls.io/github/wu-sheng/sky-walking?branch=master)
![license](https://img.shields.io/aur/license/yaourt.svg)
[![codebeat badge](https://codebeat.co/badges/579e4dce-1dc7-4f32-a163-c164eafa1335)](https://codebeat.co/projects/github-com-wu-sheng-sky-walking)
[![Join the chat at https://gitter.im/sky-walking/Lobby](https://badges.gitter.im/sky-walking/Lobby.svg)](https://gitter.im/sky-walking/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
......
......@@ -12,14 +12,14 @@ import java.util.Map;
*/
public class LogData {
@Expose
@SerializedName(value="ti")
@SerializedName(value="tm")
private long time;
@Expose
@SerializedName(value="fi")
private Map<String, ?> fields;
private Map<String, String> fields;
LogData(long time, Map<String, ?> fields) {
LogData(long time, Map<String, String> fields) {
this.time = time;
if(fields == null){
throw new NullPointerException();
......
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.TraceId.PropagatedTraceId;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
......
......@@ -57,8 +57,16 @@ public class Span{
* {@see https://github.com/opentracing/specification/blob/master/specification.md#set-a-span-tag}
*/
@Expose
@SerializedName(value="ta")
private final Map<String, Object> tags;
@SerializedName(value="ts")
private final Map<String, String> tagsWithStr;
@Expose
@SerializedName(value="tb")
private final Map<String, Boolean> tagsWithBool;
@Expose
@SerializedName(value="ti")
private final Map<String, Integer> tagsWithInt;
/**
* Log is a concept from OpenTracing spec.
......@@ -151,7 +159,9 @@ public class Span{
* Create a new/empty span.
*/
public Span() {
tags = new HashMap<String, Object>();
tagsWithStr = new HashMap<String, String>(5);
tagsWithBool = new HashMap<String, Boolean>(1);
tagsWithInt = new HashMap<String, Integer>(2);
logs = new LinkedList<LogData>();
}
......@@ -194,17 +204,17 @@ public class Span{
* @return this Span instance, for chaining
*/
public final Span setTag(String key, String value) {
tags.put(key, value);
tagsWithStr.put(key, value);
return this;
}
public final Span setTag(String key, boolean value) {
tags.put(key, value);
tagsWithBool.put(key, value);
return this;
}
public final Span setTag(String key, Number value) {
tags.put(key, value);
public final Span setTag(String key, Integer value) {
tagsWithInt.put(key, value);
return this;
}
......@@ -214,7 +224,11 @@ public class Span{
* @return
*/
public final Map<String, Object> getTags() {
return Collections.unmodifiableMap(tags);
Map<String, Object> tags = new HashMap<String, Object>();
tags.putAll(tagsWithStr);
tags.putAll(tagsWithBool);
tags.putAll(tagsWithInt);
return tags;
}
/**
......@@ -223,8 +237,16 @@ public class Span{
* @param key the given tag key.
* @return tag value.
*/
public Object getTag(String key) {
return tags.get(key);
public String getStrTag(String key) {
return tagsWithStr.get(key);
}
public Boolean getBoolTag(String key) {
return tagsWithBool.get(key);
}
public Integer getIntTag(String key) {
return tagsWithInt.get(key);
}
/**
......@@ -250,7 +272,7 @@ public class Span{
* @return the Span, for chaining
* @see Span#log(String)
*/
public Span log(Map<String, ?> fields) {
public Span log(Map<String, String> fields) {
logs.add(new LogData(System.currentTimeMillis(), fields));
return this;
}
......
......@@ -26,21 +26,21 @@ public class TraceSegment {
* Every segment has its unique-global-id.
*/
@Expose
@SerializedName(value="ts")
@SerializedName(value = "ts")
private String traceSegmentId;
/**
* The start time of this trace segment.
*/
@Expose
@SerializedName(value="st")
@SerializedName(value = "st")
private long startTime;
/**
* The end time of this trace segment.
*/
@Expose
@SerializedName(value="et")
@SerializedName(value = "et")
private long endTime;
/**
......@@ -50,7 +50,7 @@ public class TraceSegment {
* at this moment, we use this {@link #refs} to link them.
*/
@Expose
@SerializedName(value="rs")
@SerializedName(value = "rs")
private List<TraceSegmentRef> refs;
/**
......@@ -59,7 +59,7 @@ public class TraceSegment {
* All active spans are hold and controlled by "skywalking-api" module.
*/
@Expose
@SerializedName(value="ss")
@SerializedName(value = "ss")
private List<Span> spans;
/**
......@@ -69,7 +69,7 @@ public class TraceSegment {
* e.g. account_app, billing_app
*/
@Expose
@SerializedName(value="ac")
@SerializedName(value = "ac")
private String applicationCode;
/**
......@@ -86,9 +86,19 @@ public class TraceSegment {
* multi {@link TraceSegment}s, only using {@link #refs} is not enough for analysis and ui.
*/
@Expose
@SerializedName(value="gt")
@SerializedName(value = "gt")
private DistributedTraceIds relatedGlobalTraces;
/**
* The <code>sampled</code> is a flag, which represent, when this {@link TraceSegment} finished, it need to be send
* to Collector.
*
* Its value depends on SamplingService. True, by default.
*
* This value is not serialized.
*/
private boolean sampled;
/**
* Create a trace segment, by given segmentId.
* This segmentId is generated by TraceSegmentRef, AKA, from tracer/agent module.
......@@ -107,6 +117,7 @@ public class TraceSegment {
this.spans = new LinkedList<Span>();
this.relatedGlobalTraces = new DistributedTraceIds();
this.relatedGlobalTraces.append(new NewDistributedTraceId());
this.sampled = true;
}
/**
......@@ -118,7 +129,7 @@ public class TraceSegment {
if (refs == null) {
refs = new LinkedList<TraceSegmentRef>();
}
if(!refs.contains(refSegment)){
if (!refs.contains(refSegment)) {
refs.add(refSegment);
}
}
......@@ -183,6 +194,14 @@ public class TraceSegment {
return applicationCode;
}
public boolean isSampled() {
return sampled;
}
public void setSampled(boolean sampled) {
this.sampled = sampled;
}
@Override
public String toString() {
return "TraceSegment{" +
......
......@@ -86,25 +86,18 @@ public class TraceSegmentRef{
'}';
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TraceSegmentRef ref = (TraceSegmentRef)o;
TraceSegmentRef that = (TraceSegmentRef) o;
if (traceSegmentId != null ? !traceSegmentId.equals(ref.traceSegmentId) : ref.traceSegmentId != null)
return false;
return applicationCode != null ? applicationCode.equals(ref.applicationCode) : ref.applicationCode == null;
return traceSegmentId != null ? traceSegmentId.equals(that.traceSegmentId) : that.traceSegmentId == null;
}
@Override
public int hashCode() {
int result = traceSegmentId != null ? traceSegmentId.hashCode() : 0;
result = 31 * result + (applicationCode != null ? applicationCode.hashCode() : 0);
return result;
return traceSegmentId != null ? traceSegmentId.hashCode() : 0;
}
}
......@@ -7,7 +7,7 @@ import com.a.eye.skywalking.trace.Span;
* All span's tags inherit from {@link AbstractTag},
* which provide an easy way to
* {@link Span#setTag(String, String)} ,
* {@link Span#setTag(String, Number)} ,
* {@link Span#setTag(String, Integer)}
* {@link Span#setTag(String, boolean)} ,
*
* Created by wusheng on 2017/2/17.
......
......@@ -30,13 +30,11 @@ public class BooleanTag extends AbstractTag<Boolean> {
*/
@Override
public Boolean get(Span span) {
Object tagValue = span.getTag(super.key);
Boolean tagValue = span.getBoolTag(super.key);
if (tagValue == null) {
return defaultValue;
} else if (tagValue instanceof Boolean) {
return (Boolean) tagValue;
} else {
return Boolean.valueOf(tagValue.toString());
return tagValue;
}
}
}
......@@ -26,13 +26,11 @@ public class IntTag extends AbstractTag<Integer> {
*/
@Override
public Integer get(Span span) {
Object tagValue = span.getTag(super.key);
Integer tagValue = span.getIntTag(super.key);
if (tagValue == null) {
return null;
} else if(tagValue instanceof Integer){
return (Integer)tagValue;
}else {
return Integer.valueOf(tagValue.toString());
} else {
return tagValue;
}
}
}
......@@ -14,7 +14,7 @@ public class ShortTag extends AbstractTag<Short> {
@Override
public void set(Span span, Short tagValue) {
span.setTag(super.key, tagValue);
span.setTag(super.key, (int)tagValue.shortValue());
}
/**
......@@ -25,12 +25,10 @@ public class ShortTag extends AbstractTag<Short> {
* @return tag value
*/
@Override public Short get(Span span) {
Object tagValue = span.getTag(super.key);
Integer tagValue = span.getIntTag(super.key);
if (tagValue == null) {
return null;
} else if(tagValue instanceof Short){
return (Short)tagValue;
}else {
} else {
return Short.valueOf(tagValue.toString());
}
}
......
......@@ -19,6 +19,6 @@ public class StringTag extends AbstractTag<String> {
}
@Override public String get(Span span) {
return (String)span.getTag(super.key);
return span.getStrTag(super.key);
}
}
......@@ -11,7 +11,7 @@ import org.junit.Test;
public class LogDataTestCase {
@Test
public void testHoldValue(){
Map<String, ?> fields = new HashMap<String, String>();
Map<String, String> fields = new HashMap<String, String>();
LogData logData = new LogData(123L, fields);
Assert.assertEquals(123, logData.getTime());
......
......@@ -12,13 +12,13 @@ public class TagsTest {
public void testLayer(){
Span span = new Span(1, "/test");
Tags.SPAN_LAYER.asDB(span);
Assert.assertEquals("db", span.getTag("span.layer"));
Assert.assertEquals("db", span.getStrTag("span.layer"));
Tags.SPAN_LAYER.asRPCFramework(span);
Assert.assertEquals("rpc", span.getTag("span.layer"));
Assert.assertEquals("rpc", span.getStrTag("span.layer"));
Tags.SPAN_LAYER.asHttp(span);
Assert.assertEquals("http", span.getTag("span.layer"));
Assert.assertEquals("http", span.getStrTag("span.layer"));
}
@Test
......
......@@ -36,23 +36,20 @@
</manifestEntries>
</transformer>
</transformers>
<artifactSet>
<excludes>
<exclude>com.lmax:*</exclude>
<exclude>org.apache.httpcomponents:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>commons-codec:*</exclude>
<exclude>*:gson</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>${shade.net.bytebuddy.source}</pattern>
<shadedPattern>${shade.net.bytebuddy.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.com.lmax.disruptor.source}</pattern>
<shadedPattern>${shade.com.lmax.disruptor.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.com.google.source}</pattern>
<shadedPattern>${shade.com.google.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.org.apache.source}</pattern>
<shadedPattern>${shade.org.apache.target}</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
......@@ -61,6 +58,12 @@
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......@@ -120,17 +123,11 @@
</repository>
</distributionManagement>
<properties>
<shade.org.apache.source>org.apache</shade.org.apache.source>
<shade.package>com.a.eye.skywalking.dependencies</shade.package>
<shade.com.google.source>com.google</shade.com.google.source>
<shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
<shade.com.lmax.disruptor.target>${shade.package}.${shade.com.lmax.disruptor.source}</shade.com.lmax.disruptor.target>
<shade.package>com.a.eye.skywalking.dependencies</shade.package>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
<shade.com.google.target>${shade.package}.${shade.com.google.source}</shade.com.google.target>
<shade.org.apache.target>${shade.package}.${shade.org.apache.source}</shade.org.apache.target>
<premain.class>com.a.eye.skywalking.agent.SkyWalkingAgent</premain.class>
<shade.com.lmax.disruptor.source>com.lmax.disruptor</shade.com.lmax.disruptor.source>
</properties>
</project>
......@@ -21,13 +21,6 @@
<shade.package>com.a.eye.skywalking.dependencies</shade.package>
<shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
<shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
<shade.com.lmax.disruptor.source>com.lmax.disruptor</shade.com.lmax.disruptor.source>
<shade.com.lmax.disruptor.target>${shade.package}.${shade.com.lmax.disruptor.source}
</shade.com.lmax.disruptor.target>
<shade.com.google.source>com.google</shade.com.google.source>
<shade.com.google.target>${shade.package}.${shade.com.google.source}</shade.com.google.target>
<shade.org.apache.source>org.apache</shade.org.apache.source>
<shade.org.apache.target>${shade.package}.${shade.org.apache.source}</shade.org.apache.target>
</properties>
<dependencies>
......@@ -123,23 +116,20 @@
</manifestEntries>
</transformer>
</transformers>
<artifactSet>
<excludes>
<exclude>com.lmax:*</exclude>
<exclude>org.apache.httpcomponents:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>commons-codec:*</exclude>
<exclude>*:gson</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>${shade.net.bytebuddy.source}</pattern>
<shadedPattern>${shade.net.bytebuddy.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.com.lmax.disruptor.source}</pattern>
<shadedPattern>${shade.com.lmax.disruptor.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.com.google.source}</pattern>
<shadedPattern>${shade.com.google.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.org.apache.source}</pattern>
<shadedPattern>${shade.org.apache.target}</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
......
......@@ -99,8 +99,8 @@ public class SkyWalkingAgent {
private static void initConfig() {
Config.SkyWalking.IS_PREMAIN_MODE = true;
Config.SkyWalking.AGENT_BASE_PATH = initAgentBasePath();
Config.Agent.IS_PREMAIN_MODE = true;
Config.Agent.PATH = initAgentBasePath();
SnifferConfigInitializer.initialize();
}
......
......@@ -17,6 +17,15 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jetty.version>9.4.2.v20170220</jetty.version>
<shade.package>com.a.eye.skywalking.dependencies</shade.package>
<shade.com.lmax.disruptor.source>com.lmax.disruptor</shade.com.lmax.disruptor.source>
<shade.com.lmax.disruptor.target>${shade.package}.${shade.com.lmax.disruptor.source}
</shade.com.lmax.disruptor.target>
<shade.com.google.source>com.google</shade.com.google.source>
<shade.com.google.target>${shade.package}.${shade.com.google.source}</shade.com.google.target>
<shade.org.apache.source>org.apache</shade.org.apache.source>
<shade.org.apache.target>${shade.package}.${shade.org.apache.source}</shade.org.apache.target>
</properties>
<dependencies>
......@@ -74,6 +83,40 @@
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>net.bytebuddy:byte-buddy:jar:</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>${shade.com.lmax.disruptor.source}</pattern>
<shadedPattern>${shade.com.lmax.disruptor.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.com.google.source}</pattern>
<shadedPattern>${shade.com.google.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.org.apache.source}</pattern>
<shadedPattern>${shade.org.apache.target}</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.a.eye.skywalking.api.client;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.SegmentsMessage;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClients;
......@@ -21,11 +31,18 @@ public class CollectorClient implements Runnable {
private static ILog logger = LogManager.getLogger(CollectorClient.class);
private static long SLEEP_TIME_MILLIS = 500;
private CloseableHttpClient httpclient;
private String[] serverList;
private volatile int selectedServer = -1;
public CollectorClient() {
serverList = Config.Collector.SERVERS.split(",");
httpclient = HttpClients.custom()
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
.build();
Random r = new Random();
if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length);
}
}
@Override
......@@ -36,13 +53,19 @@ public class CollectorClient implements Runnable {
TraceSegmentProcessQueue segmentProcessQueue = ServiceManager.INSTANCE.findService(TraceSegmentProcessQueue.class);
List<TraceSegment> cachedTraceSegments = segmentProcessQueue.getCachedTraceSegments();
if (cachedTraceSegments.size() > 0) {
SegmentsMessage message = null;
int count = 0;
for (TraceSegment segment : cachedTraceSegments) {
/**
* No receiver found, means collector server is off-line.
*/
sleepTime = SLEEP_TIME_MILLIS * 10;
break;
if (message == null) {
message = new SegmentsMessage();
}
message.append(segment);
if (count == Config.Collector.BATCH_SIZE) {
sendToCollector(message);
message = null;
}
}
sendToCollector(message);
} else {
sleepTime = SLEEP_TIME_MILLIS;
}
......@@ -56,6 +79,64 @@ public class CollectorClient implements Runnable {
}
}
/**
* Send the given {@link SegmentsMessage} to collector.
*
* @param message to be send.
*/
private void sendToCollector(SegmentsMessage message) throws RESTResponseStatusError, IOException {
if (message == null) {
return;
}
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
String messageJson = gson.toJson(message);
try {
HttpPost httpPost = ready2Send(messageJson);
if (httpPost != null) {
CloseableHttpResponse httpResponse = httpclient.execute(httpPost);
int statusCode = httpResponse.getStatusLine().getStatusCode();
if (200 != statusCode) {
findBackupServer();
throw new RESTResponseStatusError(statusCode);
}
}
} catch (IOException e) {
findBackupServer();
throw e;
}
}
/**
* Prepare the given message for HTTP Post service.
*
* @param messageJson to send
* @return {@link HttpPost}, when is ready to send. otherwise, null.
*/
private HttpPost ready2Send(String messageJson) {
if (selectedServer == -1) {
//no available server
return null;
}
HttpPost post = new HttpPost("http://" + serverList[selectedServer] + Config.Collector.SERVICE_NAME);
StringEntity entity = new StringEntity(messageJson, ContentType.APPLICATION_JSON);
post.setEntity(entity);
return post;
}
/**
* Choose the next server in {@link #serverList}, by moving {@link #selectedServer}.
*/
private void findBackupServer() {
selectedServer++;
if (selectedServer == serverList.length) {
selectedServer = 0;
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
......
package com.a.eye.skywalking.api.client;
/**
* The <code>RESTResponseStatusError</code> represents the REST-Service client got an unexpected response code.
* Most likely, the response code is not 200.
*
* @author wusheng
*/
class RESTResponseStatusError extends Exception {
RESTResponseStatusError(int responseCode){
super("Unexpected service response code: " + responseCode);
}
}
......@@ -2,20 +2,26 @@ package com.a.eye.skywalking.api.conf;
public class Config {
public static class SkyWalking {
public static class Agent {
public static String APPLICATION_CODE = "";
public static boolean IS_PREMAIN_MODE = false;
public static String AGENT_BASE_PATH = "";
public static String PATH = "";
public static int SAMPLING_RATE = 10000;
}
public static class Collector{
public static String SERVERS = "";
public static String SERVICE_NAME = "/segments";
public static int BATCH_SIZE = 50;
}
public static class Disruptor{
public static int BUFFER_SIZE = 512;
public static class Buffer {
public static int SIZE = 512;
}
......
......@@ -15,7 +15,7 @@ public class SnifferConfigInitializer {
public static void initialize() {
InputStream configFileStream;
if (Config.SkyWalking.IS_PREMAIN_MODE) {
if (Config.Agent.IS_PREMAIN_MODE) {
configFileStream = fetchAuthFileInputStream();
} else {
configFileStream = SnifferConfigInitializer.class.getResourceAsStream("/sky-walking.config");
......@@ -35,24 +35,24 @@ public class SnifferConfigInitializer {
String applicationCode = System.getProperty("applicationCode");
if (!StringUtil.isEmpty(applicationCode)) {
Config.SkyWalking.APPLICATION_CODE = applicationCode;
Config.Agent.APPLICATION_CODE = applicationCode;
}
String servers = System.getProperty("servers");
if(!StringUtil.isEmpty(servers)) {
Config.SkyWalking.SERVERS = servers;
Config.Collector.SERVERS = servers;
}
if (StringUtil.isEmpty(Config.SkyWalking.APPLICATION_CODE)) {
if (StringUtil.isEmpty(Config.Agent.APPLICATION_CODE)) {
throw new ExceptionInInitializerError("'-DapplicationCode=' is missing.");
}
if (StringUtil.isEmpty(Config.SkyWalking.SERVERS)) {
if (StringUtil.isEmpty(Config.Collector.SERVERS)) {
throw new ExceptionInInitializerError("'-Dservers=' is missing.");
}
}
private static InputStream fetchAuthFileInputStream() {
try {
return new FileInputStream(Config.SkyWalking.AGENT_BASE_PATH + File.separator + "sky-walking.config");
return new FileInputStream(Config.Agent.PATH + File.separator + "sky-walking.config");
} catch (Exception e) {
logger.warn("sky-walking.config is missing, use default config.");
return null;
......
......@@ -42,6 +42,11 @@ public class ContextCarrier implements Serializable {
*/
private List<DistributedTraceId> distributedTraceIds;
/**
* {@link TraceSegment#sampled}
*/
private boolean sampled;
/**
* Serialize this {@link ContextCarrier} to a {@link String},
* with '|' split.
......@@ -54,7 +59,8 @@ public class ContextCarrier implements Serializable {
this.getSpanId() + "",
this.getApplicationCode(),
this.getPeerHost(),
this.serializeDistributedTraceIds());
this.serializeDistributedTraceIds(),
this.isSampled() ? "1" : "0");
}
/**
......@@ -64,14 +70,15 @@ public class ContextCarrier implements Serializable {
*/
public ContextCarrier deserialize(String text) {
if (text != null) {
String[] parts = text.split("\\|", 5);
if (parts.length == 5) {
String[] parts = text.split("\\|", 6);
if (parts.length == 6) {
try {
setSpanId(Integer.parseInt(parts[1]));
setTraceSegmentId(parts[0]);
setApplicationCode(parts[2]);
setPeerHost(parts[3]);
setDistributedTraceIds(deserializeDistributedTraceIds(parts[4]));
setSampled("1".equals(parts[5]));
} catch (NumberFormatException e) {
}
......@@ -129,6 +136,14 @@ public class ContextCarrier implements Serializable {
return distributedTraceIds;
}
public boolean isSampled() {
return sampled;
}
public void setSampled(boolean sampled) {
this.sampled = sampled;
}
public void setDistributedTraceIds(List<DistributedTraceId> distributedTraceIds) {
this.distributedTraceIds = distributedTraceIds;
}
......
package com.a.eye.skywalking.api.context;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.sampling.SamplingService;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
......@@ -34,7 +36,8 @@ public final class TracerContext {
* Create a {@link TraceSegment} and init {@link #spanIdGenerator} as 0;
*/
TracerContext() {
this.segment = new TraceSegment(Config.SkyWalking.APPLICATION_CODE);
this.segment = new TraceSegment(Config.Agent.APPLICATION_CODE);
ServiceManager.INSTANCE.findService(SamplingService.class).trySampling(this.segment);
this.spanIdGenerator = 0;
}
......@@ -123,9 +126,10 @@ public final class TracerContext {
public void inject(ContextCarrier carrier) {
carrier.setTraceSegmentId(this.segment.getTraceSegmentId());
carrier.setSpanId(this.activeSpan().getSpanId());
carrier.setApplicationCode(Config.SkyWalking.APPLICATION_CODE);
carrier.setApplicationCode(Config.Agent.APPLICATION_CODE);
carrier.setPeerHost(Tags.PEER_HOST.get(activeSpan()));
carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
carrier.setSampled(this.segment.isSampled());
}
/**
......@@ -137,6 +141,7 @@ public final class TracerContext {
public void extract(ContextCarrier carrier) {
if(carrier.isValid()) {
this.segment.ref(getRef(carrier));
ServiceManager.INSTANCE.findService(SamplingService.class).setSampleWhenExtract(this.segment, carrier);
this.segment.relatedGlobalTraces(carrier.getDistributedTraceIds());
}
}
......
......@@ -19,7 +19,7 @@ public class SyncFileWriter implements IWriter {
private SyncFileWriter() {
try {
File logFilePath = new File(Config.SkyWalking.AGENT_BASE_PATH, Config.Logging.LOG_DIR_NAME);
File logFilePath = new File(Config.Agent.PATH, Config.Logging.LOG_DIR_NAME);
if (!logFilePath.exists()) {
logFilePath.mkdirs();
}
......
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.api.conf.Config;
public class WriterFactory {
public static IWriter getLogWriter(){
if (Config.SkyWalking.IS_PREMAIN_MODE){
if (Config.Agent.IS_PREMAIN_MODE){
return SyncFileWriter.instance();
}else{
return new STDOutWriter();
......
......@@ -30,8 +30,8 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
private volatile int cacheIndex;
public TraceSegmentProcessQueue() {
disruptor = new Disruptor<TraceSegmentHolder>(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
secondLevelCache = new TraceSegment[Config.Disruptor.BUFFER_SIZE];
disruptor = new Disruptor<TraceSegmentHolder>(TraceSegmentHolder.Factory.INSTANCE, Config.Buffer.SIZE, DaemonThreadFactory.INSTANCE);
secondLevelCache = new TraceSegment[Config.Buffer.SIZE];
cacheIndex = 0;
disruptor.handleEventsWith(this);
buffer = disruptor.getRingBuffer();
......@@ -43,9 +43,14 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
disruptor.start();
}
/**
* Append the given traceSegment to the queue, wait for sending to Collector.
*
* @param traceSegment finished {@link TraceSegment}
*/
@Override
public void afterFinished(TraceSegment traceSegment) {
if (isStarted()) {
if (isStarted() && traceSegment.isSampled()) {
long sequence = this.buffer.next(); // Grab the next sequence
try {
TraceSegmentHolder data = this.buffer.get(sequence);
......
package com.a.eye.skywalking.api.sampling;
/**
* Use <code>IllegalSamplingRateException</code>, only if the rate can not be supported.
*
* @author wusheng
*/
public class IllegalSamplingRateException extends Exception {
IllegalSamplingRateException(String message) {
super(message);
}
}
package com.a.eye.skywalking.api.sampling;
import com.a.eye.skywalking.api.boot.BootService;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.context.ContextCarrier;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* The <code>SamplingService</code> take charge of how to sample the {@link TraceSegment}. Every {@link TraceSegment}s
* have been traced, but, considering CPU cost of serialization/deserialization, and network bandwidth, the agent do NOT
* send all of them to collector, if SAMPLING is on.
*
* By default, SAMPLING is off, and {@link Config.Agent#SAMPLING_RATE} == 1000.
*
* @author wusheng
*/
public class SamplingService implements BootService {
private static ILog logger = LogManager.getLogger(SamplingService.class);
private volatile boolean on = false;
private volatile int rate = 0;
private volatile int rollingSeed = 0;
@Override
public void bootUp() throws Throwable {
if (Config.Agent.SAMPLING_RATE == 10000) {
return;
}
if (Config.Agent.SAMPLING_RATE > 10000 || Config.Agent.SAMPLING_RATE < 1) {
throw new IllegalSamplingRateException("sampling rate should stay in (0, 10000].");
}
rate = 10000 / Config.Agent.SAMPLING_RATE;
on = true;
logger.debug("The trace sampling is on, and the sampling rate is: {}", rate);
}
public void trySampling(TraceSegment segment) {
if (on) {
if (rollingSeed++ != rate) {
segment.setSampled(false);
}
}
}
/**
* Set the {@link TraceSegment} to sampled, when {@link ContextCarrier} contains "isSampled" flag.
*
* A -> B, if TraceSegment is sampled in A, then the related TraceSegment in B must be sampled, no matter you
* sampling rate. And reset the {@link #rollingSeed}, in case of too many {@link TraceSegment}s, which started in
* this JVM, are sampled.
*
* @param segment the current TraceSegment.
* @param carrier
*/
public void setSampleWhenExtract(TraceSegment segment, ContextCarrier carrier) {
if(on) {
if (!segment.isSampled() && carrier.isSampled()) {
segment.setSampled(true);
this.rollingSeed = 0;
}
}
}
}
com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue
com.a.eye.skywalking.api.context.ContextManager
com.a.eye.skywalking.api.client.CollectorClientService
com.a.eye.skywalking.api.sampling.SamplingService
......@@ -75,7 +75,7 @@ public class HTTPRestServiceTestApp {
baseRequest.setHandled(true);
}
});
//server.start();
server.start();
return server;
}
......
......@@ -11,13 +11,13 @@ public class SnifferConfigInitializerTest {
@Test
public void testInitialize(){
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
SnifferConfigInitializer.initialize();
Assert.assertEquals("crmApp", Config.SkyWalking.APPLICATION_CODE);
Assert.assertEquals("127.0.0.1:8080", Config.SkyWalking.SERVERS);
Assert.assertEquals("crmApp", Config.Agent.APPLICATION_CODE);
Assert.assertEquals("127.0.0.1:8080", Config.Collector.SERVERS);
Assert.assertNotNull(Config.Disruptor.BUFFER_SIZE);
Assert.assertNotNull(Config.Buffer.SIZE);
Assert.assertNotNull(Config.Logging.LOG_DIR_NAME);
Assert.assertNotNull(Config.Logging.LOG_FILE_NAME);
Assert.assertNotNull(Config.Logging.MAX_LOG_FILE_LENGTH);
......@@ -26,12 +26,12 @@ public class SnifferConfigInitializerTest {
@Test(expected = ExceptionInInitializerError.class)
public void testErrorInitialize(){
Config.SkyWalking.IS_PREMAIN_MODE = true;
Config.Agent.IS_PREMAIN_MODE = true;
SnifferConfigInitializer.initialize();
}
@AfterClass
public static void reset(){
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
}
}
......@@ -18,17 +18,18 @@ public class ContextCarrierTestCase {
carrier.setSpanId(100);
carrier.setApplicationCode("REMOTE_APP");
carrier.setPeerHost("10.2.3.16:8080");
carrier.setSampled(true);
List<DistributedTraceId> ids = new LinkedList<DistributedTraceId>();
ids.add(new PropagatedTraceId("Trace.global.id.123"));
carrier.setDistributedTraceIds(ids);
Assert.assertEquals("trace_id_A|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123", carrier.serialize());
Assert.assertEquals("trace_id_A|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123|1", carrier.serialize());
}
@Test
public void testDeserialize(){
ContextCarrier carrier = new ContextCarrier();
carrier.deserialize("trace_id_A|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123,Trace.global.id.222");
carrier.deserialize("trace_id_A|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123,Trace.global.id.222|1");
Assert.assertEquals("trace_id_A", carrier.getTraceSegmentId());
Assert.assertEquals(100, carrier.getSpanId());
......@@ -36,6 +37,7 @@ public class ContextCarrierTestCase {
Assert.assertEquals("10.2.3.16:8080", carrier.getPeerHost());
Assert.assertEquals("Trace.global.id.123", carrier.getDistributedTraceIds().get(0).get());
Assert.assertEquals("Trace.global.id.222", carrier.getDistributedTraceIds().get(1).get());
Assert.assertEquals(true, carrier.isSampled());
}
@Test
......@@ -62,6 +64,10 @@ public class ContextCarrierTestCase {
carrier = new ContextCarrier();
carrier.deserialize("trace_id|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123,Trace.global.id.222");
Assert.assertFalse(carrier.isValid());
carrier = new ContextCarrier();
carrier.deserialize("trace_id|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123,Trace.global.id.222|0");
Assert.assertTrue(carrier.isValid());
}
}
package com.a.eye.skywalking.api.context;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Created by wusheng on 2017/2/19.
*/
public class ContextManagerTestCase {
@BeforeClass
public static void setup(){
ServiceManager.INSTANCE.boot();
}
@Test
public void testDelegateToTracerContext(){
Span span = ContextManager.createSpan("serviceA");
......
......@@ -26,7 +26,7 @@ public class EasyLoggerTest {
@Test
public void testLog(){
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
PrintStream output = Mockito.mock(PrintStream.class);
System.setOut(output);
......
......@@ -25,18 +25,18 @@ public class WriterFactoryTest {
*/
@Test
public void testGetLogWriter(){
Config.SkyWalking.IS_PREMAIN_MODE = true;
Config.Agent.IS_PREMAIN_MODE = true;
PrintStream mockStream = Mockito.mock(PrintStream.class);
System.setErr(mockStream);
Assert.assertEquals(SyncFileWriter.instance(), WriterFactory.getLogWriter());
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
Assert.assertTrue(WriterFactory.getLogWriter() instanceof STDOutWriter);
}
@AfterClass
public static void reset(){
Config.SkyWalking.IS_PREMAIN_MODE = false;
Config.Agent.IS_PREMAIN_MODE = false;
System.setErr(errRef);
}
}
skywalking.application_code = crmApp
skywalking.servers = 127.0.0.1:8080
agent.application_code = crmApp
collector.servers = 127.0.0.1:8080
......@@ -18,11 +18,6 @@
</properties>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-api</artifactId>
<version>3.0-2017</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
......
......@@ -80,7 +80,7 @@ public class DubboInterceptorTest {
Mockito.when(RpcContext.getContext()).thenReturn(rpcContext);
when(rpcContext.isConsumerSide()).thenReturn(true);
when(methodInvokeContext.allArguments()).thenReturn(new Object[]{invoker, invocation});
Config.SkyWalking.APPLICATION_CODE = "DubboTestCases-APP";
Config.Agent.APPLICATION_CODE = "DubboTestCases-APP";
}
......@@ -155,7 +155,7 @@ public class DubboInterceptorTest {
@Test
public void testProviderWithAttachment() {
when(rpcContext.isConsumerSide()).thenReturn(false);
when(rpcContext.getAttachment(DubboInterceptor.ATTACHMENT_NAME_OF_CONTEXT_DATA)).thenReturn("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123");
when(rpcContext.getAttachment(DubboInterceptor.ATTACHMENT_NAME_OF_CONTEXT_DATA)).thenReturn("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123|1");
dubboInterceptor.beforeMethod(classInstanceContext, methodInvokeContext, methodInterceptResult);
dubboInterceptor.afterMethod(classInstanceContext, methodInvokeContext, result);
......@@ -168,7 +168,7 @@ public class DubboInterceptorTest {
when(rpcContext.isConsumerSide()).thenReturn(false);
when(BugFixActive.isActive()).thenReturn(true);
testParam.setTraceContext("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123");
testParam.setTraceContext("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123|1");
dubboInterceptor.beforeMethod(classInstanceContext, methodInvokeContext, methodInterceptResult);
......
......@@ -20,13 +20,6 @@
</properties>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
......
......@@ -18,11 +18,6 @@
</properties>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-api</artifactId>
<version>3.0-2017</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
......
......@@ -94,7 +94,7 @@ public class MotanProviderInterceptorTest {
@Test
public void testInvokerWithRefSegment() {
HashMap attachments = new HashMap();
attachments.put("SWTraceContext", "302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123");
attachments.put("SWTraceContext", "302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123|1");
when(request.getAttachments()).thenReturn(attachments);
invokeInterceptor.beforeMethod(instanceContext, interceptorContext, null);
......
......@@ -81,7 +81,7 @@ public class TomcatInterceptorTest {
@Test
public void testWithSerializedContextData() {
when(request.getHeader(TomcatInterceptor.HEADER_NAME_OF_CONTEXT_DATA)).thenReturn("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123");
when(request.getHeader(TomcatInterceptor.HEADER_NAME_OF_CONTEXT_DATA)).thenReturn("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123|1");
tomcatInterceptor.beforeMethod(classInstanceContext, methodInvokeContext, methodInterceptResult);
tomcatInterceptor.afterMethod(classInstanceContext, methodInvokeContext, null);
......
......@@ -12,11 +12,6 @@
<artifactId>skywalking-toolkit-opentracing-activation</artifactId>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-toolkit-opentracing</artifactId>
......
......@@ -24,8 +24,10 @@ public class SpanSetTagInterceptor implements InstanceMethodsAroundInterceptor {
ContextManager.activeSpan().setTag(key, (String)value);
else if (value instanceof Boolean)
ContextManager.activeSpan().setTag(key, (Boolean)value);
else if (value instanceof Number)
ContextManager.activeSpan().setTag(key, (Number)value);
else if (value instanceof Integer)
ContextManager.activeSpan().setTag(key, (Integer)value);
else
ContextManager.activeSpan().setTag(key, value.toString());
}
@Override
......
......@@ -12,11 +12,4 @@
<packaging>jar</packaging>
<name>skywalking-toolkit-trace-context-activation</name>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-api</artifactId>
<version>3.0-2017</version>
</dependency>
</dependencies>
</project>
......@@ -24,9 +24,7 @@ public class TraceContextInterceptor implements StaticMethodsAroundInterceptor {
@Override
public Object afterMethod(StaticMethodInvokeContext interceptorContext, Object ret) {
ContextCarrier carrier = new ContextCarrier();
ContextManager.inject(carrier);
return carrier.getTraceSegmentId();
return ContextManager.getTraceSegmentId();
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册