未验证 提交 061a9787 编写于 作者: 周沛辰 提交者: GitHub

[IOTDB-2446]fix deleting bug in a compaction (#4916)

上级 5d09b8cc
......@@ -42,9 +42,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@Category({LocalStandaloneTest.class, ClusterTest.class})
public class IoTDBMergeIT {
public class IoTDBCompactionIT {
private static final Logger logger = LoggerFactory.getLogger(IoTDBMergeIT.class);
private static final Logger logger = LoggerFactory.getLogger(IoTDBCompactionIT.class);
private long prevPartitionInterval;
@Before
......
......@@ -45,7 +45,7 @@
<logger name="org.eclipse.jetty.util.thread.QueuedThreadPool" level="INFO"/>
<logger name="org.apache.iotdb.db.service.metrics.MetricsService" level="INFO"/>
<logger name="org.apache.iotdb.db.engine.flush.FlushManager" level="INFO"/>
<logger name="org.apache.iotdb.db.integration.IoTDBMergeIT" level="INFO"/>
<logger name="org.apache.iotdb.db.integration.IoTDBCompactionIT" level="INFO"/>
<logger name="org.apache.iotdb.db.service.RegisterManager" level="INFO"/>
<logger name="org.apache.iotdb.db.service.IoTDB" level="WARN"/>
<logger name="org.apache.iotdb.db.service.RPCService" level="INFO"/>
......
......@@ -284,7 +284,6 @@ public class CompactionUtils {
} else {
fileSuffix = IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX;
}
// checkAndUpdateTargetFileResources(targetResources, fileSuffix);
for (TsFileResource targetResource : targetResources) {
moveOneTargetFile(targetResource, fileSuffix, fullStorageGroupName);
}
......@@ -304,7 +303,9 @@ public class CompactionUtils {
File newFile =
new File(
targetResource.getTsFilePath().replace(tmpFileSuffix, TsFileConstant.TSFILE_SUFFIX));
FSFactoryProducer.getFSFactory().moveFile(targetResource.getTsFile(), newFile);
if (!newFile.exists()) {
FSFactoryProducer.getFSFactory().moveFile(targetResource.getTsFile(), newFile);
}
// serialize xxx.tsfile.resource
targetResource.setFile(newFile);
......@@ -374,7 +375,6 @@ public class CompactionUtils {
ModificationFile.getCompactionMods(sourceFile);
Collection<Modification> newModification = compactionModificationFile.getModifications();
compactionModificationFile.close();
sourceFile.resetModFile();
// write the new modifications to its old modification file
try (ModificationFile oldModificationFile = sourceFile.getModFile()) {
for (Modification modification : newModification) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.compaction.cross;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class AbstractCrossSpaceCompactionRecoverTask
extends AbstractCrossSpaceCompactionTask {
public AbstractCrossSpaceCompactionRecoverTask(
String fullStorageGroupName, long timePartition, AtomicInteger currentTaskNum) {
super(fullStorageGroupName, timePartition, currentTaskNum);
}
}
......@@ -180,7 +180,6 @@ public class InnerSpaceCompactionUtils {
ModificationFile.getCompactionMods(sourceFile);
Collection<Modification> newModification = compactionModificationFile.getModifications();
compactionModificationFile.close();
sourceFile.resetModFile();
// write the new modifications to its old modification file
try (ModificationFile oldModificationFile = sourceFile.getModFile()) {
for (Modification modification : newModification) {
......
......@@ -97,6 +97,8 @@ public class TsFileResource {
private ModificationFile modFile;
private ModificationFile compactionModFile;
protected volatile boolean closed = false;
private volatile boolean deleted = false;
volatile boolean isMerging = false;
......@@ -357,14 +359,14 @@ public class TsFileResource {
}
public ModificationFile getCompactionModFile() {
if (modFile == null) {
if (compactionModFile == null) {
synchronized (this) {
if (modFile == null) {
modFile = ModificationFile.getCompactionMods(this);
if (compactionModFile == null) {
compactionModFile = ModificationFile.getCompactionMods(this);
}
}
}
return modFile;
return compactionModFile;
}
public void resetModFile() {
......@@ -442,6 +444,10 @@ public class TsFileResource {
modFile.close();
modFile = null;
}
if (compactionModFile != null) {
compactionModFile.close();
compactionModFile = null;
}
processor = null;
pathToChunkMetadataListMap = null;
pathToReadOnlyMemChunkMap = null;
......
......@@ -2006,10 +2006,13 @@ public class VirtualStorageGroupProcessor {
// we have to set modification offset to MAX_VALUE, as the offset of source chunk may
// change after compaction
deletion.setFileOffset(Long.MAX_VALUE);
// write deletion into modification file
// write deletion into compaction modification file
tsFileResource.getCompactionModFile().write(deletion);
// write deletion into modification file to enable query during compaction
tsFileResource.getModFile().write(deletion);
// remember to close mod file
tsFileResource.getCompactionModFile().close();
tsFileResource.getModFile().close();
} else {
deletion.setFileOffset(tsFileResource.getTsFileSize());
// write deletion into modification file
......
......@@ -191,15 +191,16 @@ public class AbstractCompactionTest {
TsFileResource resource = new TsFileResource(file);
int deviceStartindex = 0;
if (isAlign) {
deviceStartindex = TsFileGeneratorUtils.getAlignDeviceOffset();
for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
}
} else {
for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
}
deviceStartindex = TsFileGeneratorUtils.getAlignDeviceOffset();
}
for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
}
resource.updatePlanIndexes(fileVersion);
resource.setClosed(true);
......
......@@ -58,7 +58,7 @@ public class FakedCrossSpaceCompactionTask extends RewriteCrossSpaceCompactionTa
((FakedTsFileResource) resource)
.setTsFileSize(resource.getTsFileSize() + avgSizeAddToSeqFile);
}
selectedSeqTsFileResourceList.clear();
selectedUnSeqTsFileResourceList.clear();
// unSeqTsFileResourceList.clear();
}
}
......@@ -45,7 +45,7 @@
<logger name="org.eclipse.jetty.util.thread.QueuedThreadPool" level="INFO"/>
<logger name="org.apache.iotdb.db.service.metrics.MetricsService" level="INFO"/>
<logger name="org.apache.iotdb.db.engine.flush.FlushManager" level="INFO"/>
<logger name="org.apache.iotdb.db.integration.IoTDBMergeIT" level="INFO"/>
<logger name="org.apache.iotdb.db.integration.IoTDBCompactionIT" level="INFO"/>
<logger name="org.apache.iotdb.db.service.RegisterManager" level="INFO"/>
<logger name="org.apache.iotdb.db.service.IoTDB" level="WARN"/>
<logger name="org.apache.iotdb.db.service.RPCService" level="INFO"/>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册