diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index a19f75d32270f3ad28b52c7ff70d48b7658944a7..c8e04593ce9ca412d89a7b8a8de95a7278dfb5a1 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -132,6 +132,10 @@ public class Config {
* Collector skywalking trace receiver service addresses.
*/
public static String BACKEND_SERVICE = "";
+ /**
+ * How long grpc client will timeout in sending data to upstream.
+ */
+ public static int GRPC_UPSTREAM_TIMEOUT = 30;
}
public static class Jvm {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
index 040d73360df404038b9c7db6aad2c547d6389107..598cd36394e25d71e3044ccdab1d92f64a4a6365 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
@@ -47,6 +47,8 @@ import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricCollection;
import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricReportServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
/**
* The JVMService
represents a timer, which collectors JVM cpu, memory, memorypool and gc info, and send
* the collected info to Collector through the channel provided by {@link GRPCChannelManager}
@@ -140,7 +142,7 @@ public class JVMService implements BootService, Runnable {
if (buffer.size() > 0) {
builder.addAllMetrics(buffer);
builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
- Commands commands = stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build());
+ Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
} catch (Throwable t) {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
index 92c22a545f013bc7a15c58fc515122836b893a96..9c68ac3ee33e3e3f0915934fd12fa2200047c046 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
@@ -53,6 +53,8 @@ import org.apache.skywalking.apm.network.register.v2.Services;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;
+import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
/**
* @author wusheng
*/
@@ -138,7 +140,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
try {
if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
if (registerBlockingStub != null) {
- ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister(
+ ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).doServiceRegister(
Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
if (serviceRegisterMapping != null) {
for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
@@ -153,7 +155,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
if (registerBlockingStub != null) {
if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
- ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+ ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.doServiceInstanceRegister(ServiceInstances.newBuilder()
.addInstances(
ServiceInstance.newBuilder()
@@ -173,15 +175,15 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
}
}
} else {
- final Commands commands = serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+ final Commands commands = serviceInstancePingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
.setTime(System.currentTimeMillis())
.setServiceInstanceUUID(INSTANCE_UUID)
.build());
- NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
- EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
+ NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
+ EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 3ffacffb5969f9a4495e5588f1f3f2c39c4c1075..edea03b372899f469ea90d4319b275d40d409ac8 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.*;
@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
- StreamObserver upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver() {
+ StreamObserver upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver() {
@Override
public void onNext(Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
diff --git a/docs/en/setup/service-agent/java-agent/README.md b/docs/en/setup/service-agent/java-agent/README.md
index 8d798e0805a4dc3df7db000ec8c3d7bb3b1c1617..dcdfba55d4311a4884ef907b3fea638abb88ff93 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -82,6 +82,7 @@ property key | Description | Default |
`collector.grpc_channel_check_interval`|grpc channel status check interval.|`30`|
`collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`|
`collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`|
+`collector.grpc_upstream_timeout`|How long grpc client will timeout in sending data to upstream. Unit is second.|`30` seconds|
`logging.level`|The log level. Default is debug.|`DEBUG`|
`logging.file_name`|Log file name.|`skywalking-api.log`|
`logging.output`| Log output. Default is FILE. Use CONSOLE means output to stdout. |`FILE`|
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index c7c26e1f75a07a07e04155681d767f4c2491b308..18581167e831d65703a64bbbb68cdb7d80988967 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -53,6 +53,10 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter private int monthMetricsDataTTL;
@Setter private int gRPCThreadPoolSize;
@Setter private int gRPCThreadPoolQueueSize;
+ /**
+ * Timeout for cluster internal communication, in seconds.
+ */
+ @Setter private int remoteTimeout = 20;
CoreModuleConfig() {
this.downsampling = new ArrayList<>();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 86c611fabfc340d22c6d8dd5f0b2c0713a56b29f..3bc4e9af774f351d02995c95cec273a6bb82075e 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -166,7 +166,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.registerListener(streamAnnotationListener);
- this.remoteClientManager = new RemoteClientManager(getManager());
+ this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index fd80c9c744248c1446471b2ddb376ce5e2e7c93c..e671ef4bb3229ca79d6171a6bea6d9fddc51281a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -59,12 +59,14 @@ public class GRPCRemoteClient implements RemoteClient {
private boolean isConnect;
private CounterMetrics remoteOutCounter;
private CounterMetrics remoteOutErrorCounter;
+ private int remoteTimeout;
public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address, int channelSize,
- int bufferSize) {
+ int bufferSize, int remoteTimeout) {
this.address = address;
this.channelSize = channelSize;
this.bufferSize = bufferSize;
+ this.remoteTimeout = remoteTimeout;
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
.createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
@@ -183,7 +185,7 @@ public class GRPCRemoteClient implements RemoteClient {
}
}
- return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver() {
+ return getStub().withDeadlineAfter(remoteTimeout, TimeUnit.SECONDS).call(new StreamObserver() {
@Override public void onNext(Empty empty) {
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 06a963ff1a72770c7e4d6282ba85e4cfb1afd09c..b585150ae928b50ee8f0d1decc1d516c0a5af30d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -57,12 +57,19 @@ public class RemoteClientManager implements Service {
private final List clientsB;
private volatile List usingClients;
private GaugeMetrics gauge;
+ private int remoteTimeout;
- public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
+ /**
+ * Initial the manager for all remote communication clients.
+ * @param moduleDefineHolder for looking up other modules
+ * @param remoteTimeout for cluster internal communication, in second unit.
+ */
+ public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
this.moduleDefineHolder = moduleDefineHolder;
this.clientsA = new LinkedList<>();
this.clientsB = new LinkedList<>();
this.usingClients = clientsA;
+ this.remoteTimeout = remoteTimeout;
}
public void start() {
@@ -203,7 +210,7 @@ public class RemoteClientManager implements Service {
RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client);
} else {
- RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
+ RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
client.connect();
getFreeClients().add(client);
}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
index 7f506f9f37bdc862b81ae8b7ab6ec16342875737..bdc53fbb8afd3e25a04bc14aa743b6ef5c3848c0 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -57,7 +57,7 @@ public class GRPCRemoteClientRealClient {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
- GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10));
+ GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10, 10));
remoteClient.connect();
for (int i = 0; i < 10000; i++) {
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
index a99a767040c024a0e6b635fa4f87f94e19050b2a..b0a3e6ee817d624f97f76994c82ff769a478821e 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -86,7 +86,7 @@ public class GRPCRemoteClientTestCase {
grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager));
Address address = new Address("not-important", 11, false);
- GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10));
+ GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10, 10));
remoteClient.connect();
doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
index e83aa63ff7d9eec3254c5e34a4efedb20379caa7..c561273b19d623d12361bd39daf891055a6e63d8 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -80,7 +80,7 @@ public class RemoteClientManagerTestCase {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
- RemoteClientManager clientManager = new RemoteClientManager(moduleManager);
+ RemoteClientManager clientManager = new RemoteClientManager(moduleManager, 10);
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
clientManager.refresh();