提交 db8c4546 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Introduce dead line settings from gRPC document. (#2917)

上级 fe66fecc
...@@ -151,7 +151,7 @@ public class JVMService implements BootService, Runnable { ...@@ -151,7 +151,7 @@ public class JVMService implements BootService, Runnable {
public void statusChanged(GRPCChannelStatus status) { public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) { if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
stub = JVMMetricReportServiceGrpc.newBlockingStub(channel); stub = JVMMetricReportServiceGrpc.newBlockingStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS);
} }
this.status = status; this.status = status;
} }
......
...@@ -65,8 +65,8 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, ...@@ -65,8 +65,8 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
public void statusChanged(GRPCChannelStatus status) { public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) { if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
registerBlockingStub = RegisterGrpc.newBlockingStub(channel); registerBlockingStub = RegisterGrpc.newBlockingStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS);
serviceInstancePingStub = ServiceInstancePingGrpc.newBlockingStub(channel); serviceInstancePingStub = ServiceInstancePingGrpc.newBlockingStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS);
} else { } else {
registerBlockingStub = null; registerBlockingStub = null;
serviceInstancePingStub = null; serviceInstancePingStub = null;
......
...@@ -21,18 +21,24 @@ package org.apache.skywalking.apm.agent.core.remote; ...@@ -21,18 +21,24 @@ package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.util.List; import java.util.List;
import org.apache.skywalking.apm.agent.core.boot.*; import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.context.*; import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.*; import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.common.Commands; import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.agent.*; import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc; import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.*; import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED; import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
/** /**
...@@ -166,7 +172,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe ...@@ -166,7 +172,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
public void statusChanged(GRPCChannelStatus status) { public void statusChanged(GRPCChannelStatus status) {
if (CONNECTED.equals(status)) { if (CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
serviceStub = TraceSegmentReportServiceGrpc.newStub(channel); serviceStub = TraceSegmentReportServiceGrpc.newStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS);
} }
this.status = status; this.status = status;
} }
......
...@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc; ...@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import lombok.*; import lombok.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
...@@ -49,7 +50,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS ...@@ -49,7 +50,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort()); GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
client.connect(); client.connect();
ManagedChannel channel = client.getChannel(); ManagedChannel channel = client.getChannel();
exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel); exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS);
blockingStub = MetricExportServiceGrpc.newBlockingStub(channel); blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize()); exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
exportBuffer.consume(this, 1, 200); exportBuffer.consume(this, 1, 200);
......
...@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.remote.client; ...@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
...@@ -100,7 +101,7 @@ public class GRPCRemoteClient implements RemoteClient { ...@@ -100,7 +101,7 @@ public class GRPCRemoteClient implements RemoteClient {
} }
RemoteServiceGrpc.RemoteServiceStub getStub() { RemoteServiceGrpc.RemoteServiceStub getStub() {
return RemoteServiceGrpc.newStub(getChannel()); return RemoteServiceGrpc.newStub(getChannel()).withDeadlineAfter(10, TimeUnit.SECONDS);
} }
DataCarrier<RemoteMessage> getDataCarrier() { DataCarrier<RemoteMessage> getDataCarrier() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册