提交 fc03f2d6 编写于 作者: J jack870131

Fix bugs in RowBatch

上级 521be407
......@@ -208,7 +208,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
* It uses the interface:
* public void addMeasurement(MeasurementSchema MeasurementSchema) throws WriteProcessException
*/
public class TsFileWrite {
public class TsFileWriteWithTSRecord {
public static void main(String args[]) {
try {
......@@ -295,7 +295,7 @@ public class TsFileWriteWithRowBatch {
RowBatch rowBatch = fileSchema.createRowBatch("device_1");
long[] timestamps = rowBatch.timestamps;
Object[] sensors = rowBatch.sensors;
Object[] values = rowBatch.values;
long timestamp = 1;
long value = 1000000L;
......@@ -304,11 +304,11 @@ public class TsFileWriteWithRowBatch {
int row = rowBatch.batchSize++;
timestamps[row] = timestamp++;
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) sensors[i];
long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// write RowBatch to TsFile
if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
tsFileWriter.write(rowBatch);
rowBatch.reset();
}
......
......@@ -59,7 +59,7 @@ public class TsFileWriteWithRowBatch {
RowBatch rowBatch = fileSchema.createRowBatch("device_1");
long[] timestamps = rowBatch.timestamps;
Object[] sensors = rowBatch.sensors;
Object[] values = rowBatch.values;
long timestamp = 1;
long value = 1000000L;
......@@ -68,11 +68,11 @@ public class TsFileWriteWithRowBatch {
int row = rowBatch.batchSize++;
timestamps[row] = timestamp++;
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) sensors[i];
long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// write RowBatch to TsFile
if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
tsFileWriter.write(rowBatch);
rowBatch.reset();
}
......
......@@ -91,24 +91,25 @@ public class ChunkGroupWriterImpl implements IChunkGroupWriter {
private void writeByDataType(
RowBatch rowBatch, String measurementId, TSDataType dataType, int index) throws IOException {
int batchSize = rowBatch.batchSize;
switch (dataType) {
case INT32:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index], batchSize);
break;
case INT64:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index], batchSize);
break;
case FLOAT:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index], batchSize);
break;
case DOUBLE:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index], batchSize);
break;
case BOOLEAN:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index], batchSize);
break;
case TEXT:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index], batchSize);
break;
default:
throw new UnSupportedDataTypeException(
......
......@@ -200,85 +200,85 @@ public class ChunkWriterImpl implements IChunkWriter {
}
@Override
public void write(long[] timestamps, int[] values) {
public void write(long[] timestamps, int[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, long[] values) {
public void write(long[] timestamps, long[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, boolean[] values) {
public void write(long[] timestamps, boolean[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, float[] values) {
public void write(long[] timestamps, float[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, double[] values) {
public void write(long[] timestamps, double[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, BigDecimal[] values) {
public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
public void write(long[] timestamps, Binary[] values) {
public void write(long[] timestamps, Binary[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
......
......@@ -68,37 +68,37 @@ public interface IChunkWriter {
/**
* write time series
*/
void write(long[] timestamps, int[] values);
void write(long[] timestamps, int[] values, int batchSize);
/**
* write time series
*/
void write(long[] timestamps, long[] values);
void write(long[] timestamps, long[] values, int batchSize);
/**
* write time series
*/
void write(long[] timestamps, boolean[] values);
void write(long[] timestamps, boolean[] values, int batchSize);
/**
* write time series
*/
void write(long[] timestamps, float[] values);
void write(long[] timestamps, float[] values, int batchSize);
/**
* write time series
*/
void write(long[] timestamps, double[] values);
void write(long[] timestamps, double[] values, int batchSize);
/**
* write time series
*/
void write(long[] timestamps, BigDecimal[] values);
void write(long[] timestamps, BigDecimal[] values, int batchSize);
/**
* write time series
*/
void write(long[] timestamps, Binary[] values);
void write(long[] timestamps, Binary[] values, int batchSize);
/**
* flush data to TsFileIOWriter.
......
......@@ -124,8 +124,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
public void write(long[] timestamps, boolean[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, boolean[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
......@@ -134,8 +134,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
public void write(long[] timestamps, int[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, int[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
......@@ -144,8 +144,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
public void write(long[] timestamps, long[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, long[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
......@@ -154,8 +154,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
public void write(long[] timestamps, float[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, float[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
......@@ -164,8 +164,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
public void write(long[] timestamps, double[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, double[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
......@@ -174,8 +174,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
public void write(long[] timestamps, BigDecimal[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
......@@ -184,8 +184,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
public void write(long[] timestamps, Binary[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, Binary[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
......
......@@ -155,7 +155,7 @@ public class TsFileReadWriteTest {
FileSchema fileSchema = new FileSchema();
fileSchema.registerMeasurement(
new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
int rowNum = 1024 * 1024;
int rowNum = 1024 * 1024 * 13 + 1023;
int sensorNum = 1;
TsFileWriter tsFileWriter = new TsFileWriter(f, fileSchema);
RowBatch rowBatch = fileSchema.createRowBatch("device_1");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册