提交 8f49c973 编写于 作者: wu-sheng's avatar wu-sheng

Protect the background processes.

上级 36136911
......@@ -28,6 +28,7 @@ import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.FileUtils;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -70,7 +71,10 @@ public enum OffsetManager {
offset.deserialize(offsetRecord);
initialized = true;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::flush, 10, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::flush,
t -> logger.error("flush offset file in background failure.", t)
), 10, 3, TimeUnit.SECONDS);
}
}
......
......@@ -34,6 +34,7 @@ import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -50,7 +51,9 @@ public enum SegmentBufferReader {
public void initialize(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::preRead,
t -> logger.error("Segment buffer pre read failure.", t)), 3, 3, TimeUnit.SECONDS);
}
public void setSegmentParserListenerManager(SegmentParserListenerManager listenerManager) {
......
......@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLo
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory;
*/
public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<SegmentStandardization, SegmentStandardization> {
private final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
public SegmentStandardizationWorker(ModuleManager moduleManager) {
super(moduleManager);
......@@ -70,7 +71,9 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Segme
}
private void startTimer(SegmentStandardizationWorker standardizationWorker) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(standardizationWorker::flushAndSwitch, 10, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(standardizationWorker::flushAndSwitch,
t -> logger.error("Segment standardization failure.", t)), 10, 3, TimeUnit.SECONDS);
}
}
}
......@@ -27,6 +27,7 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.impl.Persistenc
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -49,7 +50,9 @@ public class PersistenceTimer {
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3;
IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(batchDAO, persistenceWorkers), 1, timeInterval, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO, persistenceWorkers),
t -> logger.error("Extract data and save failure.", t)), 1, timeInterval, TimeUnit.SECONDS);
}
private void extractDataAndSave(IBatchDAO batchDAO, List<PersistenceWorker> persistenceWorkers) {
......
......@@ -35,11 +35,15 @@ import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetric
import org.apache.skywalking.apm.collector.storage.dao.memorymp.IMemorySecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.mpoolmp.IMemoryPoolSecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class DataTTLKeeperTimer {
private final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class);
private final ModuleManager moduleManager;
private final StorageModuleEsNamingListener namingListener;
......@@ -55,7 +59,9 @@ public class DataTTLKeeperTimer {
}
public void start() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::delete, 1, 8, TimeUnit.HOURS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::delete,
t -> logger.error("Remove data in background failure.", t)), 1, 8, TimeUnit.HOURS);
}
private void delete() {
......
......@@ -49,6 +49,11 @@
</modules>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.util;
/**
* @author wusheng
*/
public class RunnableWithExceptionProtection implements Runnable {
private Runnable run;
private CallbackWhenException callback;
public RunnableWithExceptionProtection(Runnable run, CallbackWhenException callback) {
this.run = run;
this.callback = callback;
}
@Override
public void run() {
try {
run.run();
} catch (Throwable t) {
callback.handle(t);
}
}
public interface CallbackWhenException {
void handle(Throwable t);
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.agent.core.jvm;
import io.grpc.ManagedChannel;
......@@ -43,6 +42,7 @@ import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.network.proto.JVMMetric;
import org.apache.skywalking.apm.network.proto.JVMMetrics;
import org.apache.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/**
* The <code>JVMService</code> represents a timer,
......@@ -57,6 +57,7 @@ public class JVMService implements BootService, Runnable {
private volatile ScheduledFuture<?> collectMetricFuture;
private volatile ScheduledFuture<?> sendMetricFuture;
private Sender sender;
@Override
public void beforeBoot() throws Throwable {
queue = new LinkedBlockingQueue(Config.Jvm.BUFFER_SIZE);
......@@ -68,10 +69,19 @@ public class JVMService implements BootService, Runnable {
public void boot() throws Throwable {
collectMetricFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))
.scheduleAtFixedRate(this, 0, 1, TimeUnit.SECONDS);
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("JVMService produces metrics failure.", t);
}
}), 0, 1, TimeUnit.SECONDS);
sendMetricFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))
.scheduleAtFixedRate(sender, 0, 1, TimeUnit.SECONDS);
.scheduleAtFixedRate(new RunnableWithExceptionProtection(sender, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("JVMService consumes and upload failure.", t);
}
}
), 0, 1, TimeUnit.SECONDS);
}
@Override
......
......@@ -46,6 +46,7 @@ import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.NetworkAddressRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/**
* @author wusheng
......@@ -87,7 +88,11 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
public void boot() throws Throwable {
applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
.scheduleAtFixedRate(this, 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import java.util.concurrent.Executors;
......@@ -25,6 +24,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/**
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
......@@ -32,6 +34,7 @@ import org.apache.skywalking.apm.agent.core.conf.Config;
* @author wusheng
*/
public class CollectorDiscoveryService implements BootService {
private static final ILog logger = LogManager.getLogger(CollectorDiscoveryService.class);
private ScheduledFuture<?> future;
@Override
......@@ -42,7 +45,12 @@ public class CollectorDiscoveryService implements BootService {
@Override
public void boot() throws Throwable {
future = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("CollectorDiscoveryService"))
.scheduleAtFixedRate(new DiscoveryRestServiceClient(), 0,
.scheduleAtFixedRate(new RunnableWithExceptionProtection(new DiscoveryRestServiceClient(),
new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0,
Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS);
}
......
......@@ -38,6 +38,7 @@ import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/**
* @author wusheng
......@@ -60,7 +61,11 @@ public class GRPCChannelManager implements BootService, Runnable {
public void boot() throws Throwable {
connectCheckFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
.scheduleAtFixedRate(this, 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
......
......@@ -30,6 +30,7 @@ import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/**
* The <code>SamplingService</code> take charge of how to sample the {@link TraceSegment}. Every {@link TraceSegment}s
......@@ -66,12 +67,16 @@ public class SamplingService implements BootService {
this.resetSamplingFactor();
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
scheduledFuture = service.scheduleAtFixedRate(new Runnable() {
scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
@Override
public void run() {
resetSamplingFactor();
}
}, 0, 3, TimeUnit.SECONDS);
}, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, 3, TimeUnit.SECONDS);
logger.debug("Agent sampling mechanism started. Sample {} traces in 10 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册