提交 dcafdaa3 编写于 作者: wu-sheng's avatar wu-sheng

Fix compile issue. Add new rest-service Object: SegmentsMessage.java. It uses...

Fix compile issue. Add new rest-service Object: SegmentsMessage.java. It uses Gson(JsonAdapter) as serialize/deserialize tool.
上级 f48f82fc
...@@ -91,12 +91,14 @@ ...@@ -91,12 +91,14 @@
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>${compiler.version}</source> <source>${compiler.version}</source>
<target>${compiler.version}</target> <target>${compiler.version}</target>
<encoding>${project.build.sourceEncoding}</encoding> <encoding>${project.build.sourceEncoding}</encoding>
</configuration> </configuration>
<version>3.6.1</version>
</plugin> </plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
......
package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.TraceId.PropagatedTraceId;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
* The <code>SegmentsMessage</code> is a set of {@link TraceSegment},
* this set provides a container, when several {@link TraceSegment}s are going to uplink to server.
*
*
* @author wusheng
*/
@JsonAdapter(SegmentsMessage.Serializer.class)
public class SegmentsMessage {
private List<TraceSegment> segments;
public SegmentsMessage(){
segments = new LinkedList<TraceSegment>();
}
public void append(TraceSegment segment){
this.segments.add(segment);
}
public List<TraceSegment> getSegments() {
return Collections.unmodifiableList(segments);
}
public static class Serializer extends TypeAdapter<SegmentsMessage>{
@Override
public void write(JsonWriter out, SegmentsMessage value) throws IOException {
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
out.beginArray();
try {
for (TraceSegment segment : value.segments) {
out.jsonValue(gson.toJson(segment));
}
}finally {
out.endArray();
}
}
@Override
public SegmentsMessage read(JsonReader in) throws IOException {
SegmentsMessage message = new SegmentsMessage();
in.beginArray();
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
try {
while (in.hasNext()) {
TraceSegment traceSegment = gson.fromJson(in, TraceSegment.class);
message.append(traceSegment);
}
} finally {
in.endArray();
}
return message;
}
}
}
package com.a.eye.skywalking.trace.TraceId; package com.a.eye.skywalking.trace.TraceId;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
/** /**
* The <code>DistributedTraceId</code> presents a distributed call chain. * The <code>DistributedTraceId</code> presents a distributed call chain.
* *
...@@ -19,7 +13,6 @@ import java.io.IOException; ...@@ -19,7 +13,6 @@ import java.io.IOException;
* *
* @author wusheng * @author wusheng
*/ */
@JsonAdapter(DistributedTraceId.Serializer.class)
public abstract class DistributedTraceId { public abstract class DistributedTraceId {
private String id; private String id;
...@@ -47,22 +40,4 @@ public abstract class DistributedTraceId { ...@@ -47,22 +40,4 @@ public abstract class DistributedTraceId {
public int hashCode() { public int hashCode() {
return id != null ? id.hashCode() : 0; return id != null ? id.hashCode() : 0;
} }
public static class Serializer extends TypeAdapter<DistributedTraceId> {
@Override
public void write(JsonWriter out, DistributedTraceId value) throws IOException {
out.beginArray();
out.value(value.get());
out.endArray();
}
@Override
public DistributedTraceId read(JsonReader in) throws IOException {
in.beginArray();
PropagatedTraceId traceId = new PropagatedTraceId(in.nextString());
in.endArray();
return traceId;
}
}
} }
package com.a.eye.skywalking.trace.TraceId;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
* @author wusheng
*/
@JsonAdapter(DistributedTraceIds.Serializer.class)
public class DistributedTraceIds {
private LinkedList<DistributedTraceId> relatedGlobalTraces;
public DistributedTraceIds() {
relatedGlobalTraces = new LinkedList<DistributedTraceId>();
}
public List<DistributedTraceId> getRelatedGlobalTraces() {
return Collections.unmodifiableList(relatedGlobalTraces);
}
public void append(DistributedTraceId distributedTraceId) {
if (relatedGlobalTraces.size() > 0 && relatedGlobalTraces.getFirst() instanceof NewDistributedTraceId) {
relatedGlobalTraces.removeFirst();
}
if (!relatedGlobalTraces.contains(distributedTraceId)) {
relatedGlobalTraces.add(distributedTraceId);
}
}
public static class Serializer extends TypeAdapter<DistributedTraceIds> {
@Override
public void write(JsonWriter out, DistributedTraceIds value) throws IOException {
List<DistributedTraceId> globalTraces = value.getRelatedGlobalTraces();
if (globalTraces.size() > 0) {
out.beginArray();
for (DistributedTraceId trace : globalTraces) {
out.value(trace.get());
}
out.endArray();
}
}
@Override
public DistributedTraceIds read(JsonReader in) throws IOException {
DistributedTraceIds distributedTraceIds = new DistributedTraceIds();
in.beginArray();
try {
while (in.hasNext()) {
PropagatedTraceId traceId = new PropagatedTraceId(in.nextString());
distributedTraceIds.append(traceId);
}
} finally {
in.endArray();
}
return distributedTraceIds;
}
}
}
package com.a.eye.skywalking.trace; package com.a.eye.skywalking.trace;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceId; import com.a.eye.skywalking.trace.TraceId.DistributedTraceId;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceIds;
import com.a.eye.skywalking.trace.TraceId.NewDistributedTraceId; import com.a.eye.skywalking.trace.TraceId.NewDistributedTraceId;
import com.google.gson.annotations.Expose; import com.google.gson.annotations.Expose;
import com.google.gson.annotations.SerializedName; import com.google.gson.annotations.SerializedName;
...@@ -86,7 +87,7 @@ public class TraceSegment { ...@@ -86,7 +87,7 @@ public class TraceSegment {
*/ */
@Expose @Expose
@SerializedName(value="gt") @SerializedName(value="gt")
private LinkedList<DistributedTraceId> relatedGlobalTraces; private DistributedTraceIds relatedGlobalTraces;
/** /**
* Create a trace segment, by given segmentId. * Create a trace segment, by given segmentId.
...@@ -94,18 +95,18 @@ public class TraceSegment { ...@@ -94,18 +95,18 @@ public class TraceSegment {
*/ */
public TraceSegment(String applicationCode) { public TraceSegment(String applicationCode) {
this(); this();
this.traceSegmentId = GlobalIdGenerator.generate(ID_TYPE);
this.applicationCode = applicationCode; this.applicationCode = applicationCode;
this.startTime = System.currentTimeMillis();
} }
/** /**
* Create a default/empty trace segment * Create a default/empty trace segment
*/ */
public TraceSegment() { public TraceSegment() {
this.startTime = System.currentTimeMillis();
this.traceSegmentId = GlobalIdGenerator.generate(ID_TYPE);
this.spans = new LinkedList<Span>(); this.spans = new LinkedList<Span>();
this.relatedGlobalTraces = new LinkedList<DistributedTraceId>(); this.relatedGlobalTraces = new DistributedTraceIds();
this.relatedGlobalTraces.add(new NewDistributedTraceId()); this.relatedGlobalTraces.append(new NewDistributedTraceId());
} }
/** /**
...@@ -126,13 +127,8 @@ public class TraceSegment { ...@@ -126,13 +127,8 @@ public class TraceSegment {
if (distributedTraceIds == null || distributedTraceIds.size() == 0) { if (distributedTraceIds == null || distributedTraceIds.size() == 0) {
return; return;
} }
if (relatedGlobalTraces.getFirst() instanceof NewDistributedTraceId) {
relatedGlobalTraces.removeFirst();
}
for (DistributedTraceId distributedTraceId : distributedTraceIds) { for (DistributedTraceId distributedTraceId : distributedTraceIds) {
if(!relatedGlobalTraces.contains(distributedTraceId)){ relatedGlobalTraces.append(distributedTraceId);
relatedGlobalTraces.add(distributedTraceId);
}
} }
} }
...@@ -176,7 +172,7 @@ public class TraceSegment { ...@@ -176,7 +172,7 @@ public class TraceSegment {
} }
public List<DistributedTraceId> getRelatedGlobalTraces() { public List<DistributedTraceId> getRelatedGlobalTraces() {
return Collections.unmodifiableList(relatedGlobalTraces); return relatedGlobalTraces.getRelatedGlobalTraces();
} }
public List<Span> getSpans() { public List<Span> getSpans() {
...@@ -187,7 +183,8 @@ public class TraceSegment { ...@@ -187,7 +183,8 @@ public class TraceSegment {
return applicationCode; return applicationCode;
} }
@Override public String toString() { @Override
public String toString() {
return "TraceSegment{" + return "TraceSegment{" +
"traceSegmentId='" + traceSegmentId + '\'' + "traceSegmentId='" + traceSegmentId + '\'' +
", startTime=" + startTime + ", startTime=" + startTime +
......
...@@ -106,11 +106,15 @@ public class TraceSegmentTestCase { ...@@ -106,11 +106,15 @@ public class TraceSegmentTestCase {
.excludeFieldsWithoutExposeAnnotation() .excludeFieldsWithoutExposeAnnotation()
.create(); .create();
String json = gson.toJson(segment); SegmentsMessage message = new SegmentsMessage();
message.append(segment);
String json = gson.toJson(message);
System.out.println(json); System.out.println(json);
message = gson.fromJson(json, SegmentsMessage.class);
TraceSegment newSegment = gson.fromJson(json, TraceSegment.class); TraceSegment newSegment = message.getSegments().get(0);
Assert.assertEquals(segment.getSpans().size(), newSegment.getSpans().size()); Assert.assertEquals(segment.getSpans().size(), newSegment.getSpans().size());
Assert.assertEquals(segment.getRefs().get(0).getTraceSegmentId(), newSegment.getRefs().get(0).getTraceSegmentId()); Assert.assertEquals(segment.getRefs().get(0).getTraceSegmentId(), newSegment.getRefs().get(0).getTraceSegmentId());
......
...@@ -45,41 +45,13 @@ ...@@ -45,41 +45,13 @@
<pattern>${shade.com.lmax.disruptor.source}</pattern> <pattern>${shade.com.lmax.disruptor.source}</pattern>
<shadedPattern>${shade.com.lmax.disruptor.target}</shadedPattern> <shadedPattern>${shade.com.lmax.disruptor.target}</shadedPattern>
</relocation> </relocation>
<relocation>
<pattern>${shade.akka.source}</pattern>
<shadedPattern>${shade.akka.target}</shadedPattern>
</relocation>
<relocation> <relocation>
<pattern>${shade.com.google.source}</pattern> <pattern>${shade.com.google.source}</pattern>
<shadedPattern>${shade.com.google.target}</shadedPattern> <shadedPattern>${shade.com.google.target}</shadedPattern>
</relocation> </relocation>
<relocation> <relocation>
<pattern>${shade.org.agrona.source}</pattern> <pattern>${shade.org.apache.source}</pattern>
<shadedPattern>${shade.org.agrona.target}</shadedPattern> <shadedPattern>${shade.org.apache.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.org.jboss.netty.source}</pattern>
<shadedPattern>${shade.org.jboss.netty.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.org.reactivestreams.source}</pattern>
<shadedPattern>${shade.org.reactivestreams.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.org.uncommons.maths.source}</pattern>
<shadedPattern>${shade.org.uncommons.maths.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.scala.source}</pattern>
<shadedPattern>${shade.scala.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.io.aeron.source}</pattern>
<shadedPattern>${shade.io.aeron.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.com.typesafe.source}</pattern>
<shadedPattern>${shade.com.typesafe.target}</shadedPattern>
</relocation> </relocation>
</relocations> </relocations>
</configuration> </configuration>
...@@ -148,8 +120,17 @@ ...@@ -148,8 +120,17 @@
</repository> </repository>
</distributionManagement> </distributionManagement>
<properties> <properties>
<shade.org.apache.source>org.apache</shade.org.apache.source>
<shade.package>com.a.eye.skywalking.dependencies</shade.package>
<shade.com.google.source>com.google</shade.com.google.source>
<shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
<shade.com.lmax.disruptor.target>${shade.package}.${shade.com.lmax.disruptor.source}</shade.com.lmax.disruptor.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
<shade.com.google.target>${shade.package}.${shade.com.google.source}</shade.com.google.target>
<shade.org.apache.target>${shade.package}.${shade.org.apache.source}</shade.org.apache.target>
<premain.class>com.a.eye.skywalking.agent.SkyWalkingAgent</premain.class> <premain.class>com.a.eye.skywalking.agent.SkyWalkingAgent</premain.class>
<shade.com.lmax.disruptor.source>com.lmax.disruptor</shade.com.lmax.disruptor.source>
</properties> </properties>
</project> </project>
...@@ -22,9 +22,12 @@ ...@@ -22,9 +22,12 @@
<shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source> <shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
<shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target> <shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
<shade.com.lmax.disruptor.source>com.lmax.disruptor</shade.com.lmax.disruptor.source> <shade.com.lmax.disruptor.source>com.lmax.disruptor</shade.com.lmax.disruptor.source>
<shade.com.lmax.disruptor.target>${shade.package}.${shade.com.lmax.disruptor.source}</shade.com.lmax.disruptor.target> <shade.com.lmax.disruptor.target>${shade.package}.${shade.com.lmax.disruptor.source}
</shade.com.lmax.disruptor.target>
<shade.com.google.source>com.google</shade.com.google.source> <shade.com.google.source>com.google</shade.com.google.source>
<shade.com.google.target>${shade.package}.${shade.com.google.source}</shade.com.google.target> <shade.com.google.target>${shade.package}.${shade.com.google.source}</shade.com.google.target>
<shade.org.apache.source>org.apache</shade.org.apache.source>
<shade.org.apache.target>${shade.package}.${shade.org.apache.source}</shade.org.apache.target>
</properties> </properties>
<dependencies> <dependencies>
...@@ -133,6 +136,10 @@ ...@@ -133,6 +136,10 @@
<pattern>${shade.com.google.source}</pattern> <pattern>${shade.com.google.source}</pattern>
<shadedPattern>${shade.com.google.target}</shadedPattern> <shadedPattern>${shade.com.google.target}</shadedPattern>
</relocation> </relocation>
<relocation>
<pattern>${shade.org.apache.source}</pattern>
<shadedPattern>${shade.org.apache.target}</shadedPattern>
</relocation>
</relocations> </relocations>
</configuration> </configuration>
</execution> </execution>
......
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
<version>3.0-2017</version> <version>3.0-2017</version>
</parent> </parent>
<artifactId>skywalking-api</artifactId> <artifactId>skywalking-api</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
...@@ -17,6 +16,7 @@ ...@@ -17,6 +16,7 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jetty.version>9.4.2.v20170220</jetty.version>
</properties> </properties>
<dependencies> <dependencies>
...@@ -40,17 +40,26 @@ ...@@ -40,17 +40,26 @@
<artifactId>disruptor</artifactId> <artifactId>disruptor</artifactId>
<version>3.3.6</version> <version>3.3.6</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId> <artifactId>maven-resources-plugin</artifactId>
...@@ -59,20 +68,6 @@ ...@@ -59,20 +68,6 @@
<encoding>${project.build.sourceEncoding}</encoding> <encoding>${project.build.sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<!-- 源码插件 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<!-- 发布时自动将源码同时发布的配置 -->
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>
...@@ -8,5 +8,5 @@ package com.a.eye.skywalking.api.boot; ...@@ -8,5 +8,5 @@ package com.a.eye.skywalking.api.boot;
* @author wusheng * @author wusheng
*/ */
public interface BootService { public interface BootService {
void bootUp() throws Exception; void bootUp() throws Throwable;
} }
...@@ -23,14 +23,14 @@ public enum ServiceManager { ...@@ -23,14 +23,14 @@ public enum ServiceManager {
public void boot() { public void boot() {
if (!isStarted) { if (!isStarted) {
try { try {
bootedServices = new HashMap<>(); bootedServices = new HashMap<Class, BootService>();
Iterator<BootService> serviceIterator = load().iterator(); Iterator<BootService> serviceIterator = load().iterator();
while (serviceIterator.hasNext()) { while (serviceIterator.hasNext()) {
BootService bootService = serviceIterator.next(); BootService bootService = serviceIterator.next();
try { try {
bootService.bootUp(); bootService.bootUp();
bootedServices.put(bootService.getClass(), bootService); bootedServices.put(bootService.getClass(), bootService);
} catch (Exception e) { } catch (Throwable e) {
logger.error(e, "ServiceManager try to start [{}] fail.", bootService.getClass().getName()); logger.error(e, "ServiceManager try to start [{}] fail.", bootService.getClass().getName());
} }
} }
......
...@@ -18,7 +18,7 @@ public abstract class StatusBootService implements BootService { ...@@ -18,7 +18,7 @@ public abstract class StatusBootService implements BootService {
} }
@Override @Override
public final void bootUp() throws Exception{ public final void bootUp() throws Throwable{
try { try {
bootUpWithStatus(); bootUpWithStatus();
started = true; started = true;
......
package com.a.eye.skywalking.api.client;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.TraceSegment;
import java.util.List;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClients;
/**
* The <code>CollectorClient</code> runs as an independency thread.
* It retrieves cached {@link TraceSegment} from {@link TraceSegmentProcessQueue},
* and send to collector by HTTP-RESTFUL-SERVICE: POST /skywalking/trace/segment
*
* @author wusheng
*/
public class CollectorClient implements Runnable {
private static ILog logger = LogManager.getLogger(CollectorClient.class);
private static long SLEEP_TIME_MILLIS = 500;
private CloseableHttpClient httpclient;
public CollectorClient() {
httpclient = HttpClients.custom()
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
.build();
}
@Override
public void run() {
while (true) {
try {
long sleepTime = -1;
TraceSegmentProcessQueue segmentProcessQueue = ServiceManager.INSTANCE.findService(TraceSegmentProcessQueue.class);
List<TraceSegment> cachedTraceSegments = segmentProcessQueue.getCachedTraceSegments();
if (cachedTraceSegments.size() > 0) {
for (TraceSegment segment : cachedTraceSegments) {
/**
* No receiver found, means collector server is off-line.
*/
sleepTime = SLEEP_TIME_MILLIS * 10;
break;
}
} else {
sleepTime = SLEEP_TIME_MILLIS;
}
if (sleepTime > 0) {
try2Sleep(sleepTime);
}
} catch (Throwable t) {
logger.error(t, "Send trace segments to collector failure.");
}
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}
}
package com.a.eye.skywalking.api.client; package com.a.eye.skywalking.api.client;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.boot.StatusBootService; import com.a.eye.skywalking.api.boot.StatusBootService;
import com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue; import com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.TraceSegment; import com.a.eye.skywalking.trace.TraceSegment;
import java.util.List;
/** /**
* The <code>CollectorClientService</code> is responsible for start {@link CollectorClient}.
*
* @author wusheng * @author wusheng
*/ */
public class CollectorClientService extends StatusBootService implements Runnable { public class CollectorClientService extends StatusBootService {
private static ILog logger = LogManager.getLogger(CollectorClientService.class);
private static long SLEEP_TIME_MILLIS = 500;
/** /**
* Start a new {@link Thread} to get finished {@link TraceSegment} by {@link TraceSegmentProcessQueue#getCachedTraceSegments()} * Start a new {@link Thread} to get finished {@link TraceSegment} by {@link TraceSegmentProcessQueue#getCachedTraceSegments()}
*/ */
@Override @Override
protected void bootUpWithStatus() throws Exception { protected void bootUpWithStatus() throws Exception {
Thread collectorClientThread = new Thread(this, "collectorClientThread"); Thread collectorClientThread = new Thread(new CollectorClient(), "collectorClientThread");
collectorClientThread.start(); collectorClientThread.start();
} }
@Override
public void run() {
while (true) {
try {
long sleepTime = -1;
TraceSegmentProcessQueue segmentProcessQueue = ServiceManager.INSTANCE.findService(TraceSegmentProcessQueue.class);
List<TraceSegment> cachedTraceSegments = segmentProcessQueue.getCachedTraceSegments();
if (cachedTraceSegments.size() > 0) {
for (TraceSegment segment : cachedTraceSegments) {
/**
* No receiver found, means collector server is off-line.
*/
sleepTime = SLEEP_TIME_MILLIS * 10;
break;
}
} else {
sleepTime = SLEEP_TIME_MILLIS;
}
if (sleepTime > 0) {
try2Sleep(sleepTime);
}
} catch (Throwable t) {
logger.error(t, "Send trace segments to collector failure.");
}
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}
} }
...@@ -11,6 +11,7 @@ public class Config { ...@@ -11,6 +11,7 @@ public class Config {
public static String SERVERS = ""; public static String SERVERS = "";
public static String SERVICE_NAME = "/segments";
} }
public static class Disruptor{ public static class Disruptor{
......
...@@ -165,7 +165,7 @@ public class ContextCarrier implements Serializable { ...@@ -165,7 +165,7 @@ public class ContextCarrier implements Serializable {
return null; return null;
} }
String[] propagationTraceIdValues = text.split(","); String[] propagationTraceIdValues = text.split(",");
List<DistributedTraceId> traceIds = new LinkedList<>(); List<DistributedTraceId> traceIds = new LinkedList<DistributedTraceId>();
for (String propagationTraceIdValue : propagationTraceIdValues) { for (String propagationTraceIdValue : propagationTraceIdValues) {
traceIds.add(new PropagatedTraceId(propagationTraceIdValue)); traceIds.add(new PropagatedTraceId(propagationTraceIdValue));
} }
......
...@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment; ...@@ -18,7 +18,7 @@ import com.a.eye.skywalking.trace.TraceSegment;
* Created by wusheng on 2017/2/17. * Created by wusheng on 2017/2/17.
*/ */
public class ContextManager implements TracerContextListener, BootService { public class ContextManager implements TracerContextListener, BootService {
private static ThreadLocal<TracerContext> CONTEXT = new ThreadLocal<>(); private static ThreadLocal<TracerContext> CONTEXT = new ThreadLocal<TracerContext>();
private static TracerContext get() { private static TracerContext get() {
TracerContext segment = CONTEXT.get(); TracerContext segment = CONTEXT.get();
......
...@@ -186,7 +186,7 @@ public final class TracerContext { ...@@ -186,7 +186,7 @@ public final class TracerContext {
} }
public static class ListenerManager { public static class ListenerManager {
private static List<TracerContextListener> listeners = new LinkedList<>(); private static List<TracerContextListener> listeners = new LinkedList<TracerContextListener>();
/** /**
* Add the given {@link TracerContextListener} to {@link #listeners} list. * Add the given {@link TracerContextListener} to {@link #listeners} list.
......
...@@ -31,7 +31,7 @@ public class PluginBootstrap { ...@@ -31,7 +31,7 @@ public class PluginBootstrap {
if (resources == null || resources.size() == 0) { if (resources == null || resources.size() == 0) {
logger.info("no plugin files (skywalking-plugin.properties) found, continue to start application."); logger.info("no plugin files (skywalking-plugin.properties) found, continue to start application.");
return new ArrayList<>(); return new ArrayList<AbstractClassEnhancePluginDefine>();
} }
for (URL pluginUrl : resources) { for (URL pluginUrl : resources) {
......
...@@ -46,7 +46,7 @@ public class ClassInstanceMethodsInterceptor { ...@@ -46,7 +46,7 @@ public class ClassInstanceMethodsInterceptor {
*/ */
@RuntimeType @RuntimeType
public Object intercept(@This Object obj, @AllArguments Object[] allArguments, @Origin Method method, @SuperCall Callable<?> zuper, public Object intercept(@This Object obj, @AllArguments Object[] allArguments, @Origin Method method, @SuperCall Callable<?> zuper,
@FieldValue(ClassEnhancePluginDefine.contextAttrName) EnhancedClassInstanceContext instanceContext) throws Exception { @FieldValue(ClassEnhancePluginDefine.contextAttrName) EnhancedClassInstanceContext instanceContext) throws Throwable {
InstanceMethodsAroundInterceptor interceptor = InterceptorInstanceLoader InstanceMethodsAroundInterceptor interceptor = InterceptorInstanceLoader
.load(instanceMethodsAroundInterceptorClassName, obj.getClass().getClassLoader()); .load(instanceMethodsAroundInterceptorClassName, obj.getClass().getClassLoader());
......
...@@ -46,7 +46,7 @@ public class ClassStaticMethodsInterceptor { ...@@ -46,7 +46,7 @@ public class ClassStaticMethodsInterceptor {
* or unexpected exception in sky-walking ( This is a bug, if anything triggers this condition ). * or unexpected exception in sky-walking ( This is a bug, if anything triggers this condition ).
*/ */
@RuntimeType @RuntimeType
public Object intercept(@Origin Class<?> clazz, @AllArguments Object[] allArguments, @Origin Method method, @SuperCall Callable<?> zuper) throws Exception { public Object intercept(@Origin Class<?> clazz, @AllArguments Object[] allArguments, @Origin Method method, @SuperCall Callable<?> zuper) throws Throwable {
StaticMethodsAroundInterceptor interceptor = InterceptorInstanceLoader StaticMethodsAroundInterceptor interceptor = InterceptorInstanceLoader
.load(staticMethodsAroundInterceptorClassName, clazz.getClassLoader()); .load(staticMethodsAroundInterceptorClassName, clazz.getClassLoader());
......
...@@ -30,7 +30,7 @@ import java.util.concurrent.locks.ReentrantLock; ...@@ -30,7 +30,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class InterceptorInstanceLoader { public class InterceptorInstanceLoader {
private static ILog logger = LogManager.getLogger(InterceptorInstanceLoader.class); private static ILog logger = LogManager.getLogger(InterceptorInstanceLoader.class);
private static ConcurrentHashMap<String, Object> INSTANCE_CACHE = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, Object> INSTANCE_CACHE = new ConcurrentHashMap<String, Object>();
private static ReentrantLock instanceLoadLock = new ReentrantLock(); private static ReentrantLock instanceLoadLock = new ReentrantLock();
......
...@@ -30,7 +30,7 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace ...@@ -30,7 +30,7 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
private volatile int cacheIndex; private volatile int cacheIndex;
public TraceSegmentProcessQueue() { public TraceSegmentProcessQueue() {
disruptor = new Disruptor<>(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE); disruptor = new Disruptor<TraceSegmentHolder>(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
secondLevelCache = new TraceSegment[Config.Disruptor.BUFFER_SIZE]; secondLevelCache = new TraceSegment[Config.Disruptor.BUFFER_SIZE];
cacheIndex = 0; cacheIndex = 0;
disruptor.handleEventsWith(this); disruptor.handleEventsWith(this);
...@@ -83,7 +83,7 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace ...@@ -83,7 +83,7 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
} }
public List<TraceSegment> getCachedTraceSegments(){ public List<TraceSegment> getCachedTraceSegments(){
List<TraceSegment> segmentList = new LinkedList<>(); List<TraceSegment> segmentList = new LinkedList<TraceSegment>();
for (int i = 0; i < secondLevelCache.length; i++) { for (int i = 0; i < secondLevelCache.length; i++) {
TraceSegment segment = secondLevelCache[i]; TraceSegment segment = secondLevelCache[i];
if(segment != null){ if(segment != null){
......
package com.a.eye.skywalking.api.client;
import java.io.BufferedReader;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClients;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
/**
* This is a small application, test for http restful service.
* Use APACHE HttpClient as client, nanohttpd as server.
*
* @author wusheng
*/
public class HTTPRestServiceTestApp {
public static void main(String[] args) throws Exception {
CloseableHttpClient client = null;
Server server = null;
try {
HTTPRestServiceTestApp test = new HTTPRestServiceTestApp();
server = test.startServer();
client = test.send();
} finally {
if (client != null) {
client.close();
}
if (server != null) {
server.stop();
}
}
}
private CloseableHttpClient send() {
CloseableHttpClient httpclient = HttpClients.custom()
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
.build();
HttpPost post = new HttpPost("http://localhost:7000/segments");
StringEntity entity = new StringEntity("[{'abc'}]", ContentType.APPLICATION_JSON);
post.setEntity(entity);
try {
CloseableHttpResponse httpResponse = httpclient.execute(post);
System.out.println(httpResponse.getStatusLine().getStatusCode());
} catch (IOException e) {
e.printStackTrace();
}
return httpclient;
}
private Server startServer() throws Exception {
Server server = new Server(7000);
server.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
BufferedReader br = request.getReader();
String str, wholeStr = "";
while ((str = br.readLine()) != null) {
wholeStr += str;
}
System.out.println(wholeStr);
response.setContentType("text/html; charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
}
});
//server.start();
return server;
}
}
...@@ -18,7 +18,7 @@ public class ContextCarrierTestCase { ...@@ -18,7 +18,7 @@ public class ContextCarrierTestCase {
carrier.setSpanId(100); carrier.setSpanId(100);
carrier.setApplicationCode("REMOTE_APP"); carrier.setApplicationCode("REMOTE_APP");
carrier.setPeerHost("10.2.3.16:8080"); carrier.setPeerHost("10.2.3.16:8080");
List<DistributedTraceId> ids = new LinkedList<>(); List<DistributedTraceId> ids = new LinkedList<DistributedTraceId>();
ids.add(new PropagatedTraceId("Trace.global.id.123")); ids.add(new PropagatedTraceId("Trace.global.id.123"));
carrier.setDistributedTraceIds(ids); carrier.setDistributedTraceIds(ids);
......
...@@ -77,7 +77,7 @@ public class TracerContextTestCase { ...@@ -77,7 +77,7 @@ public class TracerContextTestCase {
carrier.setSpanId(5); carrier.setSpanId(5);
carrier.setApplicationCode("REMOTE_APP"); carrier.setApplicationCode("REMOTE_APP");
carrier.setPeerHost("10.2.3.16:8080"); carrier.setPeerHost("10.2.3.16:8080");
List<DistributedTraceId> ids = new LinkedList<>(); List<DistributedTraceId> ids = new LinkedList<DistributedTraceId>();
ids.add(new PropagatedTraceId("Trace.global.id.123")); ids.add(new PropagatedTraceId("Trace.global.id.123"));
carrier.setDistributedTraceIds(ids); carrier.setDistributedTraceIds(ids);
......
...@@ -11,7 +11,7 @@ import org.junit.Test; ...@@ -11,7 +11,7 @@ import org.junit.Test;
public class PluginFinderTest { public class PluginFinderTest {
@Test @Test
public void testFind(){ public void testFind(){
ArrayList<AbstractClassEnhancePluginDefine> defines = new ArrayList<>(); ArrayList<AbstractClassEnhancePluginDefine> defines = new ArrayList<AbstractClassEnhancePluginDefine>();
defines.add(new NewTestPlugin()); defines.add(new NewTestPlugin());
defines.add(new NewTestPlugin2()); defines.add(new NewTestPlugin2());
PluginFinder finder = new PluginFinder(defines); PluginFinder finder = new PluginFinder(defines);
...@@ -22,7 +22,7 @@ public class PluginFinderTest { ...@@ -22,7 +22,7 @@ public class PluginFinderTest {
@Test(expected = PluginException.class) @Test(expected = PluginException.class)
public void testCanNotFind(){ public void testCanNotFind(){
ArrayList<AbstractClassEnhancePluginDefine> defines = new ArrayList<>(); ArrayList<AbstractClassEnhancePluginDefine> defines = new ArrayList<AbstractClassEnhancePluginDefine>();
defines.add(new NewTestPlugin()); defines.add(new NewTestPlugin());
PluginFinder finder = new PluginFinder(defines); PluginFinder finder = new PluginFinder(defines);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册