未验证 提交 1ed8df80 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #741 from apache/fix/ScheduledFuture-failsafe

Protect the background processes.
...@@ -28,6 +28,7 @@ import org.apache.skywalking.apm.collector.core.util.CollectionUtils; ...@@ -28,6 +28,7 @@ import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.Const; import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.FileUtils; import org.apache.skywalking.apm.collector.core.util.FileUtils;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -70,7 +71,10 @@ public enum OffsetManager { ...@@ -70,7 +71,10 @@ public enum OffsetManager {
offset.deserialize(offsetRecord); offset.deserialize(offsetRecord);
initialized = true; initialized = true;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::flush, 10, 3, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::flush,
t -> logger.error("flush offset file in background failure.", t)
), 10, 3, TimeUnit.SECONDS);
} }
} }
......
...@@ -34,6 +34,7 @@ import org.apache.skywalking.apm.collector.core.util.CollectionUtils; ...@@ -34,6 +34,7 @@ import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.Const; import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils; import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.network.proto.UpstreamSegment; import org.apache.skywalking.apm.network.proto.UpstreamSegment;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -50,7 +51,9 @@ public enum SegmentBufferReader { ...@@ -50,7 +51,9 @@ public enum SegmentBufferReader {
public void initialize(ModuleManager moduleManager) { public void initialize(ModuleManager moduleManager) {
this.moduleManager = moduleManager; this.moduleManager = moduleManager;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::preRead,
t -> logger.error("Segment buffer pre read failure.", t)), 3, 3, TimeUnit.SECONDS);
} }
public void setSegmentParserListenerManager(SegmentParserListenerManager listenerManager) { public void setSegmentParserListenerManager(SegmentParserListenerManager listenerManager) {
......
...@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLo ...@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLo
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider; import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException; import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory; ...@@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory;
*/ */
public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<SegmentStandardization, SegmentStandardization> { public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<SegmentStandardization, SegmentStandardization> {
private final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class); private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
public SegmentStandardizationWorker(ModuleManager moduleManager) { public SegmentStandardizationWorker(ModuleManager moduleManager) {
super(moduleManager); super(moduleManager);
...@@ -70,7 +71,9 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Segme ...@@ -70,7 +71,9 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Segme
} }
private void startTimer(SegmentStandardizationWorker standardizationWorker) { private void startTimer(SegmentStandardizationWorker standardizationWorker) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(standardizationWorker::flushAndSwitch, 10, 3, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(standardizationWorker::flushAndSwitch,
t -> logger.error("Segment standardization failure.", t)), 10, 3, TimeUnit.SECONDS);
} }
} }
} }
...@@ -27,6 +27,7 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.impl.Persistenc ...@@ -27,6 +27,7 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.impl.Persistenc
import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule; import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO; import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -49,7 +50,9 @@ public class PersistenceTimer { ...@@ -49,7 +50,9 @@ public class PersistenceTimer {
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000; // final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3; final long timeInterval = 3;
IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class); IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(batchDAO, persistenceWorkers), 1, timeInterval, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO, persistenceWorkers),
t -> logger.error("Extract data and save failure.", t)), 1, timeInterval, TimeUnit.SECONDS);
} }
private void extractDataAndSave(IBatchDAO batchDAO, List<PersistenceWorker> persistenceWorkers) { private void extractDataAndSave(IBatchDAO batchDAO, List<PersistenceWorker> persistenceWorkers) {
......
...@@ -35,11 +35,15 @@ import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetric ...@@ -35,11 +35,15 @@ import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetric
import org.apache.skywalking.apm.collector.storage.dao.memorymp.IMemorySecondMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.memorymp.IMemorySecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.mpoolmp.IMemoryPoolSecondMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.mpoolmp.IMemoryPoolSecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO; import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class DataTTLKeeperTimer { public class DataTTLKeeperTimer {
// Logger is keyed to this class so that background-deletion failures are reported under the timer's own category.
private final Logger logger = LoggerFactory.getLogger(DataTTLKeeperTimer.class);
private final ModuleManager moduleManager; private final ModuleManager moduleManager;
private final StorageModuleEsNamingListener namingListener; private final StorageModuleEsNamingListener namingListener;
...@@ -55,7 +59,9 @@ public class DataTTLKeeperTimer { ...@@ -55,7 +59,9 @@ public class DataTTLKeeperTimer {
} }
public void start() { public void start() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::delete, 1, 8, TimeUnit.HOURS); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::delete,
t -> logger.error("Remove data in background failure.", t)), 1, 8, TimeUnit.HOURS);
} }
private void delete() { private void delete() {
......
...@@ -49,6 +49,11 @@ ...@@ -49,6 +49,11 @@
</modules> </modules>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.util;
/**
 * A {@link Runnable} decorator that stops anything thrown by the wrapped task from escaping {@link #run()}.
 * <p>
 * It is intended for periodic tasks handed to a scheduled executor: such an executor suppresses all subsequent
 * executions once a single execution throws, so an unprotected task silently stops running after its first failure.
 * Every {@link Throwable} raised by the task is passed to the supplied {@link CallbackWhenException} instead.
 *
 * @author wusheng
 */
public class RunnableWithExceptionProtection implements Runnable {
    /** The task being protected. */
    private final Runnable run;
    /** Receives whatever the protected task throws. */
    private final CallbackWhenException callback;

    /**
     * @param run      the task to execute on every {@link #run()} call
     * @param callback notified with the {@link Throwable} whenever {@code run} fails
     */
    public RunnableWithExceptionProtection(Runnable run, CallbackWhenException callback) {
        this.run = run;
        this.callback = callback;
    }

    /**
     * Executes the wrapped task. Never throws: a failure of the task is forwarded to the callback, and a failure
     * of the callback itself is discarded.
     */
    @Override
    public void run() {
        try {
            run.run();
        } catch (Throwable t) {
            reportFailure(t);
        }
    }

    /**
     * Hands the failure to the callback while shielding the caller from a misbehaving callback.
     */
    private void reportFailure(Throwable failure) {
        try {
            callback.handle(failure);
        } catch (Throwable ignored) {
            // Deliberately discarded: letting a callback failure (e.g. a broken logger) escape would cancel the
            // periodic task, defeating the protection this class provides. There is no further channel to report to.
        }
    }

    /**
     * Receives the {@link Throwable} thrown by a protected task.
     */
    public interface CallbackWhenException {
        /**
         * @param t whatever the protected task threw
         */
        void handle(Throwable t);
    }
}
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
* *
*/ */
package org.apache.skywalking.apm.agent.core.jvm; package org.apache.skywalking.apm.agent.core.jvm;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
...@@ -43,6 +42,7 @@ import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; ...@@ -43,6 +42,7 @@ import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.network.proto.JVMMetric; import org.apache.skywalking.apm.network.proto.JVMMetric;
import org.apache.skywalking.apm.network.proto.JVMMetrics; import org.apache.skywalking.apm.network.proto.JVMMetrics;
import org.apache.skywalking.apm.network.proto.JVMMetricsServiceGrpc; import org.apache.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/** /**
* The <code>JVMService</code> represents a timer, * The <code>JVMService</code> represents a timer,
...@@ -57,6 +57,7 @@ public class JVMService implements BootService, Runnable { ...@@ -57,6 +57,7 @@ public class JVMService implements BootService, Runnable {
private volatile ScheduledFuture<?> collectMetricFuture; private volatile ScheduledFuture<?> collectMetricFuture;
private volatile ScheduledFuture<?> sendMetricFuture; private volatile ScheduledFuture<?> sendMetricFuture;
private Sender sender; private Sender sender;
@Override @Override
public void beforeBoot() throws Throwable { public void beforeBoot() throws Throwable {
queue = new LinkedBlockingQueue(Config.Jvm.BUFFER_SIZE); queue = new LinkedBlockingQueue(Config.Jvm.BUFFER_SIZE);
...@@ -68,10 +69,19 @@ public class JVMService implements BootService, Runnable { ...@@ -68,10 +69,19 @@ public class JVMService implements BootService, Runnable {
public void boot() throws Throwable { public void boot() throws Throwable {
collectMetricFuture = Executors collectMetricFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce")) .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))
.scheduleAtFixedRate(this, 0, 1, TimeUnit.SECONDS); .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("JVMService produces metrics failure.", t);
}
}), 0, 1, TimeUnit.SECONDS);
sendMetricFuture = Executors sendMetricFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume")) .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))
.scheduleAtFixedRate(sender, 0, 1, TimeUnit.SECONDS); .scheduleAtFixedRate(new RunnableWithExceptionProtection(sender, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("JVMService consumes and upload failure.", t);
}
}
), 0, 1, TimeUnit.SECONDS);
} }
@Override @Override
......
...@@ -46,6 +46,7 @@ import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc; ...@@ -46,6 +46,7 @@ import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc; import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.NetworkAddressRegisterServiceGrpc; import org.apache.skywalking.apm.network.proto.NetworkAddressRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc; import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/** /**
* @author wusheng * @author wusheng
...@@ -87,7 +88,11 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList ...@@ -87,7 +88,11 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
public void boot() throws Throwable { public void boot() throws Throwable {
applicationRegisterFuture = Executors applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient")) .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
.scheduleAtFixedRate(this, 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS); .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
} }
@Override @Override
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
* *
*/ */
package org.apache.skywalking.apm.agent.core.remote; package org.apache.skywalking.apm.agent.core.remote;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -25,6 +24,9 @@ import java.util.concurrent.TimeUnit; ...@@ -25,6 +24,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/** /**
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}. * The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
...@@ -32,6 +34,7 @@ import org.apache.skywalking.apm.agent.core.conf.Config; ...@@ -32,6 +34,7 @@ import org.apache.skywalking.apm.agent.core.conf.Config;
* @author wusheng * @author wusheng
*/ */
public class CollectorDiscoveryService implements BootService { public class CollectorDiscoveryService implements BootService {
private static final ILog logger = LogManager.getLogger(CollectorDiscoveryService.class);
private ScheduledFuture<?> future; private ScheduledFuture<?> future;
@Override @Override
...@@ -42,7 +45,12 @@ public class CollectorDiscoveryService implements BootService { ...@@ -42,7 +45,12 @@ public class CollectorDiscoveryService implements BootService {
@Override @Override
public void boot() throws Throwable { public void boot() throws Throwable {
future = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("CollectorDiscoveryService")) future = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("CollectorDiscoveryService"))
.scheduleAtFixedRate(new DiscoveryRestServiceClient(), 0, .scheduleAtFixedRate(new RunnableWithExceptionProtection(new DiscoveryRestServiceClient(),
new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0,
Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS); Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS);
} }
......
...@@ -38,6 +38,7 @@ import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig; ...@@ -38,6 +38,7 @@ import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/** /**
* @author wusheng * @author wusheng
...@@ -60,7 +61,11 @@ public class GRPCChannelManager implements BootService, Runnable { ...@@ -60,7 +61,11 @@ public class GRPCChannelManager implements BootService, Runnable {
public void boot() throws Throwable { public void boot() throws Throwable {
connectCheckFuture = Executors connectCheckFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager")) .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
.scheduleAtFixedRate(this, 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS); .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
} }
@Override @Override
......
...@@ -30,6 +30,7 @@ import org.apache.skywalking.apm.agent.core.conf.Config; ...@@ -30,6 +30,7 @@ import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/** /**
* The <code>SamplingService</code> take charge of how to sample the {@link TraceSegment}. Every {@link TraceSegment}s * The <code>SamplingService</code> take charge of how to sample the {@link TraceSegment}. Every {@link TraceSegment}s
...@@ -66,12 +67,16 @@ public class SamplingService implements BootService { ...@@ -66,12 +67,16 @@ public class SamplingService implements BootService {
this.resetSamplingFactor(); this.resetSamplingFactor();
ScheduledExecutorService service = Executors ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService")); .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
scheduledFuture = service.scheduleAtFixedRate(new Runnable() { scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
@Override @Override
public void run() { public void run() {
resetSamplingFactor(); resetSamplingFactor();
} }
}, 0, 3, TimeUnit.SECONDS); }, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, 3, TimeUnit.SECONDS);
logger.debug("Agent sampling mechanism started. Sample {} traces in 10 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS); logger.debug("Agent sampling mechanism started. Sample {} traces in 10 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册