From 8ebf3aac04fed8a61b0b8f60182ae8b22355448d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Fri, 20 Sep 2019 15:02:28 +0800 Subject: [PATCH] Support timeout configuration in agent and backend. (#3491) * Support timeout configuration in agent and backend. * Fix CI * no message --- .../skywalking/apm/agent/core/conf/Config.java | 4 ++++ .../skywalking/apm/agent/core/jvm/JVMService.java | 4 +++- .../remote/ServiceAndEndpointRegisterClient.java | 12 +++++++----- .../agent/core/remote/TraceSegmentServiceClient.java | 3 ++- docs/en/setup/service-agent/java-agent/README.md | 1 + .../skywalking/oap/server/core/CoreModuleConfig.java | 4 ++++ .../oap/server/core/CoreModuleProvider.java | 2 +- .../server/core/remote/client/GRPCRemoteClient.java | 6 ++++-- .../core/remote/client/RemoteClientManager.java | 11 +++++++++-- .../remote/client/GRPCRemoteClientRealClient.java | 2 +- .../core/remote/client/GRPCRemoteClientTestCase.java | 2 +- .../remote/client/RemoteClientManagerTestCase.java | 2 +- 12 files changed, 38 insertions(+), 15 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index a19f75d322..c8e04593ce 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -132,6 +132,10 @@ public class Config { * Collector skywalking trace receiver service addresses. */ public static String BACKEND_SERVICE = ""; + /** + * Timeout, in seconds, for the gRPC client when sending data upstream. 
+ */ + public static int GRPC_UPSTREAM_TIMEOUT = 30; } public static class Jvm { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java index 040d73360d..598cd36394 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java @@ -47,6 +47,8 @@ import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricCollection; import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricReportServiceGrpc; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; +import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; + /** * The JVMService represents a timer, which collectors JVM cpu, memory, memorypool and gc info, and send * the collected info to Collector through the channel provided by {@link GRPCChannelManager} @@ -140,7 +142,7 @@ public class JVMService implements BootService, Runnable { if (buffer.size() > 0) { builder.addAllMetrics(buffer); builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID); - Commands commands = stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build()); + Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build()); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); } } catch (Throwable t) { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java index 92c22a545f..9c68ac3ee3 100755 --- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java @@ -53,6 +53,8 @@ import org.apache.skywalking.apm.network.register.v2.Services; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.apache.skywalking.apm.util.StringUtil; +import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; + /** * @author wusheng */ @@ -138,7 +140,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, try { if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) { if (registerBlockingStub != null) { - ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister( + ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).doServiceRegister( Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build()); if (serviceRegisterMapping != null) { for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) { @@ -153,7 +155,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, if (registerBlockingStub != null) { if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) { - ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS) + ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) .doServiceInstanceRegister(ServiceInstances.newBuilder() .addInstances( ServiceInstance.newBuilder() @@ -173,15 +175,15 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, } } } else { - final Commands 
commands = serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS) + final Commands commands = serviceInstancePingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) .doPing(ServiceInstancePingPkg.newBuilder() .setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID) .setTime(System.currentTimeMillis()) .setServiceInstanceUUID(INSTANCE_UUID) .build()); - NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)); - EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)); + NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)); + EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java index 3ffacffb59..edea03b372 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.skywalking.apm.agent.core.boot.*; import org.apache.skywalking.apm.agent.core.commands.CommandService; +import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.context.*; import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; import org.apache.skywalking.apm.agent.core.logging.api.*; @@ -88,7 +89,7 @@ public class TraceSegmentServiceClient 
implements BootService, IConsumer data) { if (CONNECTED.equals(status)) { final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); - StreamObserver upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver() { + StreamObserver upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver() { @Override public void onNext(Commands commands) { ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); diff --git a/docs/en/setup/service-agent/java-agent/README.md b/docs/en/setup/service-agent/java-agent/README.md index 8d798e0805..dcdfba55d4 100755 --- a/docs/en/setup/service-agent/java-agent/README.md +++ b/docs/en/setup/service-agent/java-agent/README.md @@ -82,6 +82,7 @@ property key | Description | Default | `collector.grpc_channel_check_interval`|grpc channel status check interval.|`30`| `collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`| `collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`| +`collector.grpc_upstream_timeout`|Timeout, in seconds, for the gRPC client when sending data upstream.|`30` seconds| `logging.level`|The log level. Default is debug.|`DEBUG`| `logging.file_name`|Log file name.|`skywalking-api.log`| `logging.output`| Log output. Default is FILE. Use CONSOLE means output to stdout. 
|`FILE`| diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index c7c26e1f75..18581167e8 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -53,6 +53,10 @@ public class CoreModuleConfig extends ModuleConfig { @Setter private int monthMetricsDataTTL; @Setter private int gRPCThreadPoolSize; @Setter private int gRPCThreadPoolQueueSize; + /** + * Timeout for cluster internal communication, in seconds. + */ + @Setter private int remoteTimeout = 20; CoreModuleConfig() { this.downsampling = new ArrayList<>(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 86c611fabf..3bc4e9af77 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -166,7 +166,7 @@ public class CoreModuleProvider extends ModuleProvider { annotationScan.registerListener(streamAnnotationListener); - this.remoteClientManager = new RemoteClientManager(getManager()); + this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout()); this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager); MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession()); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java index fd80c9c744..e671ef4bb3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java @@ -59,12 +59,14 @@ public class GRPCRemoteClient implements RemoteClient { private boolean isConnect; private CounterMetrics remoteOutCounter; private CounterMetrics remoteOutErrorCounter; + private int remoteTimeout; public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address, int channelSize, - int bufferSize) { + int bufferSize, int remoteTimeout) { this.address = address; this.channelSize = channelSize; this.bufferSize = bufferSize; + this.remoteTimeout = remoteTimeout; remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class) .createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.", @@ -183,7 +185,7 @@ public class GRPCRemoteClient implements RemoteClient { } } - return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver() { + return getStub().withDeadlineAfter(remoteTimeout, TimeUnit.SECONDS).call(new StreamObserver() { @Override public void onNext(Empty empty) { } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java index 06a963ff1a..b585150ae9 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java @@ -57,12 +57,19 @@ public class RemoteClientManager implements Service { 
private final List clientsB; private volatile List usingClients; private GaugeMetrics gauge; + private int remoteTimeout; - public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) { + /** + * Initialize the manager for all remote communication clients. + * @param moduleDefineHolder for looking up other modules + * @param remoteTimeout timeout for cluster internal communication, in seconds. + */ + public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) { this.moduleDefineHolder = moduleDefineHolder; this.clientsA = new LinkedList<>(); this.clientsB = new LinkedList<>(); this.usingClients = clientsA; + this.remoteTimeout = remoteTimeout; } public void start() { @@ -203,7 +210,7 @@ public class RemoteClientManager implements Service { RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address); getFreeClients().add(client); } else { - RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000); + RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout); client.connect(); getFreeClients().add(client); } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java index 7f506f9f37..bdc53fbb8a 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java @@ -57,7 +57,7 @@ public class GRPCRemoteClientRealClient { moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine); telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator); - GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10)); + 
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10, 10)); remoteClient.connect(); for (int i = 0; i < 10000; i++) { diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java index a99a767040..b0a3e6ee81 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java @@ -86,7 +86,7 @@ public class GRPCRemoteClientTestCase { grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager)); Address address = new Address("not-important", 11, false); - GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10)); + GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10, 10)); remoteClient.connect(); doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel(); diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java index e83aa63ff7..c561273b19 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java @@ -80,7 +80,7 @@ public class RemoteClientManagerTestCase { moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine); telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator); - RemoteClientManager clientManager = new 
RemoteClientManager(moduleManager); + RemoteClientManager clientManager = new RemoteClientManager(moduleManager, 10); when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances()); clientManager.refresh(); -- GitLab