Unverified commit 21a07404, authored by Zesong Sun, committed by GitHub

[IOTDB-805] Fix BufferUnderflowException when querying TsFile stored in HDFS (#1519)

Parent f1adbb41
@@ -72,7 +72,6 @@ public class HDFSOutput implements TsFileOutput {

   @Override
   public void close() throws IOException {
-    flush();
     fsDataOutputStream.close();
   }
@@ -83,7 +82,7 @@ public class HDFSOutput implements TsFileOutput {

   @Override
   public void flush() throws IOException {
-    this.fsDataOutputStream.flush();
+    this.fsDataOutputStream.hflush();
   }

   @Override
......
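Editorial note: this is the heart of the fix. `FSDataOutputStream.flush()` only empties client-side buffers; it does not guarantee the bytes have reached the HDFS DataNodes, so a reader opening the file afterwards can see a shorter file than the writer wrote and fail with a `BufferUnderflowException` while parsing it. `hflush()` pushes the buffered bytes out to the DataNodes, making them visible to new readers. A minimal standalone sketch of the difference, assuming a reachable HDFS cluster (the address below is hypothetical, not from this commit):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HflushDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:9000"); // hypothetical cluster address
    try (FileSystem fs = FileSystem.get(conf);
        FSDataOutputStream out = fs.create(new Path("/tmp/demo.bin"))) {
      out.write(new byte[]{1, 2, 3, 4});
      out.flush();  // client-side only: a concurrent reader may still see 0 bytes
      out.hflush(); // pushed to the DataNodes: readers opening the file now see all 4 bytes
    }
  }
}
```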
@@ -1973,7 +1973,7 @@ public class StorageGroupProcessor {
       if (!newFileName.equals(tsfileToBeInserted.getName())) {
         logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
             tsfileToBeInserted.getName(), newFileName);
-        newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName));
+        newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
       }
     }
     loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
@@ -2264,10 +2264,9 @@ public class StorageGroupProcessor {
     File targetFile;
     switch (type) {
       case LOAD_UNSEQUENCE:
-        targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+        targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
             storageGroupName + File.separatorChar + filePartitionId + File.separator
-                + tsFileResource
-                .getTsFile().getName());
+                + tsFileResource.getTsFile().getName());
         tsFileResource.setFile(targetFile);
         if (unSequenceFileList.contains(tsFileResource)) {
           logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
@@ -2279,7 +2278,7 @@ public class StorageGroupProcessor {
         break;
       case LOAD_SEQUENCE:
         targetFile =
-            new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+            fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
                 storageGroupName + File.separatorChar + filePartitionId + File.separator
                     + tsFileResource.getTsFile().getName());
         tsFileResource.setFile(targetFile);
@@ -2310,9 +2309,9 @@ public class StorageGroupProcessor {
           syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
-    File syncedResourceFile = new File(
+    File syncedResourceFile = fsFactory.getFile(
         syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
-    File targetResourceFile = new File(
+    File targetResourceFile = fsFactory.getFile(
         targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     try {
       FileUtils.moveFile(syncedResourceFile, targetResourceFile);
......
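Editorial note on the recurring change above: `new java.io.File(parent, child)` always denotes a local-filesystem path, so each of these call sites silently pointed at the local disk when the TsFile actually lived in HDFS. Routing construction through `fsFactory.getFile(...)` yields an FS-appropriate `File` instance. A simplified sketch of the pattern, using names from the diff (the real IoTDB interfaces carry more methods, and the HDFS flavor lives in the optional Hadoop module):

```java
import java.io.File;

interface FSFactory {
  File getFile(File parentFile, String name);
}

// Local flavor: a plain java.io.File.
class LocalFSFactory implements FSFactory {
  @Override
  public File getFile(File parentFile, String name) {
    return new File(parentFile, name);
  }
}

// The HDFS flavor (sketched, not shown in this diff) reflectively instantiates an
// HDFSFile subclass of File whose exists()/length()/renameTo() delegate to
// org.apache.hadoop.fs.FileSystem, so the same call sites work on either file system.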
@@ -50,8 +50,7 @@ public class HDFSOutputFactory implements FileOutputFactory {
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
           "Failed to get TsFile output of file: {}. Please check your dependency of Hadoop module.",
-          filePath,
-          e);
+          filePath, e);
       return null;
     }
   }
......
@@ -28,7 +28,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +44,7 @@ public class HDFSFactory implements FSFactory {
   private static Method getBufferedOutputStream;
   private static Method listFilesBySuffix;
   private static Method listFilesByPrefix;
+  private static Method renameTo;

   static {
     try {
@@ -59,6 +59,7 @@ public class HDFSFactory implements FSFactory {
       getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream", String.class);
       listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
       listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
+      renameTo = clazz.getMethod("renameTo", File.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger
           .error("Failed to get Hadoop file system. Please check your dependency of Hadoop module.",
@@ -173,9 +174,12 @@ public class HDFSFactory implements FSFactory {
   }

   public void moveFile(File srcFile, File destFile) {
-    boolean rename = srcFile.renameTo(destFile);
-    if (!rename) {
-      logger.error("Failed to rename file from {} to {}. ", srcFile.getName(), destFile.getName());
+    try {
+      renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+      logger.error(
+          "Failed to rename file from {} to {}. Please check your dependency of Hadoop module.",
+          srcFile.getName(), destFile.getName());
     }
   }
......
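The rewritten `moveFile` drops `java.io.File.renameTo`, which operates on the local filesystem and cannot move a file inside HDFS, in favor of a reflective call to the Hadoop module's `renameTo`. Reflection keeps the Hadoop jars an optional dependency of the core module. A self-contained sketch of the idiom, where the target class name is inferred from the diff rather than confirmed by it:

```java
import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;

public class ReflectiveRename {
  // Assumed target: org.apache.iotdb.hadoop.fileSystem.HDFSFile, a File subclass
  // with a (String pathname) constructor and an instance method renameTo(File dest).
  public static void rename(String srcPath, File dest) throws Exception {
    Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSFile");
    Constructor<?> ctor = clazz.getConstructor(String.class);
    Method renameTo = clazz.getMethod("renameTo", File.class);
    renameTo.invoke(ctor.newInstance(srcPath), dest); // mirrors the invocation in the patch
  }
}
```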
@@ -33,7 +33,6 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -768,8 +767,11 @@ public class TsFileSequenceReader implements AutoCloseable {
         throw new IOException("reach the end of the data");
       }
     } else {
-      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
-        throw new IOException("reach the end of the data");
+      long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size);
+      if (actualReadSize != size) {
+        throw new IOException(
+            String.format("reach the end of the data. Size of data to read: %s,"
+                + " actual read size: %s, position: %s", size, actualReadSize, position));
       }
     }
     buffer.flip();
......
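The enlarged error message matters because a short read here is exactly the symptom of the HDFS bug: the reader hits apparent end-of-file before `size` bytes arrive. `ReadWriteIOUtils.readAsPossible` loops over short reads and returns how many bytes it actually got. A sketch of that read-fully idiom over a `FileChannel` (the real helper works on IoTDB's `TsFileInput`; this signature is assumed from the call site above):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public final class ReadFully {
  // Reads up to `size` bytes starting at `position`, retrying on short reads;
  // returns the number of bytes actually read, which is < size only at end-of-file.
  static long readAsPossible(FileChannel input, ByteBuffer buffer, long position, int size)
      throws IOException {
    long total = 0;
    while (total < size && buffer.hasRemaining()) {
      int read = input.read(buffer, position + total);
      if (read < 0) {
        break; // end of file before the requested range was available
      }
      total += read;
    }
    return total;
  }
}
```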
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
     if (logger.isDebugEnabled()) {
       logger.debug("{} writer is opened.", file.getName());
     }
-    this.out = new LocalTsFileOutput(file, true);
+    this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
     this.file = file;
     // file doesn't exist
......
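`ForceAppendTsFileWriter` previously hard-coded `LocalTsFileOutput`, so force-append recovery could never target a TsFile stored in HDFS; going through `FSFactoryProducer.getFileOutputFactory()` lets the configured storage file system pick the implementation. A compact sketch of that indirection, with interface names taken from the diff and illustrative stand-in bodies (not IoTDB's actual classes):

```java
import java.io.FileOutputStream;
import java.io.IOException;

interface TsFileOutput {
  void write(byte[] b) throws IOException;
  void close() throws IOException;
}

interface FileOutputFactory {
  TsFileOutput getTsFileOutput(String filePath, boolean append) throws IOException;
}

// Local-FS stand-in; an HDFS implementation would wrap FSDataOutputStream instead,
// so the writer never names a concrete output class and works on either file system.
class LocalFileOutputFactory implements FileOutputFactory {
  @Override
  public TsFileOutput getTsFileOutput(String filePath, boolean append) throws IOException {
    FileOutputStream stream = new FileOutputStream(filePath, append);
    return new TsFileOutput() {
      @Override
      public void write(byte[] b) throws IOException { stream.write(b); }
      @Override
      public void close() throws IOException { stream.close(); }
    };
  }
}
```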
@@ -177,14 +177,11 @@ public class TsFileIOWriter {
    * @throws IOException if I/O error occurs
    */
   public void startFlushChunk(MeasurementSchema measurementSchema,
-      CompressionType compressionCodecName,
-      TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, int dataSize,
-      int numOfPages)
-      throws IOException {
+      CompressionType compressionCodecName, TSDataType tsDataType, TSEncoding encodingType,
+      Statistics<?> statistics, int dataSize, int numOfPages) throws IOException {
     currentChunkMetadata = new ChunkMetadata(measurementSchema.getMeasurementId(), tsDataType,
-        out.getPosition(),
-        statistics);
+        out.getPosition(), statistics);
     ChunkHeader header = new ChunkHeader(measurementSchema.getMeasurementId(), dataSize, tsDataType,
         compressionCodecName, encodingType, numOfPages);
......