提交 6d2ab563 编写于 作者: wu-sheng's avatar wu-sheng

Finish codes of grpc segment client about channel control.

上级 34a0a0b4
package org.skywalking.apm.network.trace.proto;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.Assert;
import org.skywalking.apm.network.collecor.proto.Downstream;
/**
* @author wusheng
*/
public class GRPCNoServerTest {
public static void main(String[] args) throws InterruptedException {
ManagedChannelBuilder<?> channelBuilder =
NettyChannelBuilder.forAddress("127.0.0.1", 8080)
.nameResolverFactory(new DnsNameResolverProvider())
.maxInboundMessageSize(1024 * 1024 * 50)
.usePlaintext(true);
ManagedChannel channel = channelBuilder.build();
TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub = TraceSegmentServiceGrpc.newStub(channel);
final Status[] status = {null};
StreamObserver<UpstreamSegment> streamObserver = serviceStub.collect(new StreamObserver<Downstream>() {
@Override public void onNext(Downstream value) {
}
@Override public void onError(Throwable t) {
status[0] = ((StatusRuntimeException)t).getStatus();
}
@Override public void onCompleted() {
}
});
streamObserver.onNext(null);
streamObserver.onCompleted();
Thread.sleep(2 * 1000);
Assert.assertEquals(status[0].getCode(), Status.UNAVAILABLE.getCode());
}
}
......@@ -2,6 +2,8 @@ package org.skywalking.apm.agent.core.remote;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.NettyChannelBuilder;
import java.util.Collections;
......@@ -32,7 +34,7 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void boot() throws Throwable {
this.startupInBackground(false);
this.connectInBackground(false);
}
@Override
......@@ -40,7 +42,7 @@ public class GRPCChannelManager implements BootService, Runnable {
}
private void startupInBackground(boolean forceStart) {
private void connectInBackground(boolean forceStart) {
if (channelManagerThread == null || !channelManagerThread.isAlive()) {
synchronized (this) {
if (forceStart) {
......@@ -106,8 +108,37 @@ public class GRPCChannelManager implements BootService, Runnable {
return managedChannel;
}
public void reportError() {
this.startupInBackground(true);
/**
* If the given expcetion is triggered by network problem, connect in background.
* @param throwable
*/
public void reportError(Throwable throwable) {
if (isNetworkError(throwable)) {
this.connectInBackground(true);
}
}
private boolean isNetworkError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException)throwable;
return statusEquals(statusRuntimeException.getStatus(),
Status.UNAVAILABLE,
Status.PERMISSION_DENIED,
Status.UNAUTHENTICATED,
Status.RESOURCE_EXHAUSTED,
Status.UNKNOWN
);
}
return false;
}
private boolean statusEquals(Status sourceStatus, Status... potentialStatus) {
for (Status status : potentialStatus) {
if (sourceStatus.getCode() == status.getCode()) {
return true;
}
}
return false;
}
private void resetNextStartTime() {
......
......@@ -66,7 +66,10 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
@Override
public void onError(Throwable throwable) {
status.finished();
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError();
if (logger.isErrorEnable()) {
logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
}
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册