未验证 提交 f2e89cc8 编写于 作者: H Haonan 提交者: GitHub

Fix sonar issue in DataRegion module (#10311)

上级 cc83670d
......@@ -186,9 +186,6 @@ public class DataRegion implements IDataRegionForQuery {
private static final Logger logger = LoggerFactory.getLogger(DataRegion.class);
/** indicating the file to be loaded overlap with some files. */
private static final int POS_OVERLAP = -3;
private final boolean enableMemControl = config.isEnableMemControl();
/**
* a read write lock for guaranteeing concurrent safety when accessing all fields in this class
......@@ -284,7 +281,7 @@ public class DataRegion implements IDataRegionForQuery {
public static final long COMPACTION_TASK_SUBMIT_DELAY = 20L * 1000L;
private final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET =
private static final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET =
QueryResourceMetricSet.getInstance();
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
......@@ -427,6 +424,7 @@ public class DataRegion implements IDataRegionForQuery {
}
/** recover from file */
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning
private void recover() throws DataRegionException {
try {
recoverCompaction();
......@@ -454,7 +452,7 @@ public class DataRegion implements IDataRegionForQuery {
// split by partition so that we can find the last file of each partition and decide to
// close it or not
DataRegionRecoveryContext dataRegionRecoveryContext =
new DataRegionRecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size());
new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() + tmpUnseqTsFiles.size());
Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
splitResourcesByPartition(tmpSeqTsFiles);
Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
......@@ -743,15 +741,15 @@ public class DataRegion implements IDataRegionForQuery {
return new Pair<>(ret, upgradeRet);
}
private void continueFailedRenames(File fileFolder, String suffix) {
private void continueFailedRenames(File fileFolder, String suffix) throws IOException {
File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
if (files != null) {
for (File tempResource : files) {
File originResource = fsFactory.getFile(tempResource.getPath().replace(suffix, ""));
if (originResource.exists()) {
tempResource.delete();
Files.delete(tempResource.toPath());
} else {
tempResource.renameTo(originResource);
Files.move(tempResource.toPath(), originResource.toPath());
}
}
}
......@@ -973,7 +971,7 @@ public class DataRegion implements IDataRegionForQuery {
*
* @throws BatchProcessException if some of the rows failed to be inserted
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning
public void insertTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
if (enableMemControl) {
......@@ -2839,31 +2837,6 @@ public class DataRegion implements IDataRegionForQuery {
return false;
}
/** remove all partitions that satisfy a filter. */
public void removePartitions(TimePartitionFilter filter) {
// this requires blocking all other activities
writeLock("removePartitions");
try {
// abort ongoing compaction
abortCompaction();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// Wait two seconds for the compaction thread to terminate
}
// close all working files that should be removed
removePartitions(filter, workSequenceTsFileProcessors.entrySet(), true);
removePartitions(filter, workUnsequenceTsFileProcessors.entrySet(), false);
// remove data files
removePartitions(filter, tsFileManager.getIterator(true), true);
removePartitions(filter, tsFileManager.getIterator(false), false);
} finally {
writeUnlock();
}
}
public void abortCompaction() {
tsFileManager.setAllowCompaction(false);
List<AbstractCompactionTask> runningTasks =
......@@ -2873,49 +2846,7 @@ public class DataRegion implements IDataRegionForQuery {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
logger.error("Thread get interrupted when waiting compaction to finish", e);
break;
}
}
}
// may remove the processorEntrys
private void removePartitions(
TimePartitionFilter filter,
Set<Entry<Long, TsFileProcessor>> processorEntrys,
boolean sequence) {
for (Iterator<Entry<Long, TsFileProcessor>> iterator = processorEntrys.iterator();
iterator.hasNext(); ) {
Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next();
long partitionId = longTsFileProcessorEntry.getKey();
lastFlushTimeMap.removePartition(partitionId);
TimePartitionManager.getInstance()
.removePartition(new DataRegionId(Integer.valueOf(dataRegionId)), partitionId);
TsFileProcessor processor = longTsFileProcessorEntry.getValue();
if (filter.satisfy(databaseName, partitionId)) {
processor.syncClose();
iterator.remove();
processor.getTsFileResource().remove();
tsFileManager.remove(processor.getTsFileResource(), sequence);
logger.debug(
"{} is removed during deleting partitions",
processor.getTsFileResource().getTsFilePath());
}
}
}
// may remove the iterator's data
private void removePartitions(
TimePartitionFilter filter, Iterator<TsFileResource> iterator, boolean sequence) {
while (iterator.hasNext()) {
TsFileResource tsFileResource = iterator.next();
if (filter.satisfy(databaseName, tsFileResource.getTimePartition())) {
tsFileResource.remove();
tsFileManager.remove(tsFileResource, sequence);
lastFlushTimeMap.removePartition(tsFileResource.getTimePartition());
TimePartitionManager.getInstance()
.removePartition(
new DataRegionId(Integer.valueOf(dataRegionId)), tsFileResource.getTimePartition());
logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath());
Thread.currentThread().interrupt();
}
}
}
......
......@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Locale;
/**
......@@ -72,7 +73,11 @@ public class CompressionRatio {
directory =
SystemFileFactory.INSTANCE.getFile(
FilePathUtils.regularizePath(CONFIG.getSystemDir()) + COMPRESSION_RATIO_DIR);
restore();
try {
restore();
} catch (IOException e) {
LOGGER.error("restore file error caused by ", e);
}
}
/**
......@@ -107,7 +112,7 @@ public class CompressionRatio {
private void persist(File oldFile, File newFile) throws IOException {
checkDirectoryExist();
if (!oldFile.exists()) {
newFile.createNewFile();
Files.createFile(newFile.toPath());
LOGGER.debug(
"Old ratio file {} doesn't exist, force create ratio file {}",
oldFile.getAbsolutePath(),
......@@ -128,7 +133,7 @@ public class CompressionRatio {
}
/** Restore compression ratio statistics from disk when system restart */
void restore() {
void restore() throws IOException {
if (!directory.exists()) {
return;
}
......@@ -157,6 +162,7 @@ public class CompressionRatio {
calcTimes);
for (int i = 0; i < ratioFiles.length; i++) {
if (i != maxRatioIndex) {
Files.delete(ratioFiles[i].toPath());
ratioFiles[i].delete();
}
}
......
......@@ -106,7 +106,7 @@ public class TsFileProcessor {
private final boolean enableMemControl = config.isEnableMemControl();
/** database info for mem control. */
private DataRegionInfo dataRegionInfo;
private final DataRegionInfo dataRegionInfo;
/** tsfile processor info for mem control. */
private TsFileProcessorInfo tsFileProcessorInfo;
......@@ -114,7 +114,7 @@ public class TsFileProcessor {
private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
/** modification to memtable mapping. */
private List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>();
private final List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>();
/** writer for restore tsfile and flushing. */
private RestorableTsFileIOWriter writer;
......@@ -914,11 +914,11 @@ public class TsFileProcessor {
}
}
synchronized (tmpMemTable) {
synchronized (flushingMemTables) {
try {
long startWait = System.currentTimeMillis();
while (flushingMemTables.contains(tmpMemTable)) {
tmpMemTable.wait(1000);
flushingMemTables.wait(1000);
if ((System.currentTimeMillis() - startWait) > 60_000) {
logger.warn(
......@@ -1075,9 +1075,9 @@ public class TsFileProcessor {
/** This method will synchronize the memTable and release its flushing resources */
private void syncReleaseFlushedMemTable(IMemTable memTable) {
synchronized (memTable) {
synchronized (flushingMemTables) {
releaseFlushedMemTable(memTable);
memTable.notifyAll();
flushingMemTables.notifyAll();
if (logger.isDebugEnabled()) {
logger.debug(
"{}: {} released a memtable (signal={}), flushingMemtables size ={}",
......@@ -1093,7 +1093,7 @@ public class TsFileProcessor {
* Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
* the flush manager pool
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@SuppressWarnings({"squid:S3776", "squid:S2142"}) // Suppress high Cognitive Complexity warning
public void flushOneMemTable() {
IMemTable memTableToFlush = flushingMemTables.getFirst();
......
......@@ -19,12 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
import org.apache.iotdb.db.exception.WriteLockFailedException;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
......@@ -33,13 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TsFileManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TsFileManager.class);
private String storageGroupName;
private String dataRegionId;
private String storageGroupDir;
......@@ -301,25 +295,6 @@ public class TsFileManager {
writeLockHolder = holder;
}
/**
* Acquire write lock with timeout, {@link WriteLockFailedException} will be thrown after timeout.
* The unit of timeout is ms.
*/
public void writeLockWithTimeout(String holder, long timeout) throws WriteLockFailedException {
try {
if (resourceListLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
writeLockHolder = holder;
} else {
throw new WriteLockFailedException(
String.format("cannot get write lock in %d ms", timeout));
}
} catch (InterruptedException e) {
LOGGER.warn(e.getMessage(), e);
Thread.interrupted();
throw new WriteLockFailedException("thread is interrupted");
}
}
public void writeUnlock() {
resourceListLock.writeLock().unlock();
writeLockHolder = "";
......
......@@ -68,6 +68,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
......@@ -99,8 +100,10 @@ public class TsFileResource {
/** time index */
public ITimeIndex timeIndex;
@SuppressWarnings("squid:S3077")
private volatile ModificationFile modFile;
@SuppressWarnings("squid:S3077")
private volatile ModificationFile compactionModFile;
protected AtomicReference<TsFileResourceStatus> atomicStatus =
......@@ -134,7 +137,7 @@ public class TsFileResource {
private long ramSize;
private volatile int tierLevel = 0;
private AtomicInteger tierLevel;
private volatile long tsFileSize = -1L;
......@@ -191,7 +194,7 @@ public class TsFileResource {
this.isSeq = FilePathUtils.isSequence(this.file.getAbsolutePath());
// This method is invoked when DataNode recovers, so the tierLevel should be calculated when
// restarting
this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
this.tierLevel = new AtomicInteger(TierManager.getInstance().getFileTierLevel(file));
}
/** Used for compaction to create target files. */
......@@ -209,7 +212,7 @@ public class TsFileResource {
this.isSeq = processor.isSequence();
// this method is invoked when a new TsFile is created and a newly created TsFile's the
// tierLevel is 0 by default
this.tierLevel = 0;
this.tierLevel = new AtomicInteger(0);
}
/** unsealed TsFile, for read */
......@@ -382,6 +385,7 @@ public class TsFileResource {
return pathToReadOnlyMemChunkMap.get(seriesPath);
}
@SuppressWarnings("squid:S2886")
public ModificationFile getModFile() {
if (modFile == null) {
synchronized (this) {
......@@ -426,11 +430,11 @@ public class TsFileResource {
}
public void increaseTierLevel() {
this.tierLevel++;
this.tierLevel.addAndGet(1);
}
public int getTierLevel() {
return tierLevel;
return tierLevel.get();
}
public long getTsFileSize() {
......
......@@ -23,19 +23,17 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TsFileResourceList implements List<TsFileResource> {
private static final Logger LOGGER = LoggerFactory.getLogger(TsFileResourceList.class);
private TsFileResource header;
private TsFileResource tail;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
......@@ -379,6 +377,9 @@ public class TsFileResourceList implements List<TsFileResource> {
@Override
public TsFileResource next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return this.tsFileResourceList.get(currentIndex++);
}
}
......@@ -399,6 +400,9 @@ public class TsFileResourceList implements List<TsFileResource> {
@Override
public TsFileResource next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return tsFileResourceList.get(currentIndex--);
}
}
......
......@@ -225,36 +225,6 @@ public class DataRegionTest {
}
}
@Test
public void testInsertDataAndRemovePartitionAndInsert()
throws WriteProcessException, QueryProcessException, IllegalPathException {
for (int j = 0; j < 10; j++) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
dataRegion.insert(buildInsertRowNodeByTSRecord(record));
dataRegion.asyncCloseAllWorkingTsFileProcessors();
}
dataRegion.syncCloseAllWorkingTsFileProcessors();
dataRegion.removePartitions((storageGroupName, timePartitionId) -> true);
for (int j = 0; j < 10; j++) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
dataRegion.insert(buildInsertRowNodeByTSRecord(record));
dataRegion.asyncCloseAllWorkingTsFileProcessors();
}
dataRegion.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
dataRegion.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
deviceId,
context,
null);
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
}
@Test
public void testIoTDBTabletWriteAndSyncClose()
throws QueryProcessException, IllegalPathException, WriteProcessException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册