未验证 提交 8ebf3aac 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support timeout configuration in agent and backend. (#3491)

* Support timeout configuration in agent and backend.

* Fix CI

* no message
上级 09064e97
...@@ -132,6 +132,10 @@ public class Config { ...@@ -132,6 +132,10 @@ public class Config {
* Collector skywalking trace receiver service addresses. * Collector skywalking trace receiver service addresses.
*/ */
public static String BACKEND_SERVICE = ""; public static String BACKEND_SERVICE = "";
/**
* Timeout, in seconds, for the gRPC client when sending data to upstream.
*/
public static int GRPC_UPSTREAM_TIMEOUT = 30;
} }
public static class Jvm { public static class Jvm {
......
...@@ -47,6 +47,8 @@ import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricCollection; ...@@ -47,6 +47,8 @@ import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricCollection;
import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricReportServiceGrpc; import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricReportServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
/** /**
* The <code>JVMService</code> represents a timer, which collects JVM cpu, memory, memorypool and gc info, and sends * The <code>JVMService</code> represents a timer, which collects JVM cpu, memory, memorypool and gc info, and sends
* the collected info to Collector through the channel provided by {@link GRPCChannelManager} * the collected info to Collector through the channel provided by {@link GRPCChannelManager}
...@@ -140,7 +142,7 @@ public class JVMService implements BootService, Runnable { ...@@ -140,7 +142,7 @@ public class JVMService implements BootService, Runnable {
if (buffer.size() > 0) { if (buffer.size() > 0) {
builder.addAllMetrics(buffer); builder.addAllMetrics(buffer);
builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID); builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
Commands commands = stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build()); Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
} }
} catch (Throwable t) { } catch (Throwable t) {
......
...@@ -53,6 +53,8 @@ import org.apache.skywalking.apm.network.register.v2.Services; ...@@ -53,6 +53,8 @@ import org.apache.skywalking.apm.network.register.v2.Services;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.apm.util.StringUtil;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
/** /**
* @author wusheng * @author wusheng
*/ */
...@@ -138,7 +140,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, ...@@ -138,7 +140,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
try { try {
if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) { if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
if (registerBlockingStub != null) { if (registerBlockingStub != null) {
ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister( ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).doServiceRegister(
Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build()); Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
if (serviceRegisterMapping != null) { if (serviceRegisterMapping != null) {
for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) { for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
...@@ -153,7 +155,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, ...@@ -153,7 +155,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
if (registerBlockingStub != null) { if (registerBlockingStub != null) {
if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) { if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS) ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.doServiceInstanceRegister(ServiceInstances.newBuilder() .doServiceInstanceRegister(ServiceInstances.newBuilder()
.addInstances( .addInstances(
ServiceInstance.newBuilder() ServiceInstance.newBuilder()
...@@ -173,15 +175,15 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, ...@@ -173,15 +175,15 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
} }
} }
} else { } else {
final Commands commands = serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS) final Commands commands = serviceInstancePingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.doPing(ServiceInstancePingPkg.newBuilder() .doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID) .setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
.setTime(System.currentTimeMillis()) .setTime(System.currentTimeMillis())
.setServiceInstanceUUID(INSTANCE_UUID) .setServiceInstanceUUID(INSTANCE_UUID)
.build()); .build());
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)); NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)); EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
} }
} }
......
...@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; ...@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.*; import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.commands.CommandService; import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.*; import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.*; import org.apache.skywalking.apm.agent.core.logging.api.*;
...@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe ...@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
public void consume(List<TraceSegment> data) { public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) { if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() { StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
@Override @Override
public void onNext(Commands commands) { public void onNext(Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
......
...@@ -82,6 +82,7 @@ property key | Description | Default | ...@@ -82,6 +82,7 @@ property key | Description | Default |
`collector.grpc_channel_check_interval`|grpc channel status check interval.|`30`| `collector.grpc_channel_check_interval`|grpc channel status check interval.|`30`|
`collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`| `collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`|
`collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`| `collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`|
`collector.grpc_upstream_timeout`|Timeout for the gRPC client when sending data to upstream. Unit is seconds.|`30` seconds|
`logging.level`|The log level. Default is debug.|`DEBUG`| `logging.level`|The log level. Default is debug.|`DEBUG`|
`logging.file_name`|Log file name.|`skywalking-api.log`| `logging.file_name`|Log file name.|`skywalking-api.log`|
`logging.output`| Log output. Default is FILE. Use CONSOLE means output to stdout. |`FILE`| `logging.output`| Log output. Default is FILE. Use CONSOLE means output to stdout. |`FILE`|
......
...@@ -53,6 +53,10 @@ public class CoreModuleConfig extends ModuleConfig { ...@@ -53,6 +53,10 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter private int monthMetricsDataTTL; @Setter private int monthMetricsDataTTL;
@Setter private int gRPCThreadPoolSize; @Setter private int gRPCThreadPoolSize;
@Setter private int gRPCThreadPoolQueueSize; @Setter private int gRPCThreadPoolQueueSize;
/**
* Timeout for cluster internal communication, in seconds.
*/
@Setter private int remoteTimeout = 20;
CoreModuleConfig() { CoreModuleConfig() {
this.downsampling = new ArrayList<>(); this.downsampling = new ArrayList<>();
......
...@@ -166,7 +166,7 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -166,7 +166,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.registerListener(streamAnnotationListener); annotationScan.registerListener(streamAnnotationListener);
this.remoteClientManager = new RemoteClientManager(getManager()); this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager); this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession()); MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
......
...@@ -59,12 +59,14 @@ public class GRPCRemoteClient implements RemoteClient { ...@@ -59,12 +59,14 @@ public class GRPCRemoteClient implements RemoteClient {
private boolean isConnect; private boolean isConnect;
private CounterMetrics remoteOutCounter; private CounterMetrics remoteOutCounter;
private CounterMetrics remoteOutErrorCounter; private CounterMetrics remoteOutErrorCounter;
private int remoteTimeout;
public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address, int channelSize, public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address, int channelSize,
int bufferSize) { int bufferSize, int remoteTimeout) {
this.address = address; this.address = address;
this.channelSize = channelSize; this.channelSize = channelSize;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.remoteTimeout = remoteTimeout;
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class) remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
.createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.", .createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
...@@ -183,7 +185,7 @@ public class GRPCRemoteClient implements RemoteClient { ...@@ -183,7 +185,7 @@ public class GRPCRemoteClient implements RemoteClient {
} }
} }
return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver<Empty>() { return getStub().withDeadlineAfter(remoteTimeout, TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) { @Override public void onNext(Empty empty) {
} }
......
...@@ -57,12 +57,19 @@ public class RemoteClientManager implements Service { ...@@ -57,12 +57,19 @@ public class RemoteClientManager implements Service {
private final List<RemoteClient> clientsB; private final List<RemoteClient> clientsB;
private volatile List<RemoteClient> usingClients; private volatile List<RemoteClient> usingClients;
private GaugeMetrics gauge; private GaugeMetrics gauge;
private int remoteTimeout;
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) { /**
* Initializes the manager for all remote communication clients.
* @param moduleDefineHolder for looking up other modules
* @param remoteTimeout timeout for cluster internal communication, in seconds.
*/
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
this.moduleDefineHolder = moduleDefineHolder; this.moduleDefineHolder = moduleDefineHolder;
this.clientsA = new LinkedList<>(); this.clientsA = new LinkedList<>();
this.clientsB = new LinkedList<>(); this.clientsB = new LinkedList<>();
this.usingClients = clientsA; this.usingClients = clientsA;
this.remoteTimeout = remoteTimeout;
} }
public void start() { public void start() {
...@@ -203,7 +210,7 @@ public class RemoteClientManager implements Service { ...@@ -203,7 +210,7 @@ public class RemoteClientManager implements Service {
RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address); RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client); getFreeClients().add(client);
} else { } else {
RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000); RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
client.connect(); client.connect();
getFreeClients().add(client); getFreeClients().add(client);
} }
......
...@@ -57,7 +57,7 @@ public class GRPCRemoteClientRealClient { ...@@ -57,7 +57,7 @@ public class GRPCRemoteClientRealClient {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine); moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator); telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10)); GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10, 10));
remoteClient.connect(); remoteClient.connect();
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
......
...@@ -86,7 +86,7 @@ public class GRPCRemoteClientTestCase { ...@@ -86,7 +86,7 @@ public class GRPCRemoteClientTestCase {
grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager)); grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager));
Address address = new Address("not-important", 11, false); Address address = new Address("not-important", 11, false);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10)); GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, address, 1, 10, 10));
remoteClient.connect(); remoteClient.connect();
doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel(); doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
......
...@@ -80,7 +80,7 @@ public class RemoteClientManagerTestCase { ...@@ -80,7 +80,7 @@ public class RemoteClientManagerTestCase {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine); moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator); telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
RemoteClientManager clientManager = new RemoteClientManager(moduleManager); RemoteClientManager clientManager = new RemoteClientManager(moduleManager, 10);
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances()); when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
clientManager.refresh(); clientManager.refresh();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册