From 34a0a0b4f3cc87c22eb098d3bde98d2853428b1a Mon Sep 17 00:00:00 2001 From: wusheng Date: Sat, 1 Jul 2017 22:40:38 +0800 Subject: [PATCH] Add codes about grpc client service. --- .../agent/core/context/ContextManager.java | 34 ++++++---- .../agent/core/remote/GRPCChannelManager.java | 29 ++++++-- .../agent/core/remote/GRPCChannelStatus.java | 2 +- .../core/remote/GRPCStreamServiceStatus.java | 4 +- .../remote/TraceSegmentServiceClient.java | 66 +++++++++++-------- 5 files changed, 87 insertions(+), 48 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/ContextManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/ContextManager.java index 13a28e9d83..202a36258b 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/ContextManager.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/ContextManager.java @@ -9,6 +9,8 @@ import org.skywalking.apm.agent.core.context.trace.SpanType; import org.skywalking.apm.agent.core.context.trace.TraceSegment; import org.skywalking.apm.agent.core.dictionary.DictionaryUtil; import org.skywalking.apm.agent.core.sampling.SamplingService; +import org.skywalking.apm.logging.ILog; +import org.skywalking.apm.logging.LogManager; import org.skywalking.apm.util.StringUtil; /** @@ -22,29 +24,35 @@ import org.skywalking.apm.util.StringUtil; * @author wusheng */ public class ContextManager implements TracingContextListener, BootService, IgnoreTracerContextListener { + private static final ILog logger = LogManager.getLogger(ContextManager.class); + private static ThreadLocal CONTEXT = new ThreadLocal(); private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) { - if (StringUtil.isEmpty(operationName)) { - throw new IllegalArgumentException("No operation name"); - } AbstractTracerContext context = CONTEXT.get(); if (context == null) { - if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) { - /** - * Can't register to collector, no need to trace anything. - */ + if (StringUtil.isEmpty(operationName)) { + if (logger.isDebugEnable()) { + logger.debug("No operation name, ignore this trace."); + } context = new IgnoredTracerContext(); } else { - int suffixIdx = operationName.lastIndexOf("."); - if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) { + if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) { + /** + * Can't register to collector, no need to trace anything. + */ context = new IgnoredTracerContext(); } else { - SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); - if (forceSampling || samplingService.trySampling()) { - context = new TracingContext(); - } else { + int suffixIdx = operationName.lastIndexOf("."); + if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) { context = new IgnoredTracerContext(); + } else { + SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); + if (forceSampling || samplingService.trySampling()) { + context = new TracingContext(); + } else { + context = new IgnoredTracerContext(); + } } } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java index 93899e4cd0..eb40aeb876 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java @@ -21,6 +21,7 @@ public class GRPCChannelManager implements BootService, Runnable { private volatile Thread channelManagerThread = null; private volatile ManagedChannel managedChannel = null; + private volatile long nextStartTime = 0; private Random random = new Random(); private List listeners = Collections.synchronizedList(new LinkedList()); @@ -31,7 +32,7 @@ public class GRPCChannelManager implements BootService, Runnable { @Override public void boot() throws Throwable { - this.startupInBackground(); + this.startupInBackground(false); } @Override @@ -39,11 +40,20 @@ public class GRPCChannelManager implements BootService, Runnable { } - private void startupInBackground() { + private void startupInBackground(boolean forceStart) { if (channelManagerThread == null || !channelManagerThread.isAlive()) { synchronized (this) { + if (forceStart) { + /** + * The startup has invoked in 30 seconds before, don't invoke again. + */ + if (System.currentTimeMillis() < nextStartTime) { + return; + } + } + resetNextStartTime(); if (channelManagerThread == null || !channelManagerThread.isAlive()) { - if (managedChannel == null || managedChannel.isTerminated() || managedChannel.isShutdown()) { + if (forceStart || managedChannel == null || managedChannel.isTerminated() || managedChannel.isShutdown()) { if (managedChannel != null) { managedChannel.shutdownNow(); } @@ -59,6 +69,8 @@ public class GRPCChannelManager implements BootService, Runnable { @Override public void run() { while (true) { + resetNextStartTime(); + if (RemoteDownstreamConfig.Collector.GRPC_SERVERS.size() > 0) { int index = random.nextInt() % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(); String server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index); @@ -79,13 +91,14 @@ public class GRPCChannelManager implements BootService, Runnable { } } + resetNextStartTime(); int waitTime = 5 * 1000; logger.debug("Selected collector grpc service is not available. Wait {} seconds to try", waitTime); try2Sleep(waitTime); } } - public void addChannelListener(GRPCChannelListener listener){ + public void addChannelListener(GRPCChannelListener listener) { listeners.add(listener); } @@ -93,6 +106,14 @@ public class GRPCChannelManager implements BootService, Runnable { return managedChannel; } + public void reportError() { + this.startupInBackground(true); + } + + private void resetNextStartTime() { + nextStartTime = System.currentTimeMillis() + 20 * 1000; + } + /** * Try to sleep, and ignore the {@link InterruptedException} * diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelStatus.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelStatus.java index 78060a69a4..303330c9cd 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelStatus.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelStatus.java @@ -4,5 +4,5 @@ package org.skywalking.apm.agent.core.remote; * @author wusheng */ public enum GRPCChannelStatus { - CONNECTED; + CONNECTED } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.java index 0717e59acd..e9e1bba0f7 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.java @@ -14,8 +14,8 @@ public class GRPCStreamServiceStatus { return status; } - public void setStatus(boolean status) { - this.status = status; + public void finished() { + this.status = true; } /** diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java index cbc3127826..8094dfab1a 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java @@ -29,6 +29,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer carrier; private volatile TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub; + private volatile GRPCChannelStatus status = null; @Override public void beforeBoot() throws Throwable { @@ -54,39 +55,46 @@ public class TraceSegmentServiceClient implements BootService, IConsumer data) { - final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); - StreamObserver upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver() { - @Override - public void onNext(Downstream downstream) { - + if (CONNECTED.equals(status)) { + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); + StreamObserver upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver() { + @Override + public void onNext(Downstream downstream) { + + } + + @Override + public void onError(Throwable throwable) { + status.finished(); + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(); + } + + @Override + public void onCompleted() { + status.finished(); + } + }); + + try { + for (TraceSegment segment : data) { + //TODO + // segment to PROTOBUFF object + upstreamSegmentStreamObserver.onNext(null); + } + } catch (Throwable t) { + logger.error(t, "Send UpstreamSegment to collector fail."); } + upstreamSegmentStreamObserver.onCompleted(); - @Override - public void onError(Throwable throwable) { - status.setStatus(true); - } + status.wait4Finish(30 * 1000); - @Override - public void onCompleted() { - status.setStatus(true); + if (logger.isDebugEnable()) { + logger.debug("{} trace segments have been sent to collector.", data.size()); } - }); - - try { - for (TraceSegment segment : data) { - //TODO - // segment to PROTOBUF object - upstreamSegmentStreamObserver.onNext(null); + } else { + if (logger.isDebugEnable()) { + logger.debug("{} trace segments have been abandoned, cause by no available channel.", data.size()); } - } catch (Throwable t) { - logger.error(t, "Send UpstreamSegment to collector fail."); - } - upstreamSegmentStreamObserver.onCompleted(); - - status.wait4Finish(30 * 1000); - - if (logger.isDebugEnable()) { - logger.debug("{} trace segments have been sent to collector.", data.size()); } } @@ -110,6 +118,8 @@ public class TraceSegmentServiceClient implements BootService, IConsumer