提交 01b963c1 编写于 作者: G Gao Hongtao 提交者: wu-sheng

Adding deadline to gRPC client (#2987)

* Set up 10 seconds deadline after gRPC client sending
 * The duration of deadline contains three segments: connecting,
   request and response

For blocking stub, I just set deadline before invoke service. For
bi-streaming stub, I found all of them are used as a blocking style,
that after getting streaming response client stub just close current
streaming. Base on above reality, I pick the same way as blocking one.
上级 9d7649d9
......@@ -138,7 +138,7 @@ public class JVMService implements BootService, Runnable {
if (buffer.size() > 0) {
builder.addAllMetrics(buffer);
builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
stub.collect(builder.build());
stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build());
}
} catch (Throwable t) {
logger.error(t, "send JVM metrics to Collector fail.");
......
......@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
......@@ -112,7 +113,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
try {
if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
if (registerBlockingStub != null) {
ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.doServiceRegister(
ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister(
Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
if (serviceRegisterMapping != null) {
for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
......@@ -127,7 +128,8 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
if (registerBlockingStub != null) {
if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.doServiceInstanceRegister(ServiceInstances.newBuilder()
ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.doServiceInstanceRegister(ServiceInstances.newBuilder()
.addInstances(
ServiceInstance.newBuilder()
.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID)
......@@ -144,14 +146,14 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
}
}
} else {
serviceInstancePingStub.doPing(ServiceInstancePingPkg.newBuilder()
serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
.setTime(System.currentTimeMillis())
.setServiceInstanceUUID(INSTANCE_UUID)
.build());
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub);
EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub);
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
}
}
}
......
......@@ -21,6 +21,8 @@ package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
......@@ -85,7 +87,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Commands>() {
StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
......@@ -69,7 +70,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
public void initSubscriptionList() {
SubscriptionsResp subscription = blockingStub.subscription(SubscriptionReq.newBuilder().build());
SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());
subscription.getMetricNamesList().forEach(subscriptionSet::add);
logger.debug("Get exporter subscription list, {}", subscriptionSet);
}
......@@ -84,7 +85,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
ExportStatus status = new ExportStatus();
StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.export(
StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(
new StreamObserver<ExportResponse>() {
@Override public void onNext(ExportResponse response) {
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
......@@ -183,7 +184,7 @@ public class GRPCRemoteClient implements RemoteClient {
}
}
return getStub().call(new StreamObserver<Empty>() {
return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册