Commit 9f24a93e authored by JackieTien97, committed by Jialin Qiao

improve recover process

Parent 75ad80cf
......@@ -86,11 +86,11 @@ public class HDFSOutput implements TsFileOutput {
}
@Override
public void truncate(long position) throws IOException {
public void truncate(long size) throws IOException {
if (fs.exists(path)) {
fsDataOutputStream.close();
}
fs.truncate(path, position);
fs.truncate(path, size);
if (fs.exists(path)) {
fsDataOutputStream = fs.append(path);
}
......
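The hunk above renames truncate()'s parameter from position to size and keeps the pattern HDFS requires: close the open writer, truncate, then reopen in append mode. A minimal, self-contained sketch of that close/truncate/append pattern against the Hadoop FileSystem API; the class name and file path are hypothetical and not part of the patch.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsTruncateSketch {
  // Truncate an HDFS file to `size` bytes and reopen it for appending,
  // mirroring the pattern used by HDFSOutput.truncate(long) above.
  static FSDataOutputStream truncateAndReopen(FileSystem fs, Path path, long size)
      throws IOException {
    // FileSystem#truncate may return false when truncation finishes asynchronously;
    // production code would wait until the file leaves recovery before appending.
    fs.truncate(path, size);
    return fs.append(path);
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/tmp/example.tsfile"); // hypothetical path
    try (FSDataOutputStream out = truncateAndReopen(fs, path, 16L)) {
      out.writeBytes("appended after truncate");
    }
  }
}
```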
......@@ -91,7 +91,7 @@ public class TsFileRecoverPerformer {
* data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. clean WALs
*
* @return a RestorableTsFileIOWriter and a list of RestorableTsFileIOWriter of vmfiles, if the
* file and the vmfiles are not closed before crush, so these writers can be used to continue
* file and the vmfiles are not closed before crash, so these writers can be used to continue
* writing
*/
public Pair<RestorableTsFileIOWriter, List<List<RestorableTsFileIOWriter>>> recover()
......@@ -147,13 +147,13 @@ public class TsFileRecoverPerformer {
"recover the resource file failed: " + filePath
+ RESOURCE_SUFFIX + e);
}
} else {
// tsfile has crashed
// due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
// map must be updated first to avoid duplicated insertion
recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource);
}
// tsfile has crashed
// due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
// map must be updated first to avoid duplicated insertion
recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource);
// If the vm is not enabled, the walTargetWriter points to the tsfile.
// If the vm is enabled and a flush log exists, the walTargetWriter points to the vm file of the flush log.
// If the vm is enabled and no flush log exists, the walTargetWriter is null.
......
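For context, recover() returns the writers needed to resume an interrupted flush. Below is a hedged, caller-side sketch of consuming that Pair; the surrounding class, the throws clause, and the performer's construction are assumptions, since they are not shown in this diff, and the TsFileRecoverPerformer package is taken from the upstream layout.

```java
import java.util.List;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer; // package assumed
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;

class RecoverCallerSketch {
  void resumeWriting(TsFileRecoverPerformer performer) throws Exception {
    Pair<RestorableTsFileIOWriter, List<List<RestorableTsFileIOWriter>>> result =
        performer.recover();
    RestorableTsFileIOWriter tsFileWriter = result.left;
    List<List<RestorableTsFileIOWriter>> vmWriters = result.right;
    if (tsFileWriter != null && tsFileWriter.canWrite()) {
      // the TsFile was not sealed before the crash, so WAL redo can continue on this writer
    }
    // per the comment above, vmWriters matter only when vm is enabled and a flush log exists
  }
}
```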
......@@ -21,7 +21,6 @@ package org.apache.iotdb.tsfile.read;
public class TsFileCheckStatus {
public static final long COMPLETE_FILE = -1;
public static final long ONLY_MAGIC_HEAD = -2;
public static final long INCOMPATIBLE_FILE = -3;
public static final long FILE_NOT_FOUND = -4;
......
......@@ -96,7 +96,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
* @param file -given file name
* @param file -given file name
* @param loadMetadataSize -whether load meta data size
*/
public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException {
......@@ -137,7 +137,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
* @param input -given input
* @param input -given input
* @param loadMetadataSize -load meta data size
*/
public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException {
......@@ -155,10 +155,10 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
* @param input the input of a tsfile. The current position should be a markder and then a chunk
* Header, rather than the magic number
* @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
* of the input to the current position
* @param input the input of a tsfile. The current position should be a marker and
* then a chunk Header, rather than the magic number
* @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
* of the input to the current position
* @param fileMetadataSize the byte size of the file metadata in the input
*/
public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
......@@ -508,9 +508,9 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
*
* @param metadataIndex MetadataIndexEntry
* @param buffer byte buffer
* @param deviceId String
* @param metadataIndex MetadataIndexEntry
* @param buffer byte buffer
* @param deviceId String
* @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list
*/
private void generateMetadataIndex(MetadataIndexEntry metadataIndex, ByteBuffer buffer,
......@@ -584,12 +584,13 @@ public class TsFileSequenceReader implements AutoCloseable {
* Get target MetadataIndexEntry and its end offset
*
* @param metadataIndex given MetadataIndexNode
* @param name target device / measurement name
* @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or INTERNAL_MEASUREMENT. When
* searching for a device node, return when it is not INTERNAL_DEVICE. Likewise, when searching
* for a measurement node, return when it is not INTERNAL_MEASUREMENT. This works for the
* situation when the index tree does NOT have the device level and ONLY has the measurement
* level.
* @param name target device / measurement name
* @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or
* INTERNAL_MEASUREMENT. When searching for a device node, return when it is
* not INTERNAL_DEVICE. Likewise, when searching for a measurement node,
* return when it is not INTERNAL_MEASUREMENT. This works for the situation
* when the index tree does NOT have the device level and ONLY has the
* measurement level.
* @return target MetadataIndexEntry, endOffset pair
*/
private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex,
......@@ -616,7 +617,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER.
*
* @param position the offset of the chunk group footer in the file
* @param position the offset of the chunk group footer in the file
* @param markerRead true if the offset does not contains the marker , otherwise false
* @return a CHUNK_GROUP_FOOTER
* @throws IOException io error
......@@ -649,9 +650,9 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* read the chunk's header.
*
* @param position the file offset of this chunk's header
* @param position the file offset of this chunk's header
* @param chunkHeaderSize the size of chunk's header
* @param markerRead true if the offset does not contains the marker , otherwise false
* @param markerRead true if the offset does not contains the marker , otherwise false
*/
private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
throws IOException {
......@@ -756,8 +757,8 @@ public class TsFileSequenceReader implements AutoCloseable {
* changed.
*
* @param position the start position of data in the tsFileInput, or the current position if
* position = -1
* @param size the size of data that want to read
* position = -1
* @param size the size of data that want to read
* @return data that been read.
*/
private ByteBuffer readData(long position, int size) throws IOException {
......@@ -783,8 +784,8 @@ public class TsFileSequenceReader implements AutoCloseable {
* position.
*
* @param start the start position of data in the tsFileInput, or the current position if position
* = -1
* @param end the end position of data that want to read
* = -1
* @param end the end position of data that want to read
* @return data that been read.
*/
private ByteBuffer readData(long start, long end) throws IOException {
......@@ -801,11 +802,11 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* Self Check the file and return the position before where the data is safe.
*
* @param newSchema the schema on each time series in the file
* @param newSchema the schema on each time series in the file
* @param chunkGroupMetadataList ChunkGroupMetadata List
* @param versionInfo version pair List
* @param fastFinish if true and the file is complete, then newSchema and chunkGroupMetadataList
* parameter will be not modified.
* @param versionInfo version pair List
* @param fastFinish if true and the file is complete, then newSchema and
* chunkGroupMetadataList parameter will be not modified.
* @return the position of the file that is fine. All data after the position in the file should
* be truncated.
*/
......@@ -835,15 +836,15 @@ public class TsFileSequenceReader implements AutoCloseable {
if (fileSize < headerLength) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
String magic = readHeadMagic();
tsFileInput.position(headerLength);
if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) || !TSFileConfig.VERSION_NUMBER
.equals(readTailMagic())) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
tsFileInput.position(headerLength);
if (fileSize == headerLength) {
return TsFileCheckStatus.ONLY_MAGIC_HEAD;
} else if (readTailMagic().equals(magic)) {
return headerLength;
} else if (isComplete()) {
loadMetadataSize();
if (fastFinish) {
return TsFileCheckStatus.COMPLETE_FILE;
......@@ -851,7 +852,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
boolean newChunkGroup = true;
// not a complete file, we will recover it...
long truncatedPosition = headerLength;
long truncatedSize = headerLength;
byte marker;
int chunkCnt = 0;
List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
......@@ -900,7 +901,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList));
newChunkGroup = true;
truncatedPosition = this.position();
truncatedSize = this.position();
totalChunkNum += chunkCnt;
chunkCnt = 0;
......@@ -909,7 +910,7 @@ public class TsFileSequenceReader implements AutoCloseable {
case MetaMarker.VERSION:
long version = readVersion();
versionInfo.add(new Pair<>(position(), version));
truncatedPosition = this.position();
truncatedSize = this.position();
break;
default:
// the disk file is corrupted, using this file may be dangerous
......@@ -918,14 +919,14 @@ public class TsFileSequenceReader implements AutoCloseable {
}
// now we read the tail of the data section, so we are sure that the last
// ChunkGroupFooter is complete.
truncatedPosition = this.position() - 1;
truncatedSize = this.position() - 1;
} catch (Exception e) {
logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
file, this.position(), e.getMessage());
}
// Despite the completeness of the data section, we will discard current FileMetadata
// so that we can continue to write data into this tsfile.
return truncatedPosition;
return truncatedSize;
}
public int getTotalChunkNum() {
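The reworked selfCheck() now rejects a file unless both the head MAGIC_STRING and the tail VERSION_NUMBER match, and it returns the size of the safe prefix (hence the rename from truncatedPosition to truncatedSize). The small helper below summarizes that decision table as a hedged sketch; it is illustrative only and not part of the patch.

```java
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.read.TsFileCheckStatus;

final class SelfCheckOutcome {
  static String describe(long truncatedSize) {
    int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length
        + TSFileConfig.VERSION_NUMBER.getBytes().length;
    if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
      return "sealed file: reopen read-only, no truncation needed";
    }
    if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
      return "head magic or tail version mismatch: not a usable TsFile";
    }
    if (truncatedSize == headerLength) {
      return "only the magic head is intact: truncate back to the header";
    }
    return "crashed file: keep the first " + truncatedSize + " bytes, truncate the rest";
  }
}
```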
......@@ -992,7 +993,7 @@ public class TsFileSequenceReader implements AutoCloseable {
* get device names which has valid chunks in [start, end)
*
* @param start start of the partition
* @param end end of the partition
* @param end end of the partition
* @return device names in range
*/
public List<String> getDeviceNameInRange(long start, long end) throws IOException {
......@@ -1010,8 +1011,8 @@ public class TsFileSequenceReader implements AutoCloseable {
* Check if the device has at least one Chunk in this partition
*
* @param seriesMetadataMap chunkMetaDataList of each measurement
* @param start the start position of the space partition
* @param end the end position of the space partition
* @param start the start position of the space partition
* @param end the end position of the space partition
*/
private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap,
long start, long end) {
......
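hasDataInPartition() is only declared in this hunk; its body is elided from the diff. One plausible shape of the check is sketched below purely for illustration, with the offset-based criterion being an assumption rather than something taken from the patch.

```java
import java.util.List;
import java.util.Map;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;

final class PartitionCheckSketch {
  // A device is considered to have data in the partition when any of its chunks
  // starts inside [start, end). This is an assumed implementation, not the actual one.
  static boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap,
      long start, long end) {
    for (List<ChunkMetadata> chunks : seriesMetadataMap.values()) {
      for (ChunkMetadata chunk : chunks) {
        long offset = chunk.getOffsetOfChunkHeader();
        if (offset >= start && offset < end) {
          return true;
        }
      }
    }
    return false;
  }
}
```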
......@@ -79,8 +79,8 @@ public class LocalTsFileOutput implements TsFileOutput {
}
@Override
public void truncate(long position) throws IOException {
outputStream.getChannel().truncate(position);
public void truncate(long size) throws IOException {
outputStream.getChannel().truncate(size);
}
}
......@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
......@@ -57,7 +56,7 @@ import org.slf4j.LoggerFactory;
public class RestorableTsFileIOWriter extends TsFileIOWriter {
private static final Logger logger = LoggerFactory.getLogger("FileMonitor");
private long truncatedPosition = -1;
private long truncatedSize = -1;
private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
private int lastFlushedChunkGroupIndex = 0;
......@@ -91,32 +90,21 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
if (file.exists()) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
// this tsfile is complete
if (reader.isComplete()) {
truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
totalChunkNum = reader.getTotalChunkNum();
if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
crashed = false;
canWrite = false;
out.close();
return;
}
// uncompleted file
truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
totalChunkNum = reader.getTotalChunkNum();
if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
} else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
out.close();
throw new NotCompatibleTsFileException(
String.format("%s is not in TsFile format.", file.getAbsolutePath()));
} else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
crashed = true;
canWrite = true;
out.truncate(
(long) TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
.getBytes().length);
} else {
crashed = true;
canWrite = true;
// remove broken data
out.truncate(truncatedPosition);
out.truncate(truncatedSize);
}
}
}
......@@ -153,8 +141,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
return new RestorableTsFileIOWriter(file);
}
long getTruncatedPosition() {
return truncatedPosition;
long getTruncatedSize() {
return truncatedSize;
}
public Map<Path, MeasurementSchema> getKnownSchema() {
......
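With the ONLY_MAGIC_HEAD branch removed, the constructor now treats every non-sentinel return value from selfCheck() as the byte count to keep. The hedged usage sketch below mirrors the tests that follow, reopening a possibly crashed file; the file name is hypothetical and public visibility of canWrite() outside the writer package is assumed.

```java
import java.io.File;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;

public class ReopenCrashedFileSketch {
  public static void main(String[] args) throws Exception {
    File file = new File("root.sg1-0-0.tsfile"); // hypothetical file name
    RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
    if (rWriter.canWrite()) {
      // any broken tail was already truncated in the constructor, so writing can resume
      TsFileWriter writer = new TsFileWriter(rWriter);
      // ... redo WAL entries or write new chunks here ...
      writer.close();
    } else {
      // the file was already sealed; the constructor closed the output untouched
      rWriter.close();
    }
  }
}
```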
......@@ -78,8 +78,8 @@ public interface TsFileOutput {
/**
* The same with {@link java.nio.channels.FileChannel#truncate(long)}.
*
* @param position -position
* @param size size The new size, a non-negative byte count
*/
void truncate(long position) throws IOException;
void truncate(long size) throws IOException;
}
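The interface javadoc now defers to FileChannel#truncate(long) semantics, which is exactly what LocalTsFileOutput does above. A tiny standalone illustration of those semantics follows; the file name is hypothetical.

```java
import java.io.FileOutputStream;
import java.io.IOException;

public class TruncateSemanticsDemo {
  public static void main(String[] args) throws IOException {
    try (FileOutputStream out = new FileOutputStream("demo.bin")) {
      out.write(new byte[1024]);          // file is now 1024 bytes
      out.getChannel().truncate(128);     // bytes beyond the new size are discarded
    }
  }
}
```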
......@@ -29,6 +29,7 @@ import java.io.File;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.constant.TestConstant;
import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException;
import org.apache.iotdb.tsfile.file.MetaMarker;
......@@ -81,10 +82,11 @@ public class RestorableTsFileIOWriterTest {
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileCheckStatus.ONLY_MAGIC_HEAD, rWriter.getTruncatedPosition());
assertEquals(TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
.getBytes().length, rWriter.getTruncatedSize());
rWriter = new RestorableTsFileIOWriter(file);
assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedPosition());
assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedSize());
assertFalse(rWriter.canWrite());
rWriter.close();
assertTrue(file.delete());
......@@ -101,7 +103,7 @@ public class RestorableTsFileIOWriterTest {
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
rWriter.getTruncatedPosition());
rWriter.getTruncatedSize());
assertTrue(file.delete());
}
......@@ -115,7 +117,7 @@ public class RestorableTsFileIOWriterTest {
TsFileWriter writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
rWriter.getTruncatedPosition());
rWriter.getTruncatedSize());
assertTrue(file.delete());
}
......@@ -133,7 +135,7 @@ public class RestorableTsFileIOWriterTest {
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length + TsFileIOWriter.versionNumberBytes.length,
rWriter.getTruncatedPosition());
rWriter.getTruncatedSize());
assertTrue(file.delete());
}
......@@ -159,7 +161,7 @@ public class RestorableTsFileIOWriterTest {
writer = new TsFileWriter(rWriter);
writer.close();
// truncate version marker and version
assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedPosition());
assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedSize());
assertTrue(file.delete());
}
......@@ -219,7 +221,7 @@ public class RestorableTsFileIOWriterTest {
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertNotEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
assertNotEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedSize());
TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(new Path("d1.s1"));
assertNotNull(chunkMetadataList);
......
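The tests above now assert getTruncatedSize() against the header length rather than the ONLY_MAGIC_HEAD sentinel, and they spell that length in two ways. The small check below, assumed to live in the same package as the test so TsFileIOWriter's byte arrays are visible, illustrates that the two spellings agree.

```java
package org.apache.iotdb.tsfile.write.writer;

import static org.junit.Assert.assertEquals;

import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.junit.Test;

public class HeaderLengthConsistencyTest {
  @Test
  public void headerLengthSpellingsAgree() {
    int fromConfig = TSFileConfig.MAGIC_STRING.getBytes().length
        + TSFileConfig.VERSION_NUMBER.getBytes().length;
    int fromWriter = TsFileIOWriter.magicStringBytes.length
        + TsFileIOWriter.versionNumberBytes.length;
    assertEquals(fromConfig, fromWriter);
  }
}
```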