Unverified commit 463b99c2 authored by Xiangdong Huang, committed by GitHub

[IOTDB-36]Enable recover data from a incomplete TsFile and continue to write (#87)

* add a restorable tsfile writer without restore file
Parent 0af05d53
......@@ -51,6 +51,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class RestorableTsFileIOWriterTest {
private RestorableTsFileIOWriter writer;
......
......@@ -113,6 +113,10 @@ public class TSFileDescriptor {
TSFileConfig.pageSizeInByte = Integer
.parseInt(properties
.getProperty("page_size_in_byte", Integer.toString(TSFileConfig.pageSizeInByte)));
if (TSFileConfig.pageSizeInByte > TSFileConfig.groupSizeInByte) {
LOGGER.warn("page_size is greater than group size, will set it as the same with group size");
TSFileConfig.pageSizeInByte = TSFileConfig.groupSizeInByte;
}
TSFileConfig.maxNumberOfPointsInPage = Integer.parseInt(
properties
.getProperty("max_number_of_points_in_page",
......
......@@ -122,8 +122,23 @@ public class TsFileSequenceReader {
* this function does not modify the position of the file reader.
*/
public String readHeadMagic() throws IOException {
return readHeadMagic(false);
}
/**
 * read the head magic string of this TsFile.
 *
 * @param movePosition whether to move the reader's position to the end of the magic string
 *        after reading it; if false, the position of the file reader is not modified.
 */
public String readHeadMagic(boolean movePosition) throws IOException {
ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
if (movePosition) {
tsFileInput.position(0);
tsFileInput.read(magicStringBytes);
} else {
tsFileInput.read(magicStringBytes, 0);
}
magicStringBytes.flip();
return new String(magicStringBytes.array());
}
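A minimal usage sketch of the new two-argument form (file name hypothetical): passing true leaves the reader positioned at the end of the magic string, which is where the recovery scan in NativeRestorableIOWriter (below) starts.

    // Sketch: verify the head magic and leave the reader positioned after it.
    TsFileSequenceReader reader = new TsFileSequenceReader("example.tsfile", false);
    String magic = reader.readHeadMagic(true);
    if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
      throw new IOException("example.tsfile is not in the TsFile format");
    }
    // reader.position() now points at the first marker byte after the header.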
......@@ -267,6 +282,22 @@ public class TsFileSequenceReader {
return tsFileInput.position();
}
public void skipPageData(PageHeader header) throws IOException {
tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
}
/**
 * compute the position immediately after the given page's data, without doing any I/O.
 *
 * @param header the header of the page to be skipped
 * @param position the position where the page data starts
 * @return the position immediately after the page data
 * @throws IOException declared for symmetry with skipPageData(PageHeader); never thrown here
 */
public long skipPageData(PageHeader header, long position) throws IOException {
return position + header.getCompressedSize();
}
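// The recovery scan in NativeRestorableIOWriter (below) uses skipPageData to step over
// page payloads while reading only headers; a condensed, hypothetical sketch of that
// per-chunk loop, assuming the reader sits just after a CHUNK_HEADER marker:
//
//   ChunkHeader header = reader.readChunkHeader();
//   for (int i = 0; i < header.getNumOfPages(); i++) {
//     PageHeader pageHeader = reader.readPageHeader(header.getDataType());
//     reader.skipPageData(pageHeader); // jump over getCompressedSize() bytes
//   }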
public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException {
return readPage(header, type, -1);
}
......@@ -293,7 +324,9 @@ public class TsFileSequenceReader {
*/
public byte readMarker() throws IOException {
markerBuffer.clear();
if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), markerBuffer) == 0) {
throw new IOException("reach the end of the file.");
}
markerBuffer.flip();
return markerBuffer.get();
}
......@@ -310,6 +343,10 @@ public class TsFileSequenceReader {
return this.file;
}
public long fileSize() throws IOException {
return tsFileInput.size();
}
/**
* read data from tsFileInput, from the current position (if position = -1), or the given
* position. <br> if position = -1, the tsFileInput's position will be changed to the current
......@@ -324,9 +361,13 @@ public class TsFileSequenceReader {
private ByteBuffer readData(long position, int size) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(size);
if (position == -1) {
if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer) != size) {
throw new IOException("reach the end of the data");
}
} else {
if (ReadWriteIOUtils.readAsPossible(tsFileInput.wrapAsFileChannel(), buffer, position, size) != size) {
throw new IOException("reach the end of the data");
}
}
buffer.flip();
return buffer;
......@@ -339,4 +380,5 @@ public class TsFileSequenceReader {
return ReadWriteIOUtils
.readAsPossible(tsFileInput.wrapAsFileChannel(), target, position, length);
}
}
......@@ -22,12 +22,12 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.write.chunk.ChunkGroupWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkGroupWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
......@@ -88,6 +88,15 @@ public class TsFileWriter {
this(new TsFileIOWriter(file), new FileSchema(), TSFileDescriptor.getInstance().getConfig());
}
/**
* init this TsFileWriter.
*
* @param fileWriter the io writer of this TsFile
*/
public TsFileWriter(TsFileIOWriter fileWriter) throws IOException {
this(fileWriter, new FileSchema(), TSFileDescriptor.getInstance().getConfig());
}
/**
* init this TsFileWriter.
*
......@@ -126,11 +135,22 @@ public class TsFileWriter {
* @param schema the schema of this TsFile
* @param conf the configuration of this TsFile
*/
protected TsFileWriter(TsFileIOWriter fileWriter, FileSchema schema, TSFileConfig conf)
throws IOException {
if (!fileWriter.canWrite()) {
throw new IOException(
"the given file Writer does not support writing any more. Maybe it is an complete TsFile");
}
this.fileWriter = fileWriter;
this.schema = schema;
this.schema.registerMeasurements(fileWriter.getKnownSchema());
this.pageSize = TSFileConfig.pageSizeInByte;
this.chunkGroupSizeThreshold = TSFileConfig.groupSizeInByte;
if (this.pageSize >= chunkGroupSizeThreshold) {
LOG.warn(
"TsFile's page size {} is greater than chunk group size {}, please enlarge the chunk group"
+ " size or decrease page size. ", pageSize, chunkGroupSizeThreshold);
}
}
/**
......@@ -290,4 +310,20 @@ public class TsFileWriter {
flushAllChunkGroups();
fileWriter.endFile(this.schema);
}
/**
* this function is only for testing.
*
* @return the TsFileIOWriter used by this TsFileWriter
*/
public TsFileIOWriter getIOWriter() {
return this.fileWriter;
}
/**
* this function is only for testing.
*
* @throws IOException if flushing the current chunk groups to disk fails
*/
public void flushForTest() throws IOException {
flushAllChunkGroups();
}
}
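Combined with the chainable TSRecord.addTuple shown below, a typical write path now reads fluently. A sketch mirroring the tests in this change (the file name is arbitrary):

    // Sketch: register two measurements, write one record, and seal the file.
    TsFileWriter writer = new TsFileWriter(new File("example.tsfile"));
    writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
    writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
    writer.write(new TSRecord(1, "d1")
        .addTuple(new FloatDataPoint("s1", 5))
        .addTuple(new FloatDataPoint("s2", 4)));
    writer.close(); // flushes all chunk groups and writes the file metadata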
......@@ -64,8 +64,9 @@ public class TSRecord {
*
* @param tuple data point to be added
* @return this TSRecord, so that calls can be chained
*/
public TSRecord addTuple(DataPoint tuple) {
this.dataPointList.add(tuple);
return this;
}
/**
......
......@@ -104,7 +104,7 @@ public class FileSchema {
/**
* register all measurementSchemas in measurements.
*/
public void registerMeasurements(Map<String, MeasurementSchema> measurements) {
measurements.forEach((id, md) -> registerMeasurement(md));
}
......
......@@ -31,12 +31,16 @@ import java.nio.ByteBuffer;
*/
public class DefaultTsFileOutput implements TsFileOutput {
private FileOutputStream outputStream;
public DefaultTsFileOutput(File file) throws FileNotFoundException {
this.outputStream = new FileOutputStream(file);
}
public DefaultTsFileOutput(File file, boolean append) throws FileNotFoundException {
this.outputStream = new FileOutputStream(file, append);
}
public DefaultTsFileOutput(FileOutputStream outputStream) {
this.outputStream = outputStream;
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write.writer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* a restorable TsFile IO writer that does not depend on a separate restore file.
*/
public class NativeRestorableIOWriter extends TsFileIOWriter {
private static final Logger LOGGER = LoggerFactory
.getLogger(NativeRestorableIOWriter.class);
private long truncatedPosition = -1;
private Map<String, MeasurementSchema> knownSchemas = new HashMap<>();
long getTruncatedPosition() {
return truncatedPosition;
}
public NativeRestorableIOWriter(File file) throws IOException {
super();
long fileSize;
if (!file.exists()) {
this.out = new DefaultTsFileOutput(file, true);
startFile();
return;
} else {
fileSize = file.length();
this.out = new DefaultTsFileOutput(file, true);
}
//we need to read data to recover TsFileIOWriter.chunkGroupMetaDataList
//and remove broken data if exists.
ChunkMetaData currentChunk;
String measurementID;
TSDataType dataType;
long fileOffsetOfChunk;
long startTimeOfChunk = 0;
long endTimeOfChunk = 0;
long numOfPoints = 0;
ChunkGroupMetaData currentChunkGroup;
List<ChunkMetaData> chunks = null;
String deviceID;
long startOffsetOfChunkGroup = 0;
long endOffsetOfChunkGroup;
long versionOfChunkGroup = 0;
boolean haveReadAnUnverifiedGroupFooter = false;
boolean newGroup = true;
TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false);
if (fileSize <= magicStringBytes.length) {
LOGGER.debug("{} only has magic header, does not worth to recover.", file.getAbsolutePath());
reader.close();
this.out.truncate(0);
startFile();
truncatedPosition = magicStringBytes.length;
return;
}
String magic = reader.readHeadMagic(true);
if (!magic.equals(new String(magicStringBytes))) {
throw new IOException(String
.format("%s is not using TsFile format, and will be ignored...", file.getAbsolutePath()));
}
if (reader.readTailMagic().equals(magic)) {
LOGGER.debug("{} is an complete TsFile.", file.getAbsolutePath());
canWrite = false;
reader.close();
out.close();
return;
}
// not a complete file, we will recover it...
truncatedPosition = magicStringBytes.length;
boolean goon = true;
byte marker;
try {
while (goon && (marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
//this is a chunk.
if (haveReadAnUnverifiedGroupFooter) {
//now we are sure that the last ChunkGroupFooter is complete.
haveReadAnUnverifiedGroupFooter = false;
truncatedPosition = reader.position() - 1;
newGroup = true;
}
if (newGroup) {
chunks = new ArrayList<>();
startOffsetOfChunkGroup = reader.position() - 1;
newGroup = false;
}
//if there is something wrong with a chunk, we will drop this part of data
// (the whole ChunkGroup)
ChunkHeader header = reader.readChunkHeader();
measurementID = header.getMeasurementID();
knownSchemas.putIfAbsent(measurementID,
new MeasurementSchema(measurementID, header.getDataType(),
header.getEncodingType(), header.getCompressionType()));
dataType = header.getDataType();
fileOffsetOfChunk = reader.position() - 1;
if (header.getNumOfPages() > 0) {
PageHeader pageHeader = reader.readPageHeader(header.getDataType());
numOfPoints += pageHeader.getNumOfValues();
startTimeOfChunk = pageHeader.getMinTimestamp();
endTimeOfChunk = pageHeader.getMaxTimestamp();
reader.skipPageData(pageHeader);
}
for (int j = 1; j < header.getNumOfPages() - 1; j++) {
//a new Page
PageHeader pageHeader = reader.readPageHeader(header.getDataType());
reader.skipPageData(pageHeader);
}
if (header.getNumOfPages() > 1) {
PageHeader pageHeader = reader.readPageHeader(header.getDataType());
endTimeOfChunk = pageHeader.getMaxTimestamp();
reader.skipPageData(pageHeader);
}
currentChunk = new ChunkMetaData(measurementID, dataType, fileOffsetOfChunk,
startTimeOfChunk, endTimeOfChunk);
currentChunk.setNumOfPoints(numOfPoints);
chunks.add(currentChunk);
numOfPoints = 0;
break;
case MetaMarker.CHUNK_GROUP_FOOTER:
//this is a chunk group
//if there is something wrong with the ChunkGroupFooter, we will drop this part of data,
//because we cannot guarantee the correctness of the deviceID.
ChunkGroupFooter chunkGroupFooter = reader.readChunkGroupFooter();
deviceID = chunkGroupFooter.getDeviceID();
endOffsetOfChunkGroup = reader.position();
currentChunkGroup = new ChunkGroupMetaData(deviceID, chunks, startOffsetOfChunkGroup);
currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
currentChunkGroup.setVersion(versionOfChunkGroup++);
chunkGroupMetaDataList.add(currentChunkGroup);
// though we have read the current ChunkGroupFooter from disk, it may be incomplete:
// if the file lost even one byte, ChunkGroupFooter.deserialize() may still return OK
// while the last field of the footer is incorrect.
// So only after reading the next marker can we be sure this footer is complete.
haveReadAnUnverifiedGroupFooter = true;
break;
default:
// an unexpected marker means the remaining data is unreadable; stop scanning here.
MetaMarker.handleUnexpectedMarker(marker);
goon = false;
}
}
//we have read to the separator at the tail of the data section, so the last ChunkGroupFooter is complete.
truncatedPosition = reader.position() - 1;
} catch (IOException e2) {
//we reached the end of the file; if the last ChunkGroupFooter is still unverified, we must drop that ChunkGroup
if (haveReadAnUnverifiedGroupFooter && !chunkGroupMetaDataList.isEmpty()) {
chunkGroupMetaDataList.remove(chunkGroupMetaDataList.size() - 1);
}
} finally {
//whether something went wrong or all data was complete, we discard any trailing FileMetadata
// so that we can continue to write data into this TsFile.
LOGGER.info("File {} has {} bytes, and will be truncated from {}.",
file.getAbsolutePath(), file.length(), truncatedPosition);
out.truncate(truncatedPosition);
reader.close();
}
}
@Override
public Map<String, MeasurementSchema> getKnownSchema() {
return knownSchemas;
}
}
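A usage sketch of the recovery flow that the tests below exercise: reopen an incomplete file, let the restorable writer truncate the broken tail, then continue appending through a normal TsFileWriter. This assumes "example.tsfile" already contains a chunk for s1, so its schema can be rebuilt from the chunk headers.

    // Sketch: recover an incomplete TsFile and continue writing into it.
    NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(new File("example.tsfile"));
    if (rWriter.canWrite()) { // false if the file was already sealed with the tail magic
      TsFileWriter writer = new TsFileWriter(rWriter);
      writer.write(new TSRecord(3, "d1").addTuple(new FloatDataPoint("s1", 5)));
      writer.close(); // ends the file with a complete metadata section
    }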
......@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -67,6 +68,7 @@ public class TsFileIOWriter {
protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new ArrayList<>();
private ChunkGroupMetaData currentChunkGroupMetaData;
private ChunkMetaData currentChunkMetaData;
protected boolean canWrite = true;
/**
* empty construct function.
......@@ -236,6 +238,7 @@ public class TsFileIOWriter {
// close file
out.close();
canWrite = false;
LOG.info("output stream is closed");
}
......@@ -315,4 +318,29 @@ public class TsFileIOWriter {
return chunkGroupMetaDataList;
}
public boolean canWrite() {
return canWrite;
}
/**
* close the output stream or file channel forcibly. This is only used for testing.
*/
void forceClose() throws IOException {
out.close();
}
void writeSeparatorMaskForTest() throws IOException {
out.write(new byte[]{MetaMarker.SEPARATOR});
}
void writeChunkMaskForTest() throws IOException {
out.write(new byte[]{MetaMarker.CHUNK_HEADER});
}
/**
* @return all schemas that this IO writer knows. The default implementation
* (TsFileIOWriter) returns an empty map.
*/
public Map<String, MeasurementSchema> getKnownSchema() {
return Collections.emptyMap();
}
}
......@@ -50,28 +50,27 @@ public class TsFileReadWriteTest {
private final double delta = 0.0000001;
private String path = "read_write_rle.tsfile";
private File f;
@Before
public void setUp() throws Exception {
f = new File(path);
if (f.exists()) {
assertTrue(f.delete());
}
}
@After
public void tearDown() throws Exception {
f = new File(path);
if (f.exists()) {
assertTrue(f.delete());
}
}
@Test
public void intTest() throws IOException, WriteProcessException {
int floatCount = 1024 * 1024 * 13 + 1023;
TsFileWriter tsFileWriter = new TsFileWriter(f);
// add measurements into file schema
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32, TSEncoding.RLE));
......@@ -110,6 +109,7 @@ public class TsFileReadWriteTest {
public void longTest() throws IOException, WriteProcessException {
int floatCount = 1024 * 1024 * 13 + 1023;
// add measurements into file schema
TsFileWriter tsFileWriter = new TsFileWriter(f);
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
for (long i = 1; i < floatCount; i++) {
......@@ -147,6 +147,7 @@ public class TsFileReadWriteTest {
public void floatTest() throws IOException, WriteProcessException {
int floatCount = 1024 * 1024 * 13 + 1023;
// add measurements into file schema
TsFileWriter tsFileWriter = new TsFileWriter(f);
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
for (long i = 1; i < floatCount; i++) {
......@@ -185,6 +186,7 @@ public class TsFileReadWriteTest {
public void doubleTest() throws IOException, WriteProcessException {
int floatCount = 1024 * 1024 * 13 + 1023;
// add measurements into file schema
TsFileWriter tsFileWriter = new TsFileWriter(f);
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.DOUBLE, TSEncoding.RLE));
for (long i = 1; i < floatCount; i++) {
......@@ -220,13 +222,7 @@ public class TsFileReadWriteTest {
@Test
public void readEmptyMeasurementTest() throws IOException, WriteProcessException {
String path = "test.tsfile";
File f = new File(path);
if (f.exists()) {
assertTrue(f.delete());
}
TsFileWriter tsFileWriter = new TsFileWriter(f);
// add measurements into file schema
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write.writer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class NativeRestorableIOWriterTest {
private static final String FILE_NAME = "test.ts";
@Test
public void testOnlyHeadMagic() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
assertTrue(file.delete());
}
@Test
public void testOnlyFirstMask() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
//we have to flush using the inner API.
writer.getIOWriter().out.write(new byte[] {MetaMarker.CHUNK_HEADER});
writer.getIOWriter().forceClose();
assertEquals(TsFileIOWriter.magicStringBytes.length + 1, file.length());
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
assertTrue(file.delete());
}
@Test
public void testOnlyOneIncompleteChunkHeader() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
ChunkHeader header = new ChunkHeader("s1", 100, TSDataType.FLOAT, CompressionType.SNAPPY,
TSEncoding.PLAIN, 5);
ByteBuffer buffer = ByteBuffer.allocate(header.getSerializedSize());
header.serializeTo(buffer);
buffer.flip();
byte[] data = new byte[3];
buffer.get(data, 0, 3);
writer.getIOWriter().out.write(data);
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
assertTrue(file.delete());
}
@Test
public void testOnlyOneChunkHeader() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.getIOWriter()
.startFlushChunk(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN),
CompressionType.SNAPPY, TSDataType.FLOAT,
TSEncoding.PLAIN, new FloatStatistics(), 100, 50, 100, 10);
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
assertTrue(file.delete());
}
@Test
public void testOnlyOneChunkHeaderAndSomePage() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.flushForTest();
long pos = writer.getIOWriter().getPos();
//let's delete one byte.
writer.getIOWriter().out.truncate(pos - 1);
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
assertTrue(file.delete());
}
@Test
public void testOnlyOneChunkGroup() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.flushForTest();
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
assertTrue(file.delete());
}
@Test
public void testOnlyOneChunkGroupAndOneMask() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.flushForTest();
writer.getIOWriter().writeChunkMaskForTest();
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
assertNotEquals(TsFileIOWriter.magicStringBytes.length, rWriter.getTruncatedPosition());
TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
TsDeviceMetadataIndex index = reader.readFileMetadata().getDeviceMap().get("d1");
assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
reader.close();
assertTrue(file.delete());
}
@Test
public void testTwoChunkGroupAndMore() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
.addTuple(new FloatDataPoint("s2", 4)));
writer.flushForTest();
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
TsDeviceMetadataIndex index = reader.readFileMetadata().getDeviceMap().get("d1");
assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
reader.close();
assertTrue(file.delete());
}
@Test
public void testNoSeperatorMask() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
.addTuple(new FloatDataPoint("s2", 4)));
writer.flushForTest();
writer.getIOWriter().writeSeparatorMaskForTest();
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
TsDeviceMetadataIndex index = reader.readFileMetadata().getDeviceMap().get("d1");
assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
index = reader.readFileMetadata().getDeviceMap().get("d2");
assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
reader.close();
assertTrue(file.delete());
}
@Test
public void testHavingSomeFileMetadata() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(1, "d2").addTuple(new FloatDataPoint("s1", 6))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d2").addTuple(new FloatDataPoint("s1", 6))
.addTuple(new FloatDataPoint("s2", 4)));
writer.flushForTest();
writer.getIOWriter().writeSeparatorMaskForTest();
writer.getIOWriter().writeSeparatorMaskForTest();
writer.getIOWriter().forceClose();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME);
TsDeviceMetadataIndex index = reader.readFileMetadata().getDeviceMap().get("d1");
assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
index = reader.readFileMetadata().getDeviceMap().get("d2");
assertEquals(1, reader.readTsDeviceMetaData(index).getChunkGroupMetaDataList().size());
reader.close();
assertTrue(file.delete());
}
@Test
public void testOpenCompleteFile() throws Exception {
File file = new File(FILE_NAME);
TsFileWriter writer = new TsFileWriter(file);
writer.addMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.addMeasurement(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.close();
NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(file);
assertFalse(rWriter.canWrite());
assertTrue(file.delete());
}
}
\ No newline at end of file