提交 34a0a0b4 编写于 作者: wu-sheng's avatar wu-sheng

Add codes about grpc client service.

上级 95dcd392
......@@ -9,6 +9,8 @@ import org.skywalking.apm.agent.core.context.trace.SpanType;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.sampling.SamplingService;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import org.skywalking.apm.util.StringUtil;
/**
......@@ -22,29 +24,35 @@ import org.skywalking.apm.util.StringUtil;
* @author wusheng
*/
public class ContextManager implements TracingContextListener, BootService, IgnoreTracerContextListener {
private static final ILog logger = LogManager.getLogger(ContextManager.class);
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
if (StringUtil.isEmpty(operationName)) {
throw new IllegalArgumentException("No operation name");
}
AbstractTracerContext context = CONTEXT.get();
if (context == null) {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
/**
* Can't register to collector, no need to trace anything.
*/
if (StringUtil.isEmpty(operationName)) {
if (logger.isDebugEnable()) {
logger.debug("No operation name, ignore this trace.");
}
context = new IgnoredTracerContext();
} else {
int suffixIdx = operationName.lastIndexOf(".");
if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
/**
* Can't register to collector, no need to trace anything.
*/
context = new IgnoredTracerContext();
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling()) {
context = new TracingContext();
} else {
int suffixIdx = operationName.lastIndexOf(".");
if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
context = new IgnoredTracerContext();
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling()) {
context = new TracingContext();
} else {
context = new IgnoredTracerContext();
}
}
}
}
......
......@@ -21,6 +21,7 @@ public class GRPCChannelManager implements BootService, Runnable {
private volatile Thread channelManagerThread = null;
private volatile ManagedChannel managedChannel = null;
private volatile long nextStartTime = 0;
private Random random = new Random();
private List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<GRPCChannelListener>());
......@@ -31,7 +32,7 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void boot() throws Throwable {
this.startupInBackground();
this.startupInBackground(false);
}
@Override
......@@ -39,11 +40,20 @@ public class GRPCChannelManager implements BootService, Runnable {
}
private void startupInBackground() {
private void startupInBackground(boolean forceStart) {
if (channelManagerThread == null || !channelManagerThread.isAlive()) {
synchronized (this) {
if (forceStart) {
/**
* The startup has invoked in 30 seconds before, don't invoke again.
*/
if (System.currentTimeMillis() < nextStartTime) {
return;
}
}
resetNextStartTime();
if (channelManagerThread == null || !channelManagerThread.isAlive()) {
if (managedChannel == null || managedChannel.isTerminated() || managedChannel.isShutdown()) {
if (forceStart || managedChannel == null || managedChannel.isTerminated() || managedChannel.isShutdown()) {
if (managedChannel != null) {
managedChannel.shutdownNow();
}
......@@ -59,6 +69,8 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void run() {
while (true) {
resetNextStartTime();
if (RemoteDownstreamConfig.Collector.GRPC_SERVERS.size() > 0) {
int index = random.nextInt() % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size();
String server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index);
......@@ -79,13 +91,14 @@ public class GRPCChannelManager implements BootService, Runnable {
}
}
resetNextStartTime();
int waitTime = 5 * 1000;
logger.debug("Selected collector grpc service is not available. Wait {} seconds to try", waitTime);
try2Sleep(waitTime);
}
}
public void addChannelListener(GRPCChannelListener listener){
public void addChannelListener(GRPCChannelListener listener) {
listeners.add(listener);
}
......@@ -93,6 +106,14 @@ public class GRPCChannelManager implements BootService, Runnable {
return managedChannel;
}
public void reportError() {
this.startupInBackground(true);
}
private void resetNextStartTime() {
nextStartTime = System.currentTimeMillis() + 20 * 1000;
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
......
......@@ -4,5 +4,5 @@ package org.skywalking.apm.agent.core.remote;
* @author wusheng
*/
public enum GRPCChannelStatus {
CONNECTED;
CONNECTED
}
......@@ -14,8 +14,8 @@ public class GRPCStreamServiceStatus {
return status;
}
public void setStatus(boolean status) {
this.status = status;
public void finished() {
this.status = true;
}
/**
......
......@@ -29,6 +29,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
private volatile DataCarrier<TraceSegment> carrier;
private volatile TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub;
private volatile GRPCChannelStatus status = null;
@Override
public void beforeBoot() throws Throwable {
......@@ -54,39 +55,46 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
@Override
public void consume(List<TraceSegment> data) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Downstream>() {
@Override
public void onNext(Downstream downstream) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Downstream>() {
@Override
public void onNext(Downstream downstream) {
}
@Override
public void onError(Throwable throwable) {
status.finished();
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError();
}
@Override
public void onCompleted() {
status.finished();
}
});
try {
for (TraceSegment segment : data) {
//TODO
// segment to PROTOBUFF object
upstreamSegmentStreamObserver.onNext(null);
}
} catch (Throwable t) {
logger.error(t, "Send UpstreamSegment to collector fail.");
}
upstreamSegmentStreamObserver.onCompleted();
@Override
public void onError(Throwable throwable) {
status.setStatus(true);
}
status.wait4Finish(30 * 1000);
@Override
public void onCompleted() {
status.setStatus(true);
if (logger.isDebugEnable()) {
logger.debug("{} trace segments have been sent to collector.", data.size());
}
});
try {
for (TraceSegment segment : data) {
//TODO
// segment to PROTOBUF object
upstreamSegmentStreamObserver.onNext(null);
} else {
if (logger.isDebugEnable()) {
logger.debug("{} trace segments have been abandoned, cause by no available channel.", data.size());
}
} catch (Throwable t) {
logger.error(t, "Send UpstreamSegment to collector fail.");
}
upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish(30 * 1000);
if (logger.isDebugEnable()) {
logger.debug("{} trace segments have been sent to collector.", data.size());
}
}
......@@ -110,6 +118,8 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
if (CONNECTED.equals(status)) {
ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
serviceStub = TraceSegmentServiceGrpc.newStub(channel);
} else {
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册