提交 11b4dc7b 编写于 作者: wu-sheng's avatar wu-sheng

Support finding grpc-server list. Attempt to acquire the list every 60 seconds. Related to #254

上级 84769692
package org.skywalking.apm.agent.core.client;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.queue.TraceSegmentProcessQueue;
import org.skywalking.apm.agent.core.boot.StatusBootService;
/**
* The <code>CollectorClientService</code> is responsible for start {@link CollectorClient}.
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
*
* @author wusheng
*/
public class CollectorClientService extends StatusBootService {
/**
* Start a new {@link Thread} to get finished {@link TraceSegment} by {@link TraceSegmentProcessQueue#getCachedTraceSegments()}
*/
public class CollectorDiscoveryService extends StatusBootService {
@Override
protected void bootUpWithStatus() throws Exception {
Thread collectorClientThread = new Thread(new CollectorClient(), "collectorClientThread");
Thread collectorClientThread = new Thread(new DiscoveryRestServiceClient(), "collectorClientThread");
collectorClientThread.start();
}
}
package org.skywalking.apm.agent.core.client;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Random;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.http.util.EntityUtils;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.queue.TraceSegmentProcessQueue;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import static org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig.Collector.GRPC_SERVERS;
/**
* The <code>CollectorClient</code> runs as an independency thread.
* It retrieves cached {@link TraceSegment} from {@link TraceSegmentProcessQueue},
* and send to collector by HTTP-RESTFUL-SERVICE: POST /skywalking/trace/segment
* The <code>DiscoveryRestServiceClient</code> try to get the collector's grpc-server list
* in every 60 seconds,
* and override {@link org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig.Collector#GRPC_SERVERS}.
*
* @author wusheng
*/
public class CollectorClient implements Runnable {
private static final ILog logger = LogManager.getLogger(CollectorClient.class);
private static long SLEEP_TIME_MILLIS = 500;
public class DiscoveryRestServiceClient implements Runnable {
private static final ILog logger = LogManager.getLogger(DiscoveryRestServiceClient.class);
private String[] serverList;
private volatile int selectedServer = -1;
private Gson serializer;
public CollectorClient() {
public DiscoveryRestServiceClient() {
serverList = Config.Collector.SERVERS.split(",");
Random r = new Random();
if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length);
}
serializer = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
}
@Override
public void run() {
while (true) {
try {
long sleepTime = -1;
TraceSegmentProcessQueue segmentProcessQueue = ServiceManager.INSTANCE.findService(TraceSegmentProcessQueue.class);
List<TraceSegment> cachedTraceSegments = segmentProcessQueue.getCachedTraceSegments();
if (cachedTraceSegments.size() > 0) {
SegmentsMessage message = null;
int count = 0;
for (TraceSegment segment : cachedTraceSegments) {
if (message == null) {
message = new SegmentsMessage();
}
message.append(segment);
if (count == Config.Collector.BATCH_SIZE) {
sendToCollector(message);
message = null;
}
}
sendToCollector(message);
} else {
sleepTime = SLEEP_TIME_MILLIS;
}
if (sleepTime > 0) {
try2Sleep(sleepTime);
}
try2Sleep(60 * 1000);
findServerList();
} catch (Throwable t) {
logger.error(t, "Send trace segments to collector failure.");
logger.error(t, "Find server list fail.");
}
}
}
/**
* Send the given {@link SegmentsMessage} to collector.
*
* @param message to be send.
*/
private void sendToCollector(SegmentsMessage message) throws RESTResponseStatusError, IOException {
if (message == null) {
return;
}
String messageJson = message.serialize(serializer);
private void findServerList() throws RESTResponseStatusError, IOException {
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
HttpPost httpPost = ready2Send(messageJson);
if (httpPost != null) {
CloseableHttpResponse httpResponse = httpClient.execute(httpPost);
HttpGet httpGet = buildGet();
if (httpGet != null) {
CloseableHttpResponse httpResponse = httpClient.execute(httpGet);
int statusCode = httpResponse.getStatusLine().getStatusCode();
if (200 != statusCode) {
findBackupServer();
throw new RESTResponseStatusError(statusCode);
} else {
JsonArray serverList = new Gson().fromJson(EntityUtils.toString(httpResponse.getEntity()), JsonArray.class);
if (serverList != null && serverList.size() > 0) {
LinkedList<String> newServerList = new LinkedList<String>();
for (JsonElement element : serverList) {
newServerList.add(element.getAsString());
}
if (!newServerList.equals(GRPC_SERVERS)) {
logger.debug("Refresh GRPC server list: {}", GRPC_SERVERS);
} else {
logger.debug("GRPC server list remain unchanged: {}", GRPC_SERVERS);
}
}
}
}
} catch (IOException e) {
......@@ -110,19 +85,16 @@ public class CollectorClient implements Runnable {
/**
* Prepare the given message for HTTP Post service.
*
* @param messageJson to send
* @return {@link HttpPost}, when is ready to send. otherwise, null.
* @return {@link HttpGet}, when is ready to send. otherwise, null.
*/
private HttpPost ready2Send(String messageJson) {
private HttpGet buildGet() {
if (selectedServer == -1) {
//no available server
return null;
}
HttpPost post = new HttpPost("http://" + serverList[selectedServer] + Config.Collector.SERVICE_NAME);
StringEntity entity = new StringEntity(messageJson, ContentType.APPLICATION_JSON);
post.setEntity(entity);
HttpGet httpGet = new HttpGet("http://" + serverList[selectedServer] + Config.Collector.DISCOVERY_SERVICE_NAME);
return post;
return httpGet;
}
/**
......
......@@ -43,23 +43,15 @@ public class Config {
public static String SERVERS = "";
/**
* Collector receive segments REST-Service name.
* Collector service discovery REST service name
*/
public static String SERVICE_NAME = "/segments";
/**
* The max size to send traces per rest-service call.
*/
public static int BATCH_SIZE = 50;
public static String DISCOVERY_SERVICE_NAME = "grpc/addresses";
}
public static class Buffer {
/**
* The in-memory buffer size. Based on Disruptor, this value must be 2^n.
*
* @see {https://github.com/LMAX-Exchange/disruptor}
*/
public static int SIZE = 512;
public static int CHANNEL_SIZE = 5;
public static int BUFFER_SIZE = 300;
}
public static class Logging {
......
package org.skywalking.apm.agent.core.conf;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
/**
* @author wusheng
*/
public class RemoteDownstreamConfig {
public static class Agent {
public volatile static int APPLICATION_ID = DictionaryUtil.nullValue();
}
public static class Collector {
/**
* Collector GRPC-Service address.
*/
public volatile static List<String> GRPC_SERVERS = new LinkedList<String>();
}
}
......@@ -3,9 +3,11 @@ package org.skywalking.apm.agent.core.context;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.SpanType;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.sampling.SamplingService;
import org.skywalking.apm.util.StringUtil;
......@@ -28,15 +30,22 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
AbstractTracerContext context = CONTEXT.get();
if (context == null) {
int suffixIdx = operationName.lastIndexOf(".");
if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
/**
* Can't register to collector, no need to trace anything.
*/
context = new IgnoredTracerContext();
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling()) {
context = new TracingContext();
} else {
int suffixIdx = operationName.lastIndexOf(".");
if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
context = new IgnoredTracerContext();
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling()) {
context = new TracingContext();
} else {
context = new IgnoredTracerContext();
}
}
}
CONTEXT.set(context);
......
......@@ -6,7 +6,7 @@ package org.skywalking.apm.agent.core.context.ids;
* @author wusheng
*/
public class NewDistributedTraceId extends DistributedTraceId {
private static final String ID_TYPE = "Trace";
private static final String ID_TYPE = "T";
public NewDistributedTraceId() {
super(GlobalIdGenerator.generate(ID_TYPE));
......
......@@ -25,7 +25,7 @@ import org.skywalking.apm.logging.LogManager;
public class TraceSegment {
private static final ILog logger = LogManager.getLogger(TraceSegment.class);
private static final String ID_TYPE = "Segment";
private static final String ID_TYPE = "S";
/**
* The id of this trace segment.
......
package org.skywalking.apm.agent.core.datacarrier;
import java.util.List;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.context.TracingContext;
import org.skywalking.apm.agent.core.context.TracingContextListener;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.datacarrier.consumer.IConsumer;
import static org.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
import static org.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
/**
* @author wusheng
*/
public class DataBufferService implements BootService, IConsumer<TraceSegment>, TracingContextListener {
private volatile DataCarrier<TraceSegment> carrier;
@Override
public void bootUp() throws Throwable {
carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);
carrier.consume(this, 1);
TracingContext.ListenerManager.add(this);
}
@Override
public void init() {
}
@Override
public void consume(List<TraceSegment> data) {
}
@Override
public void onError(List<TraceSegment> data, Throwable t) {
}
@Override
public void onExit() {
}
@Override
public void afterFinished(TraceSegment traceSegment) {
carrier.produce(traceSegment);
}
}
package org.skywalking.apm.agent.core.datacarrier;
import org.skywalking.apm.agent.core.datacarrier.buffer.BufferStrategy;
import org.skywalking.apm.agent.core.datacarrier.buffer.Channels;
import org.skywalking.apm.agent.core.datacarrier.consumer.ConsumerPool;
import org.skywalking.apm.agent.core.datacarrier.consumer.IConsumer;
import org.skywalking.apm.agent.core.datacarrier.partition.IDataPartitioner;
import org.skywalking.apm.agent.core.datacarrier.partition.SimpleRollingPartitioner;
/**
* DataCarrier main class.
* use this instance to set Producer/Consumer Model
* <p>
* Created by wusheng on 2016/10/25.
*/
public class DataCarrier<T> {
private final int bufferSize;
private final int channelSize;
private Channels<T> channels;
private ConsumerPool<T> consumerPool;
public DataCarrier(int channelSize, int bufferSize) {
this.bufferSize = bufferSize;
this.channelSize = channelSize;
channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
}
/**
* set a new IDataPartitioner.
* It will cover the current one or default one.(Default is {@link SimpleRollingPartitioner)}
*
* @param dataPartitioner
* @return
*/
public DataCarrier setPartitioner(IDataPartitioner<T> dataPartitioner) {
this.channels.setPartitioner(dataPartitioner);
return this;
}
/**
* override the strategy at runtime.
* Notice, {@link Channels<T>} will override several channels one by one.
*
* @param strategy
*/
public DataCarrier setBufferStrategy(BufferStrategy strategy) {
this.channels.setStrategy(strategy);
return this;
}
/**
* produce data to buffer, using the givven {@link BufferStrategy}.
*
* @param data
* @return false means produce data failure. The data will not be consumed.
*/
public boolean produce(T data) {
if (consumerPool != null) {
if (!consumerPool.isRunning()) {
return false;
}
}
return this.channels.save(data);
}
/**
* set consumers to this Carrier.
* consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
*
* @param consumerClass class of consumer
* @param num number of consumer threads
*/
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
if (consumerPool != null) {
consumerPool.close();
}
consumerPool = new ConsumerPool<T>(this.channels, consumerClass, num);
consumerPool.begin();
return this;
}
/**
* set consumers to this Carrier.
* consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
* @param num number of consumer threads
* @return
*/
public DataCarrier consume(IConsumer<T> consumer, int num) {
if (consumerPool != null) {
consumerPool.close();
}
consumerPool = new ConsumerPool<T>(this.channels, consumer, num);
consumerPool.begin();
return this;
}
/**
* shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link
* BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumers maybe cause blocking when producing.
* Better way to change consumers are use {@link DataCarrier#consume}
*/
public void shutdownConsumers() {
if (consumerPool != null) {
consumerPool.close();
}
}
}
package org.skywalking.apm.agent.core.datacarrier.buffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.datacarrier.common.AtomicRangeInteger;
/**
* Created by wusheng on 2016/10/25.
*/
public class Buffer<T> {
private final Object[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
Buffer(int bufferSize, BufferStrategy strategy) {
buffer = new Object[bufferSize];
this.strategy = strategy;
index = new AtomicRangeInteger(0, bufferSize);
}
void setStrategy(BufferStrategy strategy) {
this.strategy = strategy;
}
boolean save(T data) {
int i = index.getAndIncrement();
if (buffer[i] != null) {
switch (strategy) {
case BLOCKING:
while (buffer[i] != null) {
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
}
}
break;
case IF_POSSIBLE:
return false;
case OVERRIDE:
default:
}
}
buffer[i] = data;
return true;
}
public int getBufferSize() {
return buffer.length;
}
public LinkedList<T> obtain(int start, int end) {
LinkedList<T> result = new LinkedList<T>();
for (int i = start; i < end; i++) {
if (buffer[i] != null) {
result.add((T)buffer[i]);
buffer[i] = null;
}
}
return result;
}
}
package org.skywalking.apm.agent.core.datacarrier.buffer;
/**
* Created by wusheng on 2016/10/25.
*/
public enum BufferStrategy {
/**
* 阻塞模式
*/
BLOCKING,
/**
* 复写模式
*/
OVERRIDE,
/**
* 尝试写入模式,无法写入则返回写入失败
*/
IF_POSSIBLE
}
package org.skywalking.apm.agent.core.datacarrier.buffer;
import org.skywalking.apm.agent.core.datacarrier.partition.IDataPartitioner;
/**
* Channels of Buffer
* It contais all buffer data which belongs to this channel.
* It supports several strategy when buffer is full. The Default is BLOCKING
* <p>
* Created by wusheng on 2016/10/25.
*/
public class Channels<T> {
private final Buffer<T>[] bufferChannels;
private IDataPartitioner<T> dataPartitioner;
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
this.dataPartitioner = partitioner;
bufferChannels = new Buffer[channelSize];
for (int i = 0; i < channelSize; i++) {
bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
}
}
public boolean save(T data) {
int index = dataPartitioner.partition(bufferChannels.length, data);
return bufferChannels[index].save(data);
}
public void setPartitioner(IDataPartitioner<T> dataPartitioner) {
this.dataPartitioner = dataPartitioner;
}
/**
* override the strategy at runtime. Notice, this will override several channels one by one. So, when running
* setStrategy, each channel may use different BufferStrategy
*
* @param strategy
*/
public void setStrategy(BufferStrategy strategy) {
for (Buffer<T> buffer : bufferChannels) {
buffer.setStrategy(strategy);
}
}
/**
* get channelSize
*
* @return
*/
public int getChannelSize() {
return this.bufferChannels.length;
}
public Buffer<T> getBuffer(int index) {
return this.bufferChannels[index];
}
}
package org.skywalking.apm.agent.core.datacarrier.common;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by wusheng on 2016/10/25.
*/
public class AtomicRangeInteger extends Number implements Serializable {
private static final long serialVersionUID = -4099792402691141643L;
private AtomicInteger value;
private int startValue;
private int endValue;
public AtomicRangeInteger(int startValue, int maxValue) {
this.value = new AtomicInteger(startValue);
this.startValue = startValue;
this.endValue = maxValue - 1;
}
public final int getAndIncrement() {
int current;
int next;
do {
current = this.value.get();
next = current >= this.endValue ? this.startValue : current + 1;
}
while (!this.value.compareAndSet(current, next));
return current;
}
public final int get() {
return this.value.get();
}
public int intValue() {
return this.value.intValue();
}
public long longValue() {
return this.value.longValue();
}
public float floatValue() {
return this.value.floatValue();
}
public double doubleValue() {
return this.value.doubleValue();
}
}
package org.skywalking.apm.agent.core.datacarrier.consumer;
/**
* Created by wusheng on 2016/11/15.
*/
public class ConsumerCannotBeCreatedException extends RuntimeException {
ConsumerCannotBeCreatedException(Throwable t) {
super(t);
}
}
package org.skywalking.apm.agent.core.datacarrier.consumer;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
import org.skywalking.apm.agent.core.datacarrier.buffer.Buffer;
import org.skywalking.apm.agent.core.datacarrier.buffer.Channels;
/**
* Pool of consumers
* <p>
* Created by wusheng on 2016/10/25.
*/
public class ConsumerPool<T> {
private boolean running;
private ConsumerThread[] consumerThreads;
private Channels<T> channels;
private ReentrantLock lock;
public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num) {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
}
}
public ConsumerPool(Channels<T> channels, IConsumer<T> prototype, int num) {
this(channels, num);
prototype.init();
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", prototype);
}
}
private ConsumerPool(Channels<T> channels, int num) {
running = false;
this.channels = channels;
consumerThreads = new ConsumerThread[num];
lock = new ReentrantLock();
}
private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
try {
IConsumer<T> inst = consumerClass.newInstance();
inst.init();
return inst;
} catch (InstantiationException e) {
throw new ConsumerCannotBeCreatedException(e);
} catch (IllegalAccessException e) {
throw new ConsumerCannotBeCreatedException(e);
}
}
public void begin() {
if (running) {
return;
}
try {
lock.lock();
this.allocateBuffer2Thread();
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.start();
}
running = true;
} finally {
lock.unlock();
}
}
public boolean isRunning() {
return running;
}
private void allocateBuffer2Thread() {
int channelSize = this.channels.getChannelSize();
if (channelSize < consumerThreads.length) {
/**
* if consumerThreads.length > channelSize
* each channel will be process by several consumers.
*/
ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
int index = threadIndex % channelSize;
if (threadAllocation[index] == null) {
threadAllocation[index] = new ArrayList<Integer>();
}
threadAllocation[index].add(threadIndex);
}
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
Buffer<T> channel = this.channels.getBuffer(channelIndex);
int bufferSize = channel.getBufferSize();
int step = bufferSize / threadAllocationPerChannel.size();
for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
int threadIndex = threadAllocationPerChannel.get(i);
int start = i * step;
int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
consumerThreads[threadIndex].addDataSource(channel, start, end);
}
}
} else {
/**
* if consumerThreads.length < channelSize
* each consumer will process several channels.
*
* if consumerThreads.length == channelSize
* each consumer will process one channel.
*/
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
int consumerIndex = channelIndex % consumerThreads.length;
consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
}
}
}
public void close() {
try {
lock.lock();
this.running = false;
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.shutdown();
}
} finally {
lock.unlock();
}
}
}
package org.skywalking.apm.agent.core.datacarrier.consumer;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.datacarrier.buffer.Buffer;
/**
* Created by wusheng on 2016/10/25.
*/
public class ConsumerThread<T> extends Thread {
private volatile boolean running;
private IConsumer<T> consumer;
private List<DataSource> dataSources;
ConsumerThread(String threadName, IConsumer<T> consumer) {
super((threadName));
this.consumer = consumer;
running = false;
dataSources = new LinkedList<DataSource>();
}
/**
* add partition of buffer to consume
*
* @param sourceBuffer
* @param start
* @param end
*/
void addDataSource(Buffer<T> sourceBuffer, int start, int end) {
this.dataSources.add(new DataSource(sourceBuffer, start, end));
}
/**
* add whole buffer to consume
*
* @param sourceBuffer
*/
void addDataSource(Buffer<T> sourceBuffer) {
this.dataSources.add(new DataSource(sourceBuffer, 0, sourceBuffer.getBufferSize()));
}
@Override
public void run() {
running = true;
while (running) {
boolean hasData = consume();
if (!hasData) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
}
}
}
// consumer thread is going to stop
// consume the last time
consume();
consumer.onExit();
}
private boolean consume() {
boolean hasData = false;
LinkedList<T> consumeList = new LinkedList<T>();
for (DataSource dataSource : dataSources) {
LinkedList<T> data = dataSource.obtain();
if (data.size() == 0) {
continue;
}
for (T element : data) {
consumeList.add(element);
}
hasData = true;
}
try {
consumer.consume(consumeList);
} catch (Throwable t) {
consumer.onError(consumeList, t);
}
return hasData;
}
void shutdown() {
running = false;
}
/**
* DataSource is a refer to {@link Buffer}.
*/
class DataSource {
private Buffer<T> sourceBuffer;
private int start;
private int end;
DataSource(Buffer<T> sourceBuffer, int start, int end) {
this.sourceBuffer = sourceBuffer;
this.start = start;
this.end = end;
}
LinkedList<T> obtain() {
return sourceBuffer.obtain(start, end);
}
}
}
package org.skywalking.apm.agent.core.datacarrier.consumer;
import java.util.List;
/**
* Created by wusheng on 2016/10/25.
*/
public interface IConsumer<T> {
void init();
void consume(List<T> data);
void onError(List<T> data, Throwable t);
void onExit();
}
package org.skywalking.apm.agent.core.datacarrier.partition;
/**
* Created by wusheng on 2016/10/25.
*/
public interface IDataPartitioner<T> {
int partition(int total, T data);
}
package org.skywalking.apm.agent.core.datacarrier.partition;
/**
* use threadid % total to partition
*
* Created by wusheng on 2016/10/25.
*/
public class ProducerThreadPartitioner<T> implements IDataPartitioner<T> {
@Override
public int partition(int total, T data) {
return (int)Thread.currentThread().getId() % total;
}
}
package org.skywalking.apm.agent.core.datacarrier.partition;
/**
* use normal int to rolling.
*
*
* Created by wusheng on 2016/10/25.
*/
public class SimpleRollingPartitioner<T> implements IDataPartitioner<T> {
private volatile int i = 0;
@Override
public int partition(int total, T data) {
return Math.abs(i++ % total);
}
public static void main(String[] args) {
SimpleRollingPartitioner s = new SimpleRollingPartitioner();
System.out.print(s.i++ % 10);
}
}
......@@ -25,6 +25,4 @@ public interface ConstructorInterceptPoint {
* {@link org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor}
*/
String getConstructorInterceptor();
boolean isOverrideArgs();
}
......@@ -103,26 +103,14 @@ public abstract class ClassEnhancePluginDefine extends AbstractClassEnhancePlugi
*/
if (existedConstructorInterceptPoint) {
for (ConstructorInterceptPoint constructorInterceptPoint : constructorInterceptPoints) {
if (constructorInterceptPoint.isOverrideArgs()) {
newClassBuilder = newClassBuilder.constructor(ElementMatchers.<MethodDescription>any()).intercept(SuperMethodCall.INSTANCE
.andThen(MethodDelegation.withDefaultConfiguration()
.withBinders(
FieldProxy.Binder.install(FieldGetter.class, FieldSetter.class),
Morph.Binder.install(Constructible.class)
)
.to(new ConstructorInter(constructorInterceptPoint.getConstructorInterceptor()))
newClassBuilder = newClassBuilder.constructor(ElementMatchers.<MethodDescription>any()).intercept(SuperMethodCall.INSTANCE
.andThen(MethodDelegation.withDefaultConfiguration()
.withBinders(
FieldProxy.Binder.install(FieldGetter.class, FieldSetter.class)
)
);
} else {
newClassBuilder = newClassBuilder.constructor(ElementMatchers.<MethodDescription>any()).intercept(SuperMethodCall.INSTANCE
.andThen(MethodDelegation.withDefaultConfiguration()
.withBinders(
FieldProxy.Binder.install(FieldGetter.class, FieldSetter.class)
)
.to(new ConstructorInter(constructorInterceptPoint.getConstructorInterceptor()))
)
);
}
.to(new ConstructorInter(constructorInterceptPoint.getConstructorInterceptor()))
)
);
}
}
......
package org.skywalking.apm.agent.core.queue;
import com.lmax.disruptor.EventFactory;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
/**
* Just a holder of {@link TraceSegment} instance.
* <p>
* Created by wusheng on 2017/2/17.
*/
public final class TraceSegmentHolder {
private TraceSegment value;
public TraceSegment getValue() {
return value;
}
public void setValue(TraceSegment value) {
this.value = value;
}
public void clear() {
this.value = null;
}
public enum Factory implements EventFactory<TraceSegmentHolder> {
INSTANCE;
@Override
public TraceSegmentHolder newInstance() {
return new TraceSegmentHolder();
}
}
}
package org.skywalking.apm.agent.core.queue;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.boot.StatusBootService;
import org.skywalking.apm.agent.core.context.TracingContextListener;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
/**
* {@link TraceSegmentProcessQueue} is a proxy of {@link Disruptor}, High Performance Inter-Thread MQ.
* <p>
* {@see https://github.com/LMAX-Exchange/disruptor}
* <p>
* Created by wusheng on 2017/2/17.
*/
public class TraceSegmentProcessQueue extends StatusBootService implements TracingContextListener, EventHandler<TraceSegmentHolder> {
private static final ILog logger = LogManager.getLogger(TraceSegmentProcessQueue.class);
private Disruptor<TraceSegmentHolder> disruptor;
private RingBuffer<TraceSegmentHolder> buffer;
private TraceSegment[] secondLevelCache;
private volatile int cacheIndex;
public TraceSegmentProcessQueue() {
disruptor = new Disruptor<TraceSegmentHolder>(TraceSegmentHolder.Factory.INSTANCE, Config.Buffer.SIZE, DaemonThreadFactory.INSTANCE);
secondLevelCache = new TraceSegment[Config.Buffer.SIZE];
cacheIndex = 0;
disruptor.handleEventsWith(this);
buffer = disruptor.getRingBuffer();
}
@Override
protected void bootUpWithStatus() {
TracerContext.ListenerManager.add(this);
disruptor.start();
}
/**
* Append the given traceSegment to the queue, wait for sending to Collector.
*
* @param traceSegment finished {@link TraceSegment}
*/
@Override
public void afterFinished(TraceSegment traceSegment) {
if (isStarted() && !traceSegment.isIgnore()) {
long sequence = this.buffer.next(); // Grab the next sequence
try {
TraceSegmentHolder data = this.buffer.get(sequence);
data.setValue(traceSegment);
} finally {
this.buffer.publish(sequence);
}
}
}
@Override
public void onEvent(TraceSegmentHolder event, long sequence, boolean endOfBatch) throws Exception {
TraceSegment traceSegment = event.getValue();
try {
if (secondLevelCache[cacheIndex] == null) {
secondLevelCache[cacheIndex] = traceSegment;
} else {
/**
* If your application has very high throughput(also called tps/qps),
* this log message will be output in very high frequency.
* And this is not suppose to happen. Disable log.warn or expend {@link Config.Disruptor.BUFFER_SIZE}
*/
logger.warn("TraceSegmentProcessQueue has data conflict. Discard the new TraceSegment.");
}
/**
* increase the {@link #cacheIndex}, if it is out of range, reset it.
*/
cacheIndex++;
if (cacheIndex == secondLevelCache.length) {
cacheIndex = 0;
}
} finally {
event.clear();
}
}
public List<TraceSegment> getCachedTraceSegments() {
List<TraceSegment> segmentList = new LinkedList<TraceSegment>();
for (int i = 0; i < secondLevelCache.length; i++) {
TraceSegment segment = secondLevelCache[i];
if (segment != null) {
segmentList.add(segment);
secondLevelCache[i] = null;
}
}
return segmentList;
}
}
org.skywalking.apm.agent.core.queue.TraceSegmentProcessQueue
org.skywalking.apm.agent.core.datacarrier.DataBufferService
org.skywalking.apm.agent.core.context.ContextManager
org.skywalking.apm.agent.core.client.CollectorClientService
org.skywalking.apm.agent.core.sampling.SamplingService
\ No newline at end of file
org.skywalking.apm.agent.core.client.CollectorDiscoveryService
org.skywalking.apm.agent.core.sampling.SamplingService
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册