未验证 提交 279b41c2 编写于 作者: Z Zesong Sun 提交者: GitHub

Fix merge caused errors for TsFile storage in HDFS (#1153)

上级 515ea6b1
......@@ -130,7 +130,7 @@ public class HDFSFile extends File {
@Override
public File getParentFile() {
return new HDFSFile(hdfsPath.getParent().getName());
return new HDFSFile(hdfsPath.getParent().toUri().toString());
}
@Override
......@@ -151,10 +151,7 @@ public class HDFSFile extends File {
@Override
public boolean mkdirs() {
try {
if (exists()) {
return false;
}
return fs.mkdirs(hdfsPath);
return !exists() && fs.mkdirs(hdfsPath);
} catch (IOException e) {
logger.error("Fail to create directory {}. ", hdfsPath.toUri().toString(), e);
return false;
......@@ -208,10 +205,7 @@ public class HDFSFile extends File {
@Override
public boolean equals(Object obj) {
if ((obj != null) && (obj instanceof HDFSFile)) {
return compareTo((HDFSFile) obj) == 0;
}
return false;
return obj instanceof HDFSFile && compareTo((HDFSFile) obj) == 0;
}
@Override
......@@ -235,7 +229,7 @@ public class HDFSFile extends File {
public BufferedWriter getBufferedWriter(String filePath, boolean append) {
try {
return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath))));
return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath))));
} catch (IOException e) {
logger.error("Failed to get buffered writer for {}. ", filePath, e);
return null;
......@@ -290,22 +284,22 @@ public class HDFSFile extends File {
}
@Override
public String getParent() {
throw new UnsupportedOperationException("Unsupported operation.");
public File getAbsoluteFile() {
return new HDFSFile(getAbsolutePath());
}
@Override
public boolean isAbsolute() {
public String getParent() {
throw new UnsupportedOperationException("Unsupported operation.");
}
@Override
public File[] listFiles(FileFilter filter) {
public boolean isAbsolute() {
throw new UnsupportedOperationException("Unsupported operation.");
}
@Override
public File getAbsoluteFile() {
public File[] listFiles(FileFilter filter) {
throw new UnsupportedOperationException("Unsupported operation.");
}
......
......@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
......@@ -38,6 +37,8 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
......@@ -63,6 +64,8 @@ class MergeFileTask {
private MergeResource resource;
private List<TsFileResource> unmergedFiles;
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
this.taskName = taskName;
......@@ -157,10 +160,11 @@ class MergeFileTask {
newFileWriter.getFile().delete();
File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile);
FileUtils
.moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
fsFactory.moveFile(seqFile.getFile(), nextMergeVersionFile);
fsFactory.moveFile(fsFactory
.getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
fsFactory.getFile(
nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getWriteQueryLock().writeLock().unlock();
......@@ -226,10 +230,11 @@ class MergeFileTask {
seqFile.getFile().delete();
File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile);
FileUtils
.moveFile(new File(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile);
fsFactory.moveFile(fsFactory
.getFile(seqFile.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
fsFactory.getFile(
nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getWriteQueryLock().writeLock().unlock();
......@@ -240,7 +245,7 @@ class MergeFileTask {
String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "")
.split(IoTDBConstant.TSFILE_NAME_SEPARATOR);
int mergeVersion = Integer.parseInt(splits[2]) + 1;
return new File(seqFile.getParentFile(),
return fsFactory.getFile(seqFile.getParentFile(),
splits[0] + IoTDBConstant.TSFILE_NAME_SEPARATOR + splits[1]
+ IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeVersion + TSFILE_SUFFIX);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册