未验证 提交 4e87d43d 编写于 作者: L Liu Xuxin 提交者: GitHub

[IOTDB-2453] Remove all not necessarily lock in compaction process (#4952)

上级 f9011617
......@@ -66,11 +66,6 @@ public class IoTDBRemovePartitionIT {
StorageEngine.setEnablePartition(false);
StorageEngine.setTimePartitionInterval(-1);
EnvironmentUtils.cleanEnv();
ch.qos.logback.classic.Logger rootLogger =
(ch.qos.logback.classic.Logger)
LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
rootLogger.setLevel(Level.toLevel("warn"));
}
@Test
......
......@@ -1417,7 +1417,7 @@ public class IoTDBConfig {
return crossCompactionMemoryBudget;
}
void setCrossCompactionMemoryBudget(long crossCompactionMemoryBudget) {
public void setCrossCompactionMemoryBudget(long crossCompactionMemoryBudget) {
this.crossCompactionMemoryBudget = crossCompactionMemoryBudget;
}
......
......@@ -28,9 +28,6 @@ import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTaskFacto
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* CompactionScheduler schedules and submits the compaction task periodically, and it counts the
* total number of running compaction task. There are three compaction strategy: BALANCE,
......@@ -44,9 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class CompactionScheduler {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
// fullStorageGroupName -> timePartition -> compactionCount
private static volatile Map<String, Map<Long, Long>> compactionCountInPartition =
new ConcurrentHashMap<>();
public static void scheduleCompaction(TsFileManager tsFileManager, long timePartition) {
if (!tsFileManager.isAllowCompaction()) {
......
......@@ -373,8 +373,12 @@ public class CompactionTaskManager implements IService {
}
@TestOnly
public void restart() {
public void restart() throws InterruptedException {
if (IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() > 0) {
if (taskExecutionPool != null) {
this.taskExecutionPool.shutdownNow();
this.taskExecutionPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
this.taskExecutionPool =
(WrappedScheduledExecutorService)
IoTDBThreadPoolFactory.newScheduledThreadPool(
......@@ -389,4 +393,9 @@ public class CompactionTaskManager implements IService {
currentTaskNum = new AtomicInteger(0);
logger.info("Compaction task manager started.");
}
@TestOnly
public void clearCandidateQueue() {
candidateCompactionTaskQueue.clear();
}
}
......@@ -20,7 +20,9 @@ package org.apache.iotdb.db.engine.compaction.cross;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import java.util.List;
public abstract class AbstractCrossSpaceCompactionSelector extends AbstractCompactionSelector {
protected String logicalStorageGroupName;
......@@ -28,8 +30,8 @@ public abstract class AbstractCrossSpaceCompactionSelector extends AbstractCompa
protected String storageGroupDir;
protected long timePartition;
protected TsFileManager tsFileManager;
protected TsFileResourceList sequenceFileList;
protected TsFileResourceList unsequenceFileList;
protected List<TsFileResource> sequenceFileList;
protected List<TsFileResource> unsequenceFileList;
protected CrossSpaceCompactionTaskFactory taskFactory;
public AbstractCrossSpaceCompactionSelector(
......
......@@ -50,8 +50,7 @@ public class CrossSpaceCompactionExceptionHandler {
List<TsFileResource> targetResourceList,
List<TsFileResource> seqResourceList,
List<TsFileResource> unseqResourceList,
TsFileManager tsFileManager,
long timePartiionId) {
TsFileManager tsFileManager) {
try {
if (logFile == null || !logFile.exists()) {
// the log file is null or the log file does not exists
......
......@@ -71,17 +71,6 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa
public void selectAndSubmit() {
if ((CompactionTaskManager.currentTaskNum.get() >= config.getConcurrentCompactionThread())
|| (!config.isEnableCrossSpaceCompaction())) {
if (CompactionTaskManager.currentTaskNum.get() >= config.getConcurrentCompactionThread()) {
LOGGER.debug("End selection because too many threads");
} else if (!config.isEnableCrossSpaceCompaction()) {
LOGGER.debug("End selection because cross compaction is not enable");
} else {
LOGGER.debug(
"End selection because {}-{} is compacting, task num in CompactionTaskManager is {}",
logicalStorageGroupName,
virtualGroupId,
CompactionTaskManager.currentTaskNum.get());
}
return;
}
Iterator<TsFileResource> seqIterator = sequenceFileList.iterator();
......
......@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.cross.AbstractCrossSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.cross.CrossSpaceCompactionExceptionHandler;
......@@ -29,7 +28,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.WriteLockFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
......@@ -62,9 +60,6 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
private List<TsFileResource> targetTsfileResourceList;
private List<TsFileResource> holdReadLockList = new ArrayList<>();
private List<TsFileResource> holdWriteLockList = new ArrayList<>();
private boolean getWriteLockOfManager = false;
private final long ACQUIRE_WRITE_LOCK_TIMEOUT =
IoTDBDescriptor.getInstance().getConfig().getCompactionAcquireWriteLockTimeout();
public RewriteCrossSpaceCompactionTask(
String logicalStorageGroupName,
......@@ -97,14 +92,10 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
targetTsfileResourceList,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList,
tsFileManager,
timePartition);
tsFileManager);
throw throwable;
} finally {
releaseAllLock();
if (getWriteLockOfManager) {
tsFileManager.writeUnlock();
}
}
}
......@@ -158,22 +149,6 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
CompactionUtils.combineModsInCompaction(
selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList, targetTsfileResourceList);
try {
tsFileManager.writeLockWithTimeout(
"rewrite-cross-space compaction", ACQUIRE_WRITE_LOCK_TIMEOUT);
getWriteLockOfManager = true;
} catch (WriteLockFailedException e) {
// if current compaction thread couldn't get write lock
// a WriteLockFailException will be thrown, then terminate the thread itself
logger.error(
"{} [Compaction] CrossSpaceCompactionTask failed to get write lock, abort the task.",
fullStorageGroupName,
e);
throw new InterruptedException(
String.format(
"%s [Compaction] compaction abort because cannot acquire write lock",
fullStorageGroupName));
}
deleteOldFiles(selectedSeqTsFileResourceList);
deleteOldFiles(selectedUnSeqTsFileResourceList);
......
......@@ -20,13 +20,15 @@ package org.apache.iotdb.db.engine.compaction.inner;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import java.util.List;
public abstract class AbstractInnerSpaceCompactionSelector extends AbstractCompactionSelector {
protected String logicalStorageGroupName;
protected String virtualStorageGroupName;
protected long timePartition;
protected TsFileResourceList tsFileResources;
protected List<TsFileResource> tsFileResources;
protected boolean sequence;
protected InnerSpaceCompactionTaskFactory taskFactory;
protected TsFileManager tsFileManager;
......@@ -45,9 +47,10 @@ public abstract class AbstractInnerSpaceCompactionSelector extends AbstractCompa
this.sequence = sequence;
this.taskFactory = taskFactory;
if (sequence) {
tsFileResources = tsFileManager.getSequenceListByTimePartition(timePartition);
tsFileResources = tsFileManager.getSequenceListByTimePartition(timePartition).getArrayList();
} else {
tsFileResources = tsFileManager.getUnsequenceListByTimePartition(timePartition);
tsFileResources =
tsFileManager.getUnsequenceListByTimePartition(timePartition).getArrayList();
}
}
......
......@@ -23,7 +23,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import java.util.List;
......@@ -33,7 +32,6 @@ public class InnerSpaceCompactionTaskFactory {
String virtualStorageGroup,
long timePartition,
TsFileManager tsFileManager,
TsFileResourceList tsFileResourceList,
List<TsFileResource> selectedTsFileResourceList,
boolean sequence) {
return IoTDBDescriptor.getInstance()
......
......@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionPriority;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionSelector;
import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTaskFactory;
......@@ -80,9 +79,6 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
*/
@Override
public void selectAndSubmit() {
final CompactionPriority priority =
IoTDBDescriptor.getInstance().getConfig().getCompactionPriority();
tsFileResources.readLock();
PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
try {
......@@ -97,8 +93,6 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
}
} catch (Exception e) {
LOGGER.error("Exception occurs while selecting files", e);
} finally {
tsFileResources.readUnlock();
}
}
......@@ -178,7 +172,6 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
virtualStorageGroupName,
timePartition,
tsFileManager,
tsFileResources,
selectedFileList,
sequence);
return CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);
......
......@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionExceptionHandler;
......@@ -29,7 +28,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.exception.WriteLockFailedException;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
......@@ -90,9 +88,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
return;
}
long startTime = System.currentTimeMillis();
boolean getWriteLockOfManager = false;
final long ACQUIRE_WRITE_LOCK_TIMEOUT =
IoTDBDescriptor.getInstance().getConfig().getCompactionAcquireWriteLockTimeout();
// get resource of target file
String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
// Here is tmpTargetFile, which is xxx.target
......@@ -160,27 +155,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
InnerSpaceCompactionUtils.combineModsInCompaction(
selectedTsFileResourceList, targetTsFileResource);
LOGGER.info(
"{} [Compaction] Get the write lock of files, try to get the write lock of TsFileResourceList",
fullStorageGroupName);
// get write lock for TsFileResource list with timeout
try {
tsFileManager.writeLockWithTimeout("size-tired compaction", ACQUIRE_WRITE_LOCK_TIMEOUT);
getWriteLockOfManager = true;
} catch (WriteLockFailedException e) {
// if current compaction thread couldn't get write lock
// a WriteLockFailException will be thrown, then terminate the thread itself
LOGGER.warn(
"{} [SizeTiredCompactionTask] failed to get write lock, abort the task and delete the target file {}",
fullStorageGroupName,
targetTsFileResource.getTsFile(),
e);
throw new InterruptedException(
String.format(
"%s [Compaction] compaction abort because cannot acquire write lock",
fullStorageGroupName));
}
if (targetTsFileResource.getTsFile().length()
< TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
// the file size is smaller than magic string and version number
......@@ -235,9 +209,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
tsFileResourceList);
} finally {
releaseFileLocksAndResetMergingStatus(true);
if (getWriteLockOfManager) {
tsFileManager.writeUnlock();
}
}
}
......@@ -255,29 +226,25 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
@Override
public boolean checkValidAndSetMerging() {
tsFileResourceList.readLock();
try {
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
TsFileResource resource = selectedTsFileResourceList.get(i);
resource.readLock();
isHoldingReadLock[i] = true;
if (resource.isCompacting() | !resource.isClosed()
|| !resource.getTsFile().exists()
|| resource.isDeleted()) {
// this source file cannot be compacted
// release the lock of locked files, and return
releaseFileLocksAndResetMergingStatus(false);
return false;
}
}
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
TsFileResource resource = selectedTsFileResourceList.get(i);
for (TsFileResource resource : selectedTsFileResourceList) {
resource.setCompacting(true);
if (resource.isCompacting() | !resource.isClosed()
|| !resource.getTsFile().exists()
|| resource.isDeleted()) {
// this source file cannot be compacted
// release the lock of locked files, and return
releaseFileLocksAndResetMergingStatus(false);
return false;
}
return true;
} finally {
tsFileResourceList.readUnlock();
resource.readLock();
isHoldingReadLock[i] = true;
}
for (TsFileResource resource : selectedTsFileResourceList) {
resource.setCompacting(true);
}
return true;
}
/**
......
......@@ -67,6 +67,8 @@ public class TsFileManager {
}
public List<TsFileResource> getTsFileList(boolean sequence) {
// the iteration of ConcurrentSkipListMap is not concurrent secure
// so we must add read lock here
readLock();
try {
List<TsFileResource> allResources = new ArrayList<>();
......@@ -98,7 +100,7 @@ public class TsFileManager {
}
public void remove(TsFileResource tsFileResource, boolean sequence) {
writeLock("remove");
readLock();
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet()) {
......@@ -109,7 +111,7 @@ public class TsFileManager {
}
}
} finally {
writeUnlock();
readUnlock();
}
}
......
......@@ -507,6 +507,10 @@ public class TsFileResource {
return tsFileLock.tryWriteLock();
}
public boolean tryReadLock() {
return tsFileLock.tryReadLock();
}
void doUpgrade() {
UpgradeSevice.getINSTANCE().submitUpgradeTask(new UpgradeTask(this));
}
......
......@@ -46,6 +46,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
if (tempSGDir.exists()) {
FileUtils.deleteDirectory(tempSGDir);
}
CompactionTaskManager.getInstance().restart();
Assert.assertTrue(tempSGDir.mkdirs());
super.setUp();
}
......@@ -62,7 +63,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
SizeTieredCompactionTask task2 =
new SizeTieredCompactionTask(
"root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
tsFileManager.writeLock("test");
seqResources.get(0).readLock();
CompactionTaskManager manager = CompactionTaskManager.getInstance();
try {
Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
......@@ -72,7 +73,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
Assert.assertEquals(manager.getTotalTaskCount(), 1);
manager.submitTaskFromTaskQueue();
} finally {
tsFileManager.writeUnlock();
seqResources.get(0).readUnlock();
}
Thread.sleep(5000);
Assert.assertEquals(0, manager.getTotalTaskCount());
......@@ -101,7 +102,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
SizeTieredCompactionTask task2 =
new SizeTieredCompactionTask(
"root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
tsFileManager.writeLock("test");
seqResources.get(0).readLock();
try {
CompactionTaskManager manager = CompactionTaskManager.getInstance();
manager.addTaskToWaitingQueue(task1);
......@@ -111,7 +112,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
Assert.assertEquals(manager.getExecutingTaskCount(), 1);
Assert.assertFalse(manager.addTaskToWaitingQueue(task2));
} finally {
tsFileManager.writeUnlock();
seqResources.get(0).readUnlock();
}
long waitingTime = 0;
while (CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0) {
......@@ -144,11 +145,12 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
while (manager.getTotalTaskCount() > 0) {
Thread.sleep(10);
}
tsFileManager.writeLock("test");
seqResources.get(0).readLock();
// an invalid task can be submitted to waiting queue, but should not be submitted to thread pool
Assert.assertTrue(manager.addTaskToWaitingQueue(task2));
manager.submitTaskFromTaskQueue();
Assert.assertEquals(manager.getExecutingTaskCount(), 0);
seqResources.get(0).readUnlock();
long waitingTime = 0;
while (manager.getRunningCompactionTaskList().size() > 0) {
Thread.sleep(100);
......@@ -172,7 +174,8 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
new SizeTieredCompactionTask(
"root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
CompactionTaskManager manager = CompactionTaskManager.getInstance();
tsFileManager.writeLock("test");
manager.restart();
seqResources.get(0).readLock();
try {
manager.addTaskToWaitingQueue(task1);
manager.submitTaskFromTaskQueue();
......@@ -182,7 +185,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
Assert.assertEquals(1, runningList.size());
Assert.assertTrue(runningList.contains(task1));
} finally {
tsFileManager.writeUnlock();
seqResources.get(0).readUnlock();
}
// after execution, task should remove itself from running list
Thread.sleep(5000);
......
......@@ -102,8 +102,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
tsFileManager,
0);
tsFileManager);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
......@@ -172,8 +171,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
tsFileManager,
0);
tsFileManager);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
......@@ -243,8 +241,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
tsFileManager,
0);
tsFileManager);
// all source file should not exist
for (TsFileResource resource : seqResources) {
Assert.assertFalse(resource.getTsFile().exists());
......@@ -332,8 +329,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
tsFileManager,
0);
tsFileManager);
// All source file should not exist. All compaction mods file and old mods file of each source
// file should not exist
for (TsFileResource resource : seqResources) {
......@@ -435,8 +431,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
tsFileManager,
0);
tsFileManager);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
......
......@@ -84,8 +84,11 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(4);
IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(1000000);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
registerTimeseriesInMManger(2, 3, false);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
registerTimeseriesInMManger(3, 5, false);
createFiles(2, 5, 5, 50, 600, 800, 50, 50, false, true);
registerTimeseriesInMManger(5, 5, false);
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTest;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicInteger;
public class SizeTieredCompactionHandleExceptionTest extends AbstractInnerSpaceCompactionTest {
@Before
public void setUp() throws IOException, MetadataException, WriteProcessException {
this.seqFileNum = 10;
super.setUp();
}
@After
public void tearDown() throws StorageEngineException, IOException {
new CompactionConfigRestorer().restoreCompactionConfig();
super.tearDown();
}
@Test
public void testHandleExceptionTargetCompleteAndSourceExists() {
IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(2_000);
try {
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
SizeTieredCompactionTask task =
new SizeTieredCompactionTask(
COMPACTION_TEST_SG, "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
tsFileManager.writeLock("test");
try {
new Thread(
() -> {
try {
task.call();
} catch (Exception e) {
}
})
.start();
Thread.sleep(4_000);
} catch (Exception e) {
} finally {
tsFileManager.writeUnlock();
}
Assert.assertTrue(tsFileManager.isAllowCompaction());
Assert.assertEquals(10, tsFileManager.getTsFileList(true).size());
} finally {
IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(60_000);
}
}
@Test
public void testHandleExceptionTargetNotCompleteAndSourceNotExists() {
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
SizeTieredCompactionTask task =
new SizeTieredCompactionTask(
COMPACTION_TEST_SG, "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
tsFileManager.writeLock("test");
try {
seqResources.get(seqResources.size() - 1).remove();
new Thread(
() -> {
try {
task.call();
} catch (Exception e) {
}
})
.start();
Thread.sleep(5_000);
} catch (Exception e) {
} finally {
tsFileManager.writeUnlock();
}
Assert.assertFalse(tsFileManager.isAllowCompaction());
Assert.assertEquals(10, tsFileManager.getTsFileList(true).size());
}
@Test
public void testHandleExceptionTargetCompleteAndSourceNotExists() {
IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(10_000);
try {
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
SizeTieredCompactionTask task =
new SizeTieredCompactionTask(
COMPACTION_TEST_SG, "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
tsFileManager.writeLock("test");
try {
new Thread(
() -> {
try {
task.call();
} catch (Exception e) {
}
})
.start();
Thread.sleep(8_000);
seqResources.get(0).remove();
tsFileManager.getTsFileList(true).remove(seqResources.get(0));
Thread.sleep(3_000);
} catch (Exception e) {
} finally {
tsFileManager.writeUnlock();
}
Assert.assertTrue(tsFileManager.isAllowCompaction());
Assert.assertEquals(1, tsFileManager.getTsFileList(true).size());
} finally {
IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(60_000L);
}
}
@Test
public void testHandleExceptionTargetNotCompleteAndSourceExists() {
IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(10_000L);
try {
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
SizeTieredCompactionTask task =
new SizeTieredCompactionTask(
COMPACTION_TEST_SG, "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
tsFileManager.writeLock("test");
try {
new Thread(
() -> {
try {
task.call();
} catch (Exception e) {
}
})
.start();
Thread.sleep(8_000);
File targetFile =
TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true)
.getTsFile();
FileChannel channel = new FileOutputStream(targetFile, true).getChannel();
channel.truncate(10);
channel.close();
Thread.sleep(3_000);
} catch (Exception e) {
} finally {
tsFileManager.writeUnlock();
}
Assert.assertTrue(tsFileManager.isAllowCompaction());
Assert.assertEquals(10, tsFileManager.getTsFileList(true).size());
} finally {
IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(60_000L);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册