From 43aa7db32f3541d08c4b0005893ee833655a6673 Mon Sep 17 00:00:00 2001 From: Zhijia Cao Date: Tue, 5 Sep 2023 14:16:04 +0800 Subject: [PATCH] Added file overlap analysis tool (#11030) --- .../tools/tsfile/overlap-statistic-tool.bat | 62 +++++ .../tools/tsfile/overlap-statistic-tool.sh | 51 ++++ .../compaction/tool/ITimeRange.java | 30 +++ .../dataregion/compaction/tool/Interval.java | 49 ++++ .../compaction/tool/ListTimeRangeImpl.java | 89 +++++++ .../compaction/tool/OverlapStatistic.java | 91 +++++++ .../compaction/tool/OverlapStatisticTool.java | 248 ++++++++++++++++++ .../dataregion/compaction/tool/PrintUtil.java | 209 +++++++++++++++ .../SequenceFileSubTaskThreadExecutor.java | 41 +++ .../tool/SequenceFileTaskSummary.java | 58 ++++ .../tool/SingleSequenceFileTask.java | 104 ++++++++ .../tool/TimePartitionProcessTask.java | 143 ++++++++++ .../tool/TimePartitionProcessWorker.java | 58 ++++ .../tool/TsFileStatisticReader.java | 96 +++++++ .../compaction/tool/UnseqSpaceStatistics.java | 84 ++++++ .../tools/ListTimeRangeImplTest.java | 138 ++++++++++ .../tools/UnseqSpaceStatisticsTest.java | 63 +++++ 17 files changed, 1614 insertions(+) create mode 100644 iotdb-core/datanode/src/assembly/resources/tools/tsfile/overlap-statistic-tool.bat create mode 100644 iotdb-core/datanode/src/assembly/resources/tools/tsfile/overlap-statistic-tool.sh create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/ITimeRange.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/ListTimeRangeImpl.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/OverlapStatistic.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/OverlapStatisticTool.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/PrintUtil.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SequenceFileSubTaskThreadExecutor.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SequenceFileTaskSummary.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SingleSequenceFileTask.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TimePartitionProcessTask.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TimePartitionProcessWorker.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TsFileStatisticReader.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/UnseqSpaceStatistics.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tools/ListTimeRangeImplTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tools/UnseqSpaceStatisticsTest.java diff --git a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/overlap-statistic-tool.bat b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/overlap-statistic-tool.bat new file mode 100644 index 0000000000..500c3c7d12 --- /dev/null +++ b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/overlap-statistic-tool.bat @@ -0,0 +1,62 @@ +@REM +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM + + +@echo off +echo ```````````````````````` +echo Starting Validating the TsFile +echo ```````````````````````` + +if "%OS%" == "Windows_NT" setlocal + +pushd %~dp0..\.. +if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% +popd + +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.storageengine.dataregion.compaction.tool.OverlapStatisticTool +if NOT DEFINED JAVA_HOME goto :err + +@REM ----------------------------------------------------------------------------- +@REM ***** CLASSPATH library setting ***** +@REM Ensure that any user defined CLASSPATH variables are not used on startup +set CLASSPATH="%IOTDB_HOME%\lib\*" + +goto okClasspath + +:append +set CLASSPATH=%CLASSPATH%;%1 +goto :eof + +@REM ----------------------------------------------------------------------------- +:okClasspath + +"%JAVA_HOME%\bin\java" -Xmx16G -cp "%CLASSPATH%" %MAIN_CLASS% %* + +goto finally + + +:err +echo JAVA_HOME environment variable must be set! +pause + + +@REM ----------------------------------------------------------------------------- +:finally + +ENDLOCAL diff --git a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/overlap-statistic-tool.sh b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/overlap-statistic-tool.sh new file mode 100644 index 0000000000..cd34eab61a --- /dev/null +++ b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/overlap-statistic-tool.sh @@ -0,0 +1,51 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +echo --------------------- +echo Starting Validating the TsFile +echo --------------------- + +source "$(dirname "$0")/../../sbin/iotdb-common.sh" +#get_iotdb_include and checkAllVariables is in iotdb-common.sh +VARS=$(get_iotdb_include "$*") +checkAllVariables +export IOTDB_HOME="${IOTDB_HOME}/.." +eval set -- "$VARS" + +if [ -n "$JAVA_HOME" ]; then + for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do + if [ -x "$java" ]; then + JAVA="$java" + break + fi + done +else + JAVA=java +fi + +CLASSPATH="" +for f in ${IOTDB_HOME}/lib/*.jar; do + CLASSPATH=${CLASSPATH}":"$f +done + +MAIN_CLASS=org.apache.iotdb.db.storageengine.dataregion.compaction.tool.OverlapStatisticTool + +"$JAVA" -Xmx16G -cp "$CLASSPATH" "$MAIN_CLASS" "$@" +exit $? diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/ITimeRange.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/ITimeRange.java new file mode 100644 index 0000000000..a04288da46 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/ITimeRange.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +public interface ITimeRange { + + // Add a time period to the current time range object. The increase process requires maintenance + // of the current TimeRange to facilitate efficient overlap check in the future + void addInterval(Interval interval); + + // Determines whether the incoming time range overlaps with the current time range + boolean isOverlapped(Interval interval); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java new file mode 100644 index 0000000000..8f2a2da301 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +public class Interval { + private long start; + private long end; + + public Interval(long start, long end) { + this.start = start; + this.end = end; + if (end < start) { + throw new IllegalArgumentException("end must greater than start"); + } + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public void setStart(long start) { + this.start = start; + } + + public void setEnd(long end) { + this.end = end; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/ListTimeRangeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/ListTimeRangeImpl.java new file mode 100644 index 0000000000..bca7f72aa6 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/ListTimeRangeImpl.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +public class ListTimeRangeImpl implements ITimeRange { + + List intervalList = new LinkedList<>(); + // 0-10, 20-30, 50-70 + // 25-60 + // 0-10. 20-70 + + @Override + public void addInterval(Interval interval) { + List mergedIntervals = new ArrayList<>(); + int index = 0; + + // 1. elements that do not overlap with the newly added element are placed directly in the + // result + while (index < intervalList.size() && intervalList.get(index).getEnd() < interval.getStart()) { + mergedIntervals.add(intervalList.get(index)); + index++; + } + + // 2. if the element overlaps with an existing element, start equals the minimum value of the + // overlap and end equals the maximum value of the overlap + while (index < intervalList.size() && intervalList.get(index).getStart() <= interval.getEnd()) { + interval.setStart(Math.min(intervalList.get(index).getStart(), interval.getStart())); + interval.setEnd(Math.max(intervalList.get(index).getEnd(), interval.getEnd())); + index++; + } + mergedIntervals.add(interval); + + // 3. add the remaining elements to the result set + while (index < intervalList.size()) { + mergedIntervals.add(intervalList.get(index)); + index++; + } + + intervalList.clear(); + intervalList.addAll(mergedIntervals); + } + + public List getIntervalList() { + return intervalList; + } + + /** + * case 1: interval.getStart() <= currentInterval.getEnd() + * + *

currentInterval: [5,10], interval: [6,15],[1,7],[0,5],[10,15] + * + *

case 2: interval.getEnd() <= currentInterval.getEnd() + * + *

currentInterval: [5,10], interval:[1,9],[0,9],[1,10] + */ + @Override + public boolean isOverlapped(Interval interval) { + for (Interval currentInterval : intervalList) { + boolean isOverlap = + interval.getStart() <= currentInterval.getEnd() + && interval.getEnd() >= currentInterval.getStart(); + if (isOverlap) { + return true; + } + } + return false; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/OverlapStatistic.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/OverlapStatistic.java new file mode 100644 index 0000000000..2701e37955 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/OverlapStatistic.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import java.util.HashSet; + +public class OverlapStatistic { + long totalSequenceFile; + long totalSequenceFileSize; + long totalChunkGroupsInSequenceFile; + long totalChunksInSequenceFile; + + HashSet sequenceNumber = new HashSet<>(); + long sequenceMinStartTime = Long.MAX_VALUE; + long sequenceMaxEndTime = Long.MIN_VALUE; + + long totalUnsequenceFile; + long totalUnsequenceFileSize; + long totalChunkGroupsInUnSequenceFile; + long totalChunksInUnSequenceFile; + long unSequenceMinStartTime = Long.MAX_VALUE; + long unSequenceMaxEndTime = Long.MIN_VALUE; + + long overlappedSequenceFiles; + long overlappedChunkGroupsInSequenceFile; + long overlappedChunksInSequenceFile; + + public void merge(OverlapStatistic other) { + this.totalSequenceFile += other.totalSequenceFile; + this.totalSequenceFileSize += other.totalSequenceFileSize; + this.totalChunkGroupsInSequenceFile += other.totalChunkGroupsInSequenceFile; + this.totalChunksInSequenceFile += other.totalChunksInSequenceFile; + this.sequenceMinStartTime = Math.min(this.sequenceMinStartTime, other.sequenceMinStartTime); + this.sequenceMaxEndTime = Math.max(this.sequenceMaxEndTime, other.sequenceMaxEndTime); + + this.totalUnsequenceFile += other.totalUnsequenceFile; + this.totalUnsequenceFileSize += other.totalUnsequenceFileSize; + this.totalChunkGroupsInUnSequenceFile += other.totalChunkGroupsInUnSequenceFile; + this.totalChunksInUnSequenceFile += other.totalChunksInUnSequenceFile; + this.unSequenceMinStartTime = + Math.min(this.unSequenceMinStartTime, other.unSequenceMinStartTime); + this.unSequenceMaxEndTime = Math.max(this.unSequenceMaxEndTime, other.unSequenceMaxEndTime); + + this.overlappedSequenceFiles += other.overlappedSequenceFiles; + this.overlappedChunkGroupsInSequenceFile += other.overlappedChunkGroupsInSequenceFile; + this.overlappedChunksInSequenceFile += other.overlappedChunksInSequenceFile; + } + + public void mergeSingleSequenceFileTaskResult(SequenceFileTaskSummary summary) { + if (summary.equals(new SequenceFileTaskSummary())) { + return; + } + if (summary.overlapChunkGroup > 0) { + this.overlappedSequenceFiles += 1; + } + this.overlappedChunkGroupsInSequenceFile += summary.overlapChunkGroup; + this.totalChunkGroupsInSequenceFile += summary.totalChunkGroups; + this.overlappedChunksInSequenceFile += summary.overlapChunk; + this.totalChunksInSequenceFile += summary.totalChunks; + this.totalSequenceFile += 1; + this.totalSequenceFileSize += summary.fileSize; + this.sequenceMinStartTime = Math.min(this.sequenceMinStartTime, summary.minStartTime); + this.sequenceMaxEndTime = Math.max(this.sequenceMaxEndTime, summary.maxEndTime); + } + + public void mergeUnSeqSpaceStatistics(UnseqSpaceStatistics statistics) { + this.totalUnsequenceFile += statistics.unsequenceFileNum; + this.totalUnsequenceFileSize += statistics.unsequenceFileSize; + this.totalChunksInUnSequenceFile += statistics.unsequenceChunkNum; + this.totalChunkGroupsInUnSequenceFile += statistics.unsequenceChunkGroupNum; + this.unSequenceMinStartTime = Math.min(this.unSequenceMinStartTime, statistics.minStartTime); + this.unSequenceMaxEndTime = Math.max(this.unSequenceMaxEndTime, statistics.maxEndTime); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/OverlapStatisticTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/OverlapStatisticTool.java new file mode 100644 index 0000000000..39ce158787 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/OverlapStatisticTool.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class OverlapStatisticTool { + + private static final String WORKER_NUM_ARG = "worker_num"; + public static final int DEFAULT_WORKER_NUM = 4; + private static final String SUB_TASK_NUM_ARG = "sub_task_num"; + public static final int DEFAULT_WORKER_SUB_TASK_NUM = 1; + private static final String DATA_DIRS_ARG = "data_dirs"; + + public static int workerNum; + public static int subTaskNum; + public static List dataDirs; + + public static Lock outputInfolock = new ReentrantLock(); + public static long seqFileCount = 0; + public static long processedTimePartitionCount = 0; + public static long processedSeqFileCount = 0; + public static final Map, List>> timePartitionFileMap = + new HashMap<>(); + + public static void main(String[] args) throws InterruptedException { + // process parameters to get the path to the data directory from the input + parseArgs(args); + + OverlapStatisticTool tool = new OverlapStatisticTool(); + long startTime = System.currentTimeMillis(); + tool.process(dataDirs); + System.out.printf( + "Total time cost: %.2fs\n", ((double) System.currentTimeMillis() - startTime) / 1000); + } + + public static void parseArgs(String[] args) { + Options options = createOptions(); + CommandLineParser parser = new DefaultParser(); + CommandLine commandLine; + try { + commandLine = parser.parse(options, args); + } catch (ParseException e) { + throw new RuntimeException(e); + } + workerNum = + Integer.parseInt( + getArgOrDefault(commandLine, WORKER_NUM_ARG, String.valueOf(DEFAULT_WORKER_NUM))); + subTaskNum = + Integer.parseInt( + getArgOrDefault( + commandLine, SUB_TASK_NUM_ARG, String.valueOf(DEFAULT_WORKER_SUB_TASK_NUM))); + String[] dataDirsParam = commandLine.getOptionValues(DATA_DIRS_ARG); + + if (dataDirsParam == null || dataDirsParam.length == 0) { + throw new RuntimeException("data_dirs must not be empty"); + } + dataDirs = Arrays.asList(dataDirsParam); + } + + private static Options createOptions() { + Options options = new Options(); + options + .addOption( + Option.builder() + .argName(WORKER_NUM_ARG) + .longOpt(WORKER_NUM_ARG) + .hasArg() + .desc("Concurrent time partition num(default: 10)") + .build()) + .addOption( + Option.builder() + .argName(SUB_TASK_NUM_ARG) + .longOpt(SUB_TASK_NUM_ARG) + .hasArg() + .desc("Concurrent file num in one time partition(default: 10)") + .build()) + .addOption( + Option.builder() + .argName(DATA_DIRS_ARG) + .longOpt(DATA_DIRS_ARG) + .hasArg() + .desc("Data dirs(Required)") + .required() + .build()); + return options; + } + + private static String getArgOrDefault(CommandLine commandLine, String arg, String defaultValue) { + String value = commandLine.getOptionValue(arg); + return value == null ? defaultValue : value; + } + + public void process(List dataDirs) throws InterruptedException { + processDataDirs(dataDirs); + + int workerNum = Math.min(timePartitionFileMap.size(), OverlapStatisticTool.workerNum); + TimePartitionProcessWorker[] workers = constructWorkers(workerNum); + + CountDownLatch countDownLatch = new CountDownLatch(workerNum); + for (TimePartitionProcessWorker worker : workers) { + worker.run(countDownLatch); + } + countDownLatch.await(); + + OverlapStatistic statistic = new OverlapStatistic(); + for (TimePartitionProcessWorker worker : workers) { + for (OverlapStatistic partialRet : worker.getWorkerResults()) { + statistic.merge(partialRet); + } + } + PrintUtil.printOneStatistics(statistic, "All EXECUTED"); + } + + public TimePartitionProcessWorker[] constructWorkers(int workerNum) { + TimePartitionProcessWorker[] workers = new TimePartitionProcessWorker[workerNum]; + + int workerIdx = 0; + for (Map.Entry, List>> timePartitionFilesEntry : + timePartitionFileMap.entrySet()) { + String timePartition = timePartitionFilesEntry.getKey(); + Pair, List> timePartitionFiles = timePartitionFilesEntry.getValue(); + + if (workers[workerIdx] == null) { + workers[workerIdx] = new TimePartitionProcessWorker(); + } + + workers[workerIdx].addTask(new TimePartitionProcessTask(timePartition, timePartitionFiles)); + workerIdx = (workerIdx + 1) % workerNum; + } + return workers; + } + + private void processDataDirs(List dataDirs) { + // 1. Traverse all time partitions and construct timePartitions + // 2. Count the total number of sequential files + for (String dataDirPath : dataDirs) { + File dataDir = new File(dataDirPath); + if (!dataDir.exists() || !dataDir.isDirectory()) { + continue; + } + processDataDirWithIsSeq(dataDirPath, true); + processDataDirWithIsSeq(dataDirPath, false); + } + } + + private void processDataDirWithIsSeq(String dataDirPath, boolean isSeq) { + String dataDirWithIsSeq; + if (isSeq) { + dataDirWithIsSeq = dataDirPath + File.separator + "sequence"; + } else { + dataDirWithIsSeq = dataDirPath + File.separator + "unsequence"; + } + File dataDirWithIsSequence = new File(dataDirWithIsSeq); + if (!dataDirWithIsSequence.exists() || !dataDirWithIsSequence.isDirectory()) { + System.out.println(dataDirWithIsSequence + " is not a correct path"); + return; + } + + for (File storageGroupDir : Objects.requireNonNull(dataDirWithIsSequence.listFiles())) { + if (!storageGroupDir.isDirectory()) { + continue; + } + String storageGroup = storageGroupDir.getName(); + for (File dataRegionDir : Objects.requireNonNull(storageGroupDir.listFiles())) { + if (!dataRegionDir.isDirectory()) { + continue; + } + String dataRegion = dataRegionDir.getName(); + for (File timePartitionDir : Objects.requireNonNull(dataRegionDir.listFiles())) { + if (!timePartitionDir.isDirectory()) { + continue; + } + + String timePartitionKey = + calculateTimePartitionKey(storageGroup, dataRegion, timePartitionDir.getName()); + Pair, List> timePartitionFiles = + timePartitionFileMap.computeIfAbsent( + timePartitionKey, v -> new Pair<>(new ArrayList<>(), new ArrayList<>())); + for (File file : Objects.requireNonNull(timePartitionDir.listFiles())) { + if (!file.isFile()) { + continue; + } + if (!file.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) { + continue; + } + String resourceFilePath = file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX; + if (!new File(resourceFilePath).exists()) { + System.out.println( + resourceFilePath + + " is not exist, the tsfile is skipped because it is not closed."); + continue; + } + String filePath = file.getAbsolutePath(); + if (isSeq) { + timePartitionFiles.left.add(filePath); + seqFileCount++; + } else { + timePartitionFiles.right.add(filePath); + } + } + } + } + } + } + + private String calculateTimePartitionKey( + String storageGroup, String dataRegion, String timePartition) { + return storageGroup + "-" + dataRegion + "-" + timePartition; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/PrintUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/PrintUtil.java new file mode 100644 index 0000000000..7df1bffc01 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/PrintUtil.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +class PrintUtil { + static String[] header_1 = {"", "Total", "Overlap", "Overlap/Total"}; + static String[] header_2 = {"", "Total", "Sequence", "UnSequence", "UnSequence/Total"}; + + static long MSize = 1024 * 1024; + + public static void printOneStatistics(OverlapStatistic overlapStatistic, String label) { + System.out.println(); + printTableLog(overlapStatistic); + printProgressLog(label, overlapStatistic); + } + + private static void printProgressLog(String label, OverlapStatistic statistic) { + String[][] log = { + { + "File Number", + statistic.totalSequenceFile + statistic.totalUnsequenceFile + "", + statistic.totalSequenceFile + "", + statistic.totalUnsequenceFile + "", + String.format( + "%.2f%%", + statistic.totalUnsequenceFile + * 100d + / (statistic.totalSequenceFile + statistic.totalUnsequenceFile)) + }, + { + "File Size(MB)", + (statistic.totalSequenceFileSize + statistic.totalUnsequenceFileSize) / MSize + "", + statistic.totalSequenceFileSize / MSize + "", + statistic.totalUnsequenceFileSize / MSize + "", + String.format( + "%.2f%%", + statistic.totalUnsequenceFileSize + * 100d + / (statistic.totalSequenceFileSize + statistic.totalUnsequenceFileSize)) + }, + { + "Duration", + Math.max(statistic.sequenceMaxEndTime, statistic.unSequenceMaxEndTime) + - Math.min(statistic.sequenceMinStartTime, statistic.unSequenceMinStartTime) + + "", + statistic.sequenceMaxEndTime - statistic.sequenceMinStartTime + "", + statistic.unSequenceMaxEndTime - statistic.unSequenceMinStartTime + "", + String.format( + "%.2f%%", + (statistic.unSequenceMaxEndTime - statistic.unSequenceMinStartTime) + * 100d + / (Math.max(statistic.sequenceMaxEndTime, statistic.unSequenceMaxEndTime) + - Math.min(statistic.sequenceMinStartTime, statistic.unSequenceMinStartTime))) + } + }; + System.out.println(System.getProperty("line.separator") + "Data Table:"); + printStaticsTable(log); + + System.out.printf( + "Progress: %s\n" + "Sequence File progress: %d/%d\n" + "Partition progress: %d/%d %s", + label, + OverlapStatisticTool.processedSeqFileCount, + OverlapStatisticTool.seqFileCount, + OverlapStatisticTool.processedTimePartitionCount, + OverlapStatisticTool.timePartitionFileMap.size(), + System.getProperty("line.separator")); + } + + private static void printTableLog(OverlapStatistic overlapStatistic) { + double overlappedSeqFilePercentage = + calculatePercentage( + overlapStatistic.overlappedSequenceFiles, overlapStatistic.totalSequenceFile); + double overlappedChunkGroupPercentage = + calculatePercentage( + overlapStatistic.overlappedChunkGroupsInSequenceFile, + overlapStatistic.totalChunkGroupsInSequenceFile); + double overlappedChunkPercentage = + calculatePercentage( + overlapStatistic.overlappedChunksInSequenceFile, + overlapStatistic.totalChunksInSequenceFile); + String[][] log = { + { + "Sequence File", + overlapStatistic.totalSequenceFile + "", + overlapStatistic.overlappedSequenceFiles + "", + String.format("%.2f%%", overlappedSeqFilePercentage) + }, + { + "ChunkGroup In Sequence File", + overlapStatistic.totalChunkGroupsInSequenceFile + "", + overlapStatistic.overlappedChunkGroupsInSequenceFile + "", + String.format("%.2f%%", overlappedChunkGroupPercentage) + }, + { + "Chunk In Sequence File", + overlapStatistic.totalChunksInSequenceFile + "", + overlapStatistic.overlappedChunksInSequenceFile + "", + String.format("%.2f%%", overlappedChunkPercentage) + } + }; + System.out.println("Overlap Table:"); + printOverlapTable(log); + } + + private static double calculatePercentage(long numerator, long denominator) { + return denominator != 0 ? (double) numerator / denominator * 100 : 0; + } + + public static void printOverlapTable(String[][] data) { + int numRows = data.length; + int[] maxCellWidths = calculateMaxCellWidths(header_1, data); + + printTopBorder(maxCellWidths); + printRow(header_1, maxCellWidths); + + for (int row = 0; row < numRows; row++) { + printSeparator(maxCellWidths); + printRow(data[row], maxCellWidths); + } + + printBottomBorder(maxCellWidths); + } + + public static void printStaticsTable(String[][] data) { + int numRows = data.length; + int[] maxCellWidths = calculateMaxCellWidths(header_2, data); + + printTopBorder(maxCellWidths); + printRow(header_2, maxCellWidths); + + for (int row = 0; row < numRows; row++) { + printSeparator(maxCellWidths); + printRow(data[row], maxCellWidths); + } + + printBottomBorder(maxCellWidths); + } + + private static int[] calculateMaxCellWidths(String[] header, String[][] data) { + int numCols = header.length; + int[] maxCellWidths = new int[numCols]; + + for (int col = 0; col < numCols; col++) { + maxCellWidths[col] = header[col].length(); + for (String[] row : data) { + maxCellWidths[col] = Math.max(maxCellWidths[col], row[col].length()); + } + } + + return maxCellWidths; + } + + private static void printTopBorder(int[] maxCellWidths) { + System.out.print("┌"); + for (int width : maxCellWidths) { + printRepeat("─", width + 2); + System.out.print("┬"); + } + System.out.println(); + } + + private static void printSeparator(int[] maxCellWidths) { + System.out.print("├"); + for (int width : maxCellWidths) { + printRepeat("─", width + 2); + System.out.print("┼"); + } + System.out.println(); + } + + private static void printBottomBorder(int[] maxCellWidths) { + System.out.print("└"); + for (int width : maxCellWidths) { + printRepeat("─", width + 2); + System.out.print("┴"); + } + System.out.println(); + } + + private static void printRow(String[] row, int[] maxCellWidths) { + for (int col = 0; col < row.length; col++) { + System.out.printf("│ %-" + maxCellWidths[col] + "s ", row[col]); + } + System.out.println("│"); + } + + private static void printRepeat(String value, int times) { + for (int i = 0; i < times; i++) { + System.out.print(value); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SequenceFileSubTaskThreadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SequenceFileSubTaskThreadExecutor.java new file mode 100644 index 0000000000..1e8512e69d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SequenceFileSubTaskThreadExecutor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class SequenceFileSubTaskThreadExecutor { + private ExecutorService executor; + + public SequenceFileSubTaskThreadExecutor(int threadCount) { + executor = Executors.newFixedThreadPool(threadCount); + } + + public Future submit(Callable task) { + return executor.submit(task); + } + + public void shutdown() { + executor.shutdown(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SequenceFileTaskSummary.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SequenceFileTaskSummary.java new file mode 100644 index 0000000000..c27f41fe30 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SequenceFileTaskSummary.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import java.util.Objects; + +public class SequenceFileTaskSummary { + public long overlapChunk = 0; + public long overlapChunkGroup = 0; + public long totalChunks = 0; + public long totalChunkGroups = 0; + public long fileSize = 0; + + public long minStartTime = Long.MAX_VALUE; + public long maxEndTime = Long.MIN_VALUE; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SequenceFileTaskSummary that = (SequenceFileTaskSummary) o; + return overlapChunk == that.overlapChunk + && overlapChunkGroup == that.overlapChunkGroup + && totalChunks == that.totalChunks + && totalChunkGroups == that.totalChunkGroups + && fileSize == that.fileSize; + } + + public void setMaxEndTime(long maxEndTime) { + this.maxEndTime = Math.max(this.maxEndTime, maxEndTime); + } + + public void setMinStartTime(long minStartTime) { + this.minStartTime = Math.min(this.minStartTime, minStartTime); + } + + @Override + public int hashCode() { + return Objects.hash(overlapChunk, overlapChunkGroup, totalChunks, totalChunkGroups, fileSize); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SingleSequenceFileTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SingleSequenceFileTask.java new file mode 100644 index 0000000000..8addc0efc7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/SingleSequenceFileTask.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; + +import java.io.File; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.List; +import java.util.concurrent.Callable; + +public class SingleSequenceFileTask implements Callable { + private UnseqSpaceStatistics unseqSpaceStatistics; + private String seqFile; + + public SingleSequenceFileTask(UnseqSpaceStatistics unseqSpaceStatistics, String seqFile) { + this.unseqSpaceStatistics = unseqSpaceStatistics; + this.seqFile = seqFile; + } + + @Override + public SequenceFileTaskSummary call() throws Exception { + return checkSeqFile(unseqSpaceStatistics, seqFile); + } + + private SequenceFileTaskSummary checkSeqFile( + UnseqSpaceStatistics unseqSpaceStatistics, String seqFile) { + SequenceFileTaskSummary summary = new SequenceFileTaskSummary(); + File f = new File(seqFile); + if (!f.exists()) { + return summary; + } + summary.fileSize += f.length(); + try (TsFileStatisticReader reader = new TsFileStatisticReader(seqFile)) { + // statistics sequence file information and updates to overlapStatistic + List chunkGroupStatisticsList = + reader.getChunkGroupStatisticsList(); + for (TsFileStatisticReader.ChunkGroupStatistics chunkGroupStatistics : + chunkGroupStatisticsList) { + summary.totalChunks += chunkGroupStatistics.getTotalChunkNum(); + String deviceId = chunkGroupStatistics.getDeviceID(); + + long deviceStartTime = Long.MAX_VALUE, deviceEndTime = Long.MIN_VALUE; + + for (ChunkMetadata chunkMetadata : chunkGroupStatistics.getChunkMetadataList()) { + // skip empty chunk + if (chunkMetadata.getStartTime() > chunkMetadata.getEndTime()) { + continue; + } + // update device start time and end time + deviceStartTime = Math.min(deviceStartTime, chunkMetadata.getStartTime()); + deviceEndTime = Math.max(deviceEndTime, chunkMetadata.getEndTime()); + + summary.setMinStartTime(deviceStartTime); + summary.setMaxEndTime(deviceEndTime); + + // check chunk overlap + Interval interval = + new Interval(chunkMetadata.getStartTime(), chunkMetadata.getEndTime()); + String measurementId = chunkMetadata.getMeasurementUid(); + if (unseqSpaceStatistics.chunkHasOverlap(deviceId, measurementId, interval)) { + summary.overlapChunk++; + } + } + // check device overlap + if (deviceStartTime > deviceEndTime) { + continue; + } + Interval deviceInterval = new Interval(deviceStartTime, deviceEndTime); + if (!unseqSpaceStatistics.chunkGroupHasOverlap(deviceId, deviceInterval)) { + continue; + } + summary.overlapChunkGroup++; + } + summary.totalChunkGroups = chunkGroupStatisticsList.size(); + } catch (IOException e) { + if (e instanceof NoSuchFileException) { + System.out.println(seqFile + " is not exist"); + return new SequenceFileTaskSummary(); + } + e.printStackTrace(); + return new SequenceFileTaskSummary(); + } + return summary; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TimePartitionProcessTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TimePartitionProcessTask.java new file mode 100644 index 0000000000..ca37b5f8ce --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TimePartitionProcessTask.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.io.File; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; + +public class TimePartitionProcessTask { + private final String timePartition; + private final Pair, List> timePartitionFiles; + private long sequenceSpaceCost = 0; + private long unsequenceSpaceCost = 0; + + public TimePartitionProcessTask( + String timePartition, Pair, List> timePartitionFiles) { + this.timePartition = timePartition; + this.timePartitionFiles = timePartitionFiles; + } + + public OverlapStatistic processTimePartition(SequenceFileSubTaskThreadExecutor fileTaskExecutor) { + long startTime = System.currentTimeMillis(); + UnseqSpaceStatistics unseqSpaceStatistics = buildUnseqSpaceStatistics(timePartitionFiles.right); + + OverlapStatistic partialRet = + processSequenceSpaceAsync(fileTaskExecutor, unseqSpaceStatistics, timePartitionFiles.left); + OverlapStatisticTool.outputInfolock.lock(); + OverlapStatisticTool.processedTimePartitionCount += 1; + OverlapStatisticTool.processedSeqFileCount += partialRet.totalSequenceFile; + PrintUtil.printOneStatistics(partialRet, timePartition); + System.out.printf( + "Worker" + + Thread.currentThread().getName() + + " Time cost: %.2fs, Sequence space cost: %.2fs, Build unsequence space cost: %.2fs.\n", + ((double) System.currentTimeMillis() - startTime) / 1000, + ((double) sequenceSpaceCost / 1000), + ((double) unsequenceSpaceCost / 1000)); + + OverlapStatisticTool.outputInfolock.unlock(); + + return partialRet; + } + + private UnseqSpaceStatistics buildUnseqSpaceStatistics(List unseqFiles) { + UnseqSpaceStatistics unseqSpaceStatistics = new UnseqSpaceStatistics(); + + long startTime = System.currentTimeMillis(); + for (String unseqFile : unseqFiles) { + File f = new File(unseqFile); + if (!f.exists()) { + continue; + } + unseqSpaceStatistics.unsequenceFileSize += f.length(); + try (TsFileStatisticReader reader = new TsFileStatisticReader(unseqFile)) { + List chunkGroupStatisticsList = + reader.getChunkGroupStatisticsList(); + unseqSpaceStatistics.unsequenceChunkGroupNum += chunkGroupStatisticsList.size(); + + for (TsFileStatisticReader.ChunkGroupStatistics statistics : chunkGroupStatisticsList) { + long deviceStartTime = Long.MAX_VALUE, deviceEndTime = Long.MIN_VALUE; + + for (ChunkMetadata chunkMetadata : statistics.getChunkMetadataList()) { + unseqSpaceStatistics.unsequenceChunkNum += chunkMetadata.getNumOfPoints(); + deviceStartTime = Math.min(deviceStartTime, chunkMetadata.getStartTime()); + deviceEndTime = Math.max(deviceEndTime, chunkMetadata.getEndTime()); + + unseqSpaceStatistics.setMinStartTime(deviceStartTime); + unseqSpaceStatistics.setMaxEndTime(deviceEndTime); + + if (chunkMetadata.getStartTime() > chunkMetadata.getEndTime()) { + continue; + } + unseqSpaceStatistics.updateMeasurement( + statistics.getDeviceID(), + chunkMetadata.getMeasurementUid(), + new Interval(chunkMetadata.getStartTime(), chunkMetadata.getEndTime())); + } + if (deviceStartTime > deviceEndTime) { + continue; + } + unseqSpaceStatistics.updateDevice( + statistics.getDeviceID(), new Interval(deviceStartTime, deviceEndTime)); + } + } catch (IOException e) { + if (e instanceof NoSuchFileException) { + System.out.println(((NoSuchFileException) e).getFile() + " is not exist"); + continue; + } + e.printStackTrace(); + } + } + unsequenceSpaceCost += (System.currentTimeMillis() - startTime); + unseqSpaceStatistics.unsequenceFileNum += unseqFiles.size(); + return unseqSpaceStatistics; + } + + public OverlapStatistic processSequenceSpaceAsync( + SequenceFileSubTaskThreadExecutor executor, + UnseqSpaceStatistics unseqSpaceStatistics, + List seqFiles) { + long startTime = System.currentTimeMillis(); + OverlapStatistic overlapStatistic = new OverlapStatistic(); + List> futures = new ArrayList<>(); + for (String seqFile : seqFiles) { + futures.add(executor.submit(new SingleSequenceFileTask(unseqSpaceStatistics, seqFile))); + } + for (Future future : futures) { + try { + SequenceFileTaskSummary sequenceFileTaskSummary = future.get(); + overlapStatistic.mergeSingleSequenceFileTaskResult(sequenceFileTaskSummary); + } catch (Exception e) { + e.printStackTrace(); + } + } + overlapStatistic.mergeUnSeqSpaceStatistics(unseqSpaceStatistics); + + sequenceSpaceCost += (System.currentTimeMillis() - startTime); + return overlapStatistic; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TimePartitionProcessWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TimePartitionProcessWorker.java new file mode 100644 index 0000000000..a8e621a7c3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TimePartitionProcessWorker.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +public class TimePartitionProcessWorker { + private final List workerTaskList; + private final List workerResults; + + public TimePartitionProcessWorker() { + workerTaskList = new ArrayList<>(); + workerResults = new ArrayList<>(); + } + + public void addTask(TimePartitionProcessTask task) { + workerTaskList.add(task); + } + + public void run(CountDownLatch latch) { + new Thread( + () -> { + SequenceFileSubTaskThreadExecutor fileProcessTaskExecutor = + new SequenceFileSubTaskThreadExecutor(OverlapStatisticTool.subTaskNum); + while (!workerTaskList.isEmpty()) { + TimePartitionProcessTask task = workerTaskList.remove(0); + OverlapStatistic partialRet = task.processTimePartition(fileProcessTaskExecutor); + workerResults.add(partialRet); + } + latch.countDown(); + fileProcessTaskExecutor.shutdown(); + }) + .start(); + } + + public List getWorkerResults() { + return workerResults; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TsFileStatisticReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TsFileStatisticReader.java new file mode 100644 index 0000000000..7b414a54fa --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/TsFileStatisticReader.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.read.TsFileDeviceIterator; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class TsFileStatisticReader implements Closeable { + + private final TsFileSequenceReader reader; + + public TsFileStatisticReader(String filePath) throws IOException { + reader = new TsFileSequenceReader(filePath); + } + + public List getChunkGroupStatisticsList() throws IOException { + TsFileDeviceIterator allDevicesIteratorWithIsAligned = + reader.getAllDevicesIteratorWithIsAligned(); + List chunkGroupStatisticsList = new ArrayList<>(); + while (allDevicesIteratorWithIsAligned.hasNext()) { + Pair deviceWithIsAligned = allDevicesIteratorWithIsAligned.next(); + String deviceId = deviceWithIsAligned.left; + + ChunkGroupStatistics chunkGroupStatistics = new ChunkGroupStatistics(deviceId); + Iterator>> measurementChunkMetadataListMapIterator = + reader.getMeasurementChunkMetadataListMapIterator(deviceId); + + while (measurementChunkMetadataListMapIterator.hasNext()) { + Map> measurementChunkMetadataListMap = + measurementChunkMetadataListMapIterator.next(); + for (Map.Entry> measurementChunkMetadataList : + measurementChunkMetadataListMap.entrySet()) { + List chunkMetadataList = measurementChunkMetadataList.getValue(); + chunkGroupStatistics.chunkMetadataList.addAll(chunkMetadataList); + chunkGroupStatistics.totalChunkNum += chunkMetadataList.size(); + } + } + chunkGroupStatisticsList.add(chunkGroupStatistics); + } + return chunkGroupStatisticsList; + } + + @Override + public void close() throws IOException { + this.reader.close(); + } + + public static class ChunkGroupStatistics { + private final String deviceID; + private final List chunkMetadataList; + private int totalChunkNum = 0; + + private ChunkGroupStatistics(String deviceId) { + this.deviceID = deviceId; + this.chunkMetadataList = new ArrayList<>(); + } + + public String getDeviceID() { + return deviceID; + } + + public List getChunkMetadataList() { + return chunkMetadataList; + } + + public int getTotalChunkNum() { + return totalChunkNum; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/UnseqSpaceStatistics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/UnseqSpaceStatistics.java new file mode 100644 index 0000000000..2b929bb23e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/UnseqSpaceStatistics.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tool; + +import java.util.HashMap; +import java.util.Map; + +public class UnseqSpaceStatistics { + public long unsequenceFileNum = 0; + public long unsequenceFileSize = 0; + + public long unsequenceChunkNum = 0; + public long unsequenceChunkGroupNum = 0; + + public long minStartTime = Long.MAX_VALUE; + + public long maxEndTime = Long.MIN_VALUE; + private Map> chunkStatisticMap = new HashMap<>(); + + private Map chunkGroupStatisticMap = new HashMap<>(); + + public void updateMeasurement(String device, String measurementUID, Interval interval) { + chunkStatisticMap + .computeIfAbsent(device, key -> new HashMap<>()) + .computeIfAbsent(measurementUID, key -> new ListTimeRangeImpl()) + .addInterval(interval); + } + + public void updateDevice(String device, Interval interval) { + chunkGroupStatisticMap + .computeIfAbsent(device, key -> new ListTimeRangeImpl()) + .addInterval(interval); + } + + public boolean chunkHasOverlap(String device, String measurementUID, Interval interval) { + if (!chunkStatisticMap.containsKey(device)) { + return false; + } + if (!chunkStatisticMap.get(device).containsKey(measurementUID)) { + return false; + } + return chunkStatisticMap.get(device).get(measurementUID).isOverlapped(interval); + } + + public boolean chunkGroupHasOverlap(String device, Interval interval) { + if (!chunkGroupStatisticMap.containsKey(device)) { + return false; + } + return chunkGroupStatisticMap.get(device).isOverlapped(interval); + } + + public Map> getChunkStatisticMap() { + return chunkStatisticMap; + } + + public Map getChunkGroupStatisticMap() { + return chunkGroupStatisticMap; + } + + public void setMaxEndTime(long maxEndTime) { + this.maxEndTime = Math.max(this.maxEndTime, maxEndTime); + } + + public void setMinStartTime(long minStartTime) { + this.minStartTime = Math.min(this.minStartTime, minStartTime); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tools/ListTimeRangeImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tools/ListTimeRangeImplTest.java new file mode 100644 index 0000000000..c3caaa16d7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tools/ListTimeRangeImplTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tools; + +import org.apache.iotdb.db.storageengine.dataregion.compaction.tool.Interval; +import org.apache.iotdb.db.storageengine.dataregion.compaction.tool.ListTimeRangeImpl; + +import org.junit.Assert; +import org.junit.Test; + +public class ListTimeRangeImplTest { + + ListTimeRangeImpl listTimeRange = new ListTimeRangeImpl(); + + @Test + public void test01() { + listTimeRange.addInterval(new Interval(30, 40)); + Assert.assertEquals(1, listTimeRange.getIntervalList().size()); + Assert.assertEquals(30, listTimeRange.getIntervalList().get(0).getStart()); + Assert.assertEquals(40, listTimeRange.getIntervalList().get(0).getEnd()); + } + + @Test + public void test02() { + listTimeRange.addInterval(new Interval(30, 40)); + listTimeRange.addInterval(new Interval(10, 20)); + listTimeRange.addInterval(new Interval(15, 20)); + listTimeRange.addInterval(new Interval(50, 60)); + Assert.assertEquals(3, listTimeRange.getIntervalList().size()); + } + + @Test + public void test03() { + listTimeRange.addInterval(new Interval(30, 40)); + listTimeRange.addInterval(new Interval(10, 20)); + listTimeRange.addInterval(new Interval(15, 20)); + listTimeRange.addInterval(new Interval(50, 60)); + listTimeRange.addInterval(new Interval(1, 100)); + Assert.assertEquals(1, listTimeRange.getIntervalList().size()); + Assert.assertEquals(1, listTimeRange.getIntervalList().get(0).getStart()); + Assert.assertEquals(100, listTimeRange.getIntervalList().get(0).getEnd()); + } + + @Test + public void test04() { + listTimeRange.addInterval(new Interval(30, 40)); + listTimeRange.addInterval(new Interval(10, 20)); + listTimeRange.addInterval(new Interval(15, 20)); + listTimeRange.addInterval(new Interval(50, 60)); + listTimeRange.addInterval(new Interval(5, 100)); + Assert.assertFalse(listTimeRange.isOverlapped(new Interval(1, 1))); + Assert.assertFalse(listTimeRange.isOverlapped(new Interval(101, 103))); + } + + @Test + public void test05() { + listTimeRange.addInterval(new Interval(30, 40)); + listTimeRange.addInterval(new Interval(10, 20)); + listTimeRange.addInterval(new Interval(20, 30)); + Assert.assertEquals(1, listTimeRange.getIntervalList().size()); + } + + @Test + public void test06() { + listTimeRange.addInterval(new Interval(1, 100)); + listTimeRange.addInterval(new Interval(1, 2000)); + Assert.assertEquals(1, listTimeRange.getIntervalList().size()); + } + + @Test + public void test07() { + listTimeRange.addInterval(new Interval(1, 10)); + listTimeRange.addInterval(new Interval(60, 70)); + listTimeRange.addInterval(new Interval(51, 55)); + Assert.assertEquals(51, listTimeRange.getIntervalList().get(1).getStart()); + } + + @Test + public void testNoOverlap() { + ListTimeRangeImpl listTimeRange = new ListTimeRangeImpl(); + listTimeRange.addInterval(new Interval(3, 5)); + Assert.assertFalse(listTimeRange.isOverlapped(new Interval(6, 10))); + Assert.assertFalse(listTimeRange.isOverlapped(new Interval(1, 2))); + } + + @Test + public void testStartTimeOverlap() { + ListTimeRangeImpl listTimeRange = new ListTimeRangeImpl(); + listTimeRange.addInterval(new Interval(1, 5)); + Assert.assertTrue(listTimeRange.isOverlapped(new Interval(4, 8))); + } + + @Test + public void testEndTimeOverlap() { + ListTimeRangeImpl listTimeRange = new ListTimeRangeImpl(); + listTimeRange.addInterval(new Interval(1, 5)); + Assert.assertTrue(listTimeRange.isOverlapped(new Interval(0, 4))); + } + + @Test + public void testFullyOverlap() { + ListTimeRangeImpl listTimeRange = new ListTimeRangeImpl(); + listTimeRange.addInterval(new Interval(2, 4)); + Assert.assertTrue(listTimeRange.isOverlapped(new Interval(1, 5))); + } + + @Test + public void testIntervalInsideCurrentInterval() { + ListTimeRangeImpl listTimeRange = new ListTimeRangeImpl(); + listTimeRange.addInterval(new Interval(1, 5)); + Assert.assertTrue(listTimeRange.isOverlapped(new Interval(2, 4))); + } + + @Test + public void testBoundary() { + ListTimeRangeImpl listTimeRange = new ListTimeRangeImpl(); + listTimeRange.addInterval(new Interval(3, 5)); + Assert.assertTrue(listTimeRange.isOverlapped(new Interval(1, 3))); + Assert.assertTrue(listTimeRange.isOverlapped(new Interval(5, 6))); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tools/UnseqSpaceStatisticsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tools/UnseqSpaceStatisticsTest.java new file mode 100644 index 0000000000..aad3ef312c --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tools/UnseqSpaceStatisticsTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.tools; + +import org.apache.iotdb.db.storageengine.dataregion.compaction.tool.Interval; +import org.apache.iotdb.db.storageengine.dataregion.compaction.tool.UnseqSpaceStatistics; + +import org.junit.Assert; +import org.junit.Test; + +public class UnseqSpaceStatisticsTest { + + @Test + public void test01() { + UnseqSpaceStatistics unseqSpaceStatistics = new UnseqSpaceStatistics(); + unseqSpaceStatistics.updateMeasurement("root.db.d1", "s1", new Interval(1, 10)); + unseqSpaceStatistics.updateMeasurement("root.db.d1", "s1", new Interval(5, 15)); + unseqSpaceStatistics.updateMeasurement("root.db.d1", "s2", new Interval(1, 10)); + unseqSpaceStatistics.updateMeasurement("root.db.d2", "s2", new Interval(1, 10)); + + Assert.assertEquals(2, unseqSpaceStatistics.getChunkStatisticMap().size()); + Assert.assertEquals(2, unseqSpaceStatistics.getChunkStatisticMap().get("root.db.d1").size()); + Assert.assertEquals(1, unseqSpaceStatistics.getChunkStatisticMap().get("root.db.d2").size()); + } + + @Test + public void test02() { + UnseqSpaceStatistics unseqSpaceStatistics = new UnseqSpaceStatistics(); + unseqSpaceStatistics.updateMeasurement("root.db.d1", "s1", new Interval(1, 10)); + unseqSpaceStatistics.updateMeasurement("root.db.d1", "s1", new Interval(5, 15)); + unseqSpaceStatistics.updateMeasurement("root.db.d1", "s2", new Interval(1, 10)); + unseqSpaceStatistics.updateMeasurement("root.db.d2", "s2", new Interval(1, 10)); + + Assert.assertTrue( + unseqSpaceStatistics.chunkHasOverlap("root.db.d1", "s1", new Interval(1, 10))); + Assert.assertFalse( + unseqSpaceStatistics.chunkHasOverlap("root.db.d1", "s4", new Interval(1, 10))); + Assert.assertFalse( + unseqSpaceStatistics.chunkHasOverlap("root.db.d2", "s1", new Interval(1, 10))); + + Assert.assertFalse( + unseqSpaceStatistics.chunkHasOverlap("root.db.d3", "s1", new Interval(1, 10))); + Assert.assertFalse( + unseqSpaceStatistics.chunkHasOverlap("root.db.d1", "s1", new Interval(21, 30))); + } +} -- GitLab