diff --git a/apm-network/src/test/java/org/skywalking/apm/network/trace/proto/GRPCNoServerTest.java b/apm-network/src/test/java/org/skywalking/apm/network/trace/proto/GRPCNoServerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1e4d66d8c08fcee1ec66145597ded70e342c2b30 --- /dev/null +++ b/apm-network/src/test/java/org/skywalking/apm/network/trace/proto/GRPCNoServerTest.java @@ -0,0 +1,47 @@ +package org.skywalking.apm.network.trace.proto; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.internal.DnsNameResolverProvider; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.junit.Assert; +import org.skywalking.apm.network.collecor.proto.Downstream; + +/** + * @author wusheng + */ +public class GRPCNoServerTest { + public static void main(String[] args) throws InterruptedException { + ManagedChannelBuilder channelBuilder = + NettyChannelBuilder.forAddress("127.0.0.1", 8080) + .nameResolverFactory(new DnsNameResolverProvider()) + .maxInboundMessageSize(1024 * 1024 * 50) + .usePlaintext(true); + ManagedChannel channel = channelBuilder.build(); + TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub = TraceSegmentServiceGrpc.newStub(channel); + final Status[] status = {null}; + StreamObserver streamObserver = serviceStub.collect(new StreamObserver() { + @Override public void onNext(Downstream value) { + + } + + @Override public void onError(Throwable t) { + status[0] = ((StatusRuntimeException)t).getStatus(); + } + + @Override public void onCompleted() { + + } + }); + + streamObserver.onNext(null); + streamObserver.onCompleted(); + + Thread.sleep(2 * 1000); + + Assert.assertEquals(status[0].getCode(), Status.UNAVAILABLE.getCode()); + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java index eb40aeb87644f87d4143859d0f1f0fd5f572cf61..8a92a41e44b50055b3b6016109a60d15c25e6b8f 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java @@ -2,6 +2,8 @@ package org.skywalking.apm.agent.core.remote; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.internal.DnsNameResolverProvider; import io.grpc.netty.NettyChannelBuilder; import java.util.Collections; @@ -32,7 +34,7 @@ public class GRPCChannelManager implements BootService, Runnable { @Override public void boot() throws Throwable { - this.startupInBackground(false); + this.connectInBackground(false); } @Override @@ -40,7 +42,7 @@ public class GRPCChannelManager implements BootService, Runnable { } - private void startupInBackground(boolean forceStart) { + private void connectInBackground(boolean forceStart) { if (channelManagerThread == null || !channelManagerThread.isAlive()) { synchronized (this) { if (forceStart) { @@ -106,8 +108,37 @@ public class GRPCChannelManager implements BootService, Runnable { return managedChannel; } - public void reportError() { - this.startupInBackground(true); + /** + * If the given expcetion is triggered by network problem, connect in background. + * @param throwable + */ + public void reportError(Throwable throwable) { + if (isNetworkError(throwable)) { + this.connectInBackground(true); + } + } + + private boolean isNetworkError(Throwable throwable) { + if (throwable instanceof StatusRuntimeException) { + StatusRuntimeException statusRuntimeException = (StatusRuntimeException)throwable; + return statusEquals(statusRuntimeException.getStatus(), + Status.UNAVAILABLE, + Status.PERMISSION_DENIED, + Status.UNAUTHENTICATED, + Status.RESOURCE_EXHAUSTED, + Status.UNKNOWN + ); + } + return false; + } + + private boolean statusEquals(Status sourceStatus, Status... potentialStatus) { + for (Status status : potentialStatus) { + if (sourceStatus.getCode() == status.getCode()) { + return true; + } + } + return false; } private void resetNextStartTime() { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java index 8094dfab1a64a4a76c858b9cbd2ec48b93f7e676..c827e4a770b80a4cf8cee790bf9132c8e7b4d8b9 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java @@ -66,7 +66,10 @@ public class TraceSegmentServiceClient implements BootService, IConsumer