From 87a8dcf207780d5c33d65e25139b9db348d141fd Mon Sep 17 00:00:00 2001 From: Sail <37333508+SailVR@users.noreply.github.com> Date: Fri, 10 Apr 2020 22:58:12 +0800 Subject: [PATCH] [IOTDB-573]Wrap Runnable (#1015) * add WrappedRunnable class --- .../iotdb/db/concurrent/WrappedRunnable.java | 42 +++++++++++++++++++ .../iotdb/db/cost/statistic/Measurement.java | 9 ++-- .../iotdb/db/engine/flush/FlushManager.java | 6 ++- .../storagegroup/StorageGroupProcessor.java | 1 + .../iotdb/db/engine/upgrade/UpgradeTask.java | 5 ++- .../apache/iotdb/db/monitor/StatMonitor.java | 5 ++- .../query/dataset/NonAlignEngineDataSet.java | 5 ++- .../RawQueryDataSetWithoutValueFilter.java | 5 ++- .../iotdb/db/service/MetricsService.java | 5 ++- .../db/tools/memestimation/MemEstToolCmd.java | 5 ++- .../IoTDBThreadPoolFactoryTest.java | 4 +- .../adapter/ActiveTimeSeriesCounterTest.java | 9 ++-- 12 files changed, 78 insertions(+), 23 deletions(-) create mode 100644 server/src/main/java/org/apache/iotdb/db/concurrent/WrappedRunnable.java diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/WrappedRunnable.java b/server/src/main/java/org/apache/iotdb/db/concurrent/WrappedRunnable.java new file mode 100644 index 0000000000..7b143a3fd6 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/concurrent/WrappedRunnable.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.concurrent; + +import com.google.common.base.Throwables; +import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class WrappedRunnable implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(WrappedRunnable.class); + + public final void run() { + try { + runMayThrow(); + } catch (Exception e) { + LOGGER.error("error", e); + throw Throwables.propagate(e); + } + } + + abstract public void runMayThrow() throws Exception; + +} diff --git a/server/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java b/server/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java index 026421435d..716a509edc 100644 --- a/server/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java +++ b/server/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -380,18 +381,18 @@ public class Measurement implements MeasurementMBean, IService { "================================================================================================================="); } - class DisplayRunnable implements Runnable { + class DisplayRunnable extends WrappedRunnable { @Override - public void run() { + public void runMayThrow() { showMeasurements(); } } - class QueueConsumerThread implements Runnable { + class QueueConsumerThread extends WrappedRunnable { @Override - public void run() { + public void runMayThrow() { consumer(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java index 677c6f672f..b41708f5d6 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.flush; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager; import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; @@ -81,10 +83,10 @@ public class FlushManager implements FlushManagerMBean, IService { return FlushSubTaskPoolManager.getInstance().getWaitingTasksNumber(); } - class FlushThread implements Runnable { + class FlushThread extends WrappedRunnable{ @Override - public void run() { + public void runMayThrow() { TsFileProcessor tsFileProcessor = tsFileProcessorQueue.poll(); tsFileProcessor.flushOneMemTable(); tsFileProcessor.setManagedByFlushManager(false); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 27d0acfa1f..72139848fb 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -239,6 +239,7 @@ public class StorageGroupProcessor { } recover(); + } private void recover() throws StorageGroupProcessorException { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java index 0d0c38e23d..d13e9828be 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.engine.upgrade; +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.service.UpgradeSevice; import org.apache.iotdb.db.utils.UpgradeUtils; @@ -25,7 +26,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class UpgradeTask implements Runnable { +public class UpgradeTask extends WrappedRunnable { private final TsFileResource upgradeResource; private static final Logger logger = LoggerFactory.getLogger(UpgradeTask.class); @@ -37,7 +38,7 @@ public class UpgradeTask implements Runnable { } @Override - public void run() { + public void runMayThrow() { try { upgradeResource.getWriteQueryLock().readLock().lock(); String tsfilePathBefore = upgradeResource.getFile().getAbsolutePath(); diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java index 9372313051..3d59a8906a 100644 --- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java +++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; @@ -333,12 +334,12 @@ public class StatMonitor implements IService { private static final StatMonitor INSTANCE = new StatMonitor(); } - class StatBackLoop implements Runnable { + class StatBackLoop extends WrappedRunnable { FileSize fileSize = FileSize.getInstance(); @Override - public void run() { + public void runMayThrow() { try { long currentTimeMillis = System.currentTimeMillis(); long seconds = (currentTimeMillis - runningTimeMillis) / 1000; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java index d3c52ab4b4..3550b3afca 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.dataset; +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; @@ -46,7 +47,7 @@ import java.util.concurrent.atomic.AtomicIntegerArray; public class NonAlignEngineDataSet extends QueryDataSet { - private class ReadTask implements Runnable { + private class ReadTask extends WrappedRunnable { private final ManagedSeriesReader reader; private BlockingQueue> blockingQueue; @@ -64,7 +65,7 @@ public class NonAlignEngineDataSet extends QueryDataSet { } @Override - public void run() { + public void runMayThrow() { PublicBAOS timeBAOS = new PublicBAOS(); PublicBAOS valueBAOS = new PublicBAOS(); try { diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java index 057dc6f06c..6f8c756159 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.dataset; +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; @@ -43,7 +44,7 @@ import java.util.concurrent.LinkedBlockingQueue; public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { - private static class ReadTask implements Runnable { + private static class ReadTask extends WrappedRunnable { private final ManagedSeriesReader reader; private final String pathName; @@ -57,7 +58,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet { } @Override - public void run() { + public void runMayThrow() { try { synchronized (reader) { // if the task is submitted, there must be free space in the queue diff --git a/server/src/main/java/org/apache/iotdb/db/service/MetricsService.java b/server/src/main/java/org/apache/iotdb/db/service/MetricsService.java index 90541fee80..aede01ace3 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/MetricsService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/MetricsService.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -171,7 +172,7 @@ public class MetricsService implements MetricsServiceMBean, IService { private MetricsServiceHolder() {} } - private class MetricsServiceThread implements Runnable { + private class MetricsServiceThread extends WrappedRunnable { private Server server; @@ -180,7 +181,7 @@ public class MetricsService implements MetricsServiceMBean, IService { } @Override - public void run() { + public void runMayThrow() { try { Thread.currentThread().setName(ThreadName.METRICS_SERVICE.getName()); server.start(); diff --git a/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java index 2df8e9feae..bdc1ce081b 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/memestimation/MemEstToolCmd.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.tools.memestimation; import io.airlift.airline.Command; import io.airlift.airline.Option; +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -28,7 +29,7 @@ import org.apache.iotdb.db.exception.ConfigAdjusterException; import org.apache.iotdb.db.metadata.MManager; @Command(name = "calmem", description = "calculate minimum memory required for writing based on the number of storage groups and timeseries") -public class MemEstToolCmd implements Runnable { +public class MemEstToolCmd extends WrappedRunnable { @Option(title = "storage group number", name = {"-sg", "--storagegroup"}, description = "Storage group number") @@ -43,7 +44,7 @@ public class MemEstToolCmd implements Runnable { private String maxTsNumString = "0"; @Override - public void run() { + public void runMayThrow() { // backup origin config parameters IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); long memTableSize = config.getMemtableSizeThreshold(); diff --git a/server/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java b/server/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java index 08252881fd..def7c4dac3 100644 --- a/server/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactoryTest.java @@ -202,7 +202,7 @@ public class IoTDBThreadPoolFactoryTest { } } - class TestThread implements Runnable { + class TestThread extends WrappedRunnable { private String name; @@ -211,7 +211,7 @@ public class IoTDBThreadPoolFactoryTest { } @Override - public void run() { + public void runMayThrow() { throw new RuntimeException(name); } diff --git a/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java b/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java index ae75ecab7e..0cd020f5d3 100644 --- a/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java +++ b/server/src/test/java/org/apache/iotdb/db/conf/adapter/ActiveTimeSeriesCounterTest.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertEquals; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + +import org.apache.iotdb.db.concurrent.WrappedRunnable; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -96,7 +98,7 @@ public class ActiveTimeSeriesCounterTest { } } - private static class OfferThreads implements Runnable { + private static class OfferThreads extends WrappedRunnable { private int sensorNum; private String storageGroup; private CountDownLatch finished; @@ -108,15 +110,16 @@ public class ActiveTimeSeriesCounterTest { } @Override - public void run() { + public void runMayThrow() { try { for (int j = 0; j < sensorNum; j++) { ActiveTimeSeriesCounter.getInstance().offer(storageGroup, "device_0", "sensor_" + j); } - } finally { + }finally { finished.countDown(); } } } + } -- GitLab