diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java index ce70aecb1dac89a87013efe68716cf658632ab25..ea1cd5846ca28fb23f8858366d5e6eed692eb07e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java @@ -31,7 +31,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -349,10 +348,11 @@ public class Tablet { /** serialize Tablet */ public ByteBuffer serialize() throws IOException { - PublicBAOS byteArrayOutputStream = new PublicBAOS(); - DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - serialize(outputStream); - return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + serialize(outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } } public void serialize(DataOutputStream stream) throws IOException { @@ -366,35 +366,41 @@ public class Tablet { /** Serialize measurement schemas */ private void writeMeasurementSchemas(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(schemas.size(), stream); - for (MeasurementSchema schema : schemas) { - if (schema == null) { - ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream); - } else { - ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream); - schema.serializeTo(stream); + ReadWriteIOUtils.write(BytesUtils.boolToByte(schemas != null), stream); + if (schemas != null) { + ReadWriteIOUtils.write(schemas.size(), stream); + for (MeasurementSchema schema : schemas) { + if (schema == null) { + ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream); + } else { + ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream); + schema.serializeTo(stream); + } } } } private void writeTimes(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(timestamps.length, stream); - for (long time : timestamps) { - ReadWriteIOUtils.write(time, stream); + ReadWriteIOUtils.write(BytesUtils.boolToByte(timestamps != null), stream); + if (timestamps != null) { + for (int i = 0; i < rowSize; i++) { + ReadWriteIOUtils.write(timestamps[i], stream); + } } } /** Serialize bitmaps */ private void writeBitMaps(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(bitMaps != null ? 1 : 0, stream); + ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream); if (bitMaps != null) { - for (BitMap bitMap : bitMaps) { - if (bitMap == null) { - ReadWriteIOUtils.write(0, stream); + int size = (schemas == null ? 0 : schemas.size()); + for (int i = 0; i < size; i++) { + if (bitMaps[i] == null) { + ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream); } else { - ReadWriteIOUtils.write(1, stream); - ReadWriteIOUtils.write(bitMap.getSize(), stream); - ReadWriteIOUtils.write(new Binary(bitMap.getByteArray()), stream); + ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream); + ReadWriteIOUtils.write(bitMaps[i].getSize(), stream); + ReadWriteIOUtils.write(new Binary(bitMaps[i].getByteArray()), stream); } } } @@ -402,63 +408,64 @@ public class Tablet { /** Serialize values */ private void writeValues(DataOutputStream stream) throws IOException { - for (int i = 0; i < values.length; i++) { - serializeColumn(schemas.get(i).getType(), values[i], stream); + ReadWriteIOUtils.write(BytesUtils.boolToByte(values != null), stream); + if (values != null) { + int size = (schemas == null ? 0 : schemas.size()); + for (int i = 0; i < size; i++) { + serializeColumn(schemas.get(i).getType(), values[i], stream); + } } } private void serializeColumn(TSDataType dataType, Object column, DataOutputStream stream) throws IOException { - switch (dataType) { - case INT32: - int[] intValues = (int[]) column; - ReadWriteIOUtils.write(intValues.length, stream); - for (int j = 0; j < intValues.length; j++) { - ReadWriteIOUtils.write(intValues[j], stream); - } - break; - case INT64: - long[] longValues = (long[]) column; - ReadWriteIOUtils.write(longValues.length, stream); - for (int j = 0; j < longValues.length; j++) { - ReadWriteIOUtils.write(longValues[j], stream); - } - break; - case FLOAT: - float[] floatValues = (float[]) column; - ReadWriteIOUtils.write(floatValues.length, stream); - for (int j = 0; j < floatValues.length; j++) { - ReadWriteIOUtils.write(floatValues[j], stream); - } - break; - case DOUBLE: - double[] doubleValues = (double[]) column; - ReadWriteIOUtils.write(doubleValues.length, stream); - for (int j = 0; j < doubleValues.length; j++) { - ReadWriteIOUtils.write(doubleValues[j], stream); - } - break; - case BOOLEAN: - boolean[] boolValues = (boolean[]) column; - ReadWriteIOUtils.write(boolValues.length, stream); - for (int j = 0; j < boolValues.length; j++) { - ReadWriteIOUtils.write(boolValues[j] ? 1 : 0, stream); - } - break; - case TEXT: - Binary[] binaryValues = (Binary[]) column; - ReadWriteIOUtils.write(binaryValues.length, stream); - for (int j = 0; j < binaryValues.length; j++) { - boolean isNull = (binaryValues[j] == null); - ReadWriteIOUtils.write(isNull ? 1 : 0, stream); - if (!isNull) { - ReadWriteIOUtils.write(binaryValues[j], stream); + ReadWriteIOUtils.write(BytesUtils.boolToByte(column != null), stream); + + if (column != null) { + switch (dataType) { + case INT32: + int[] intValues = (int[]) column; + for (int j = 0; j < rowSize; j++) { + ReadWriteIOUtils.write(intValues[j], stream); } - } - break; - default: - throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", dataType)); + break; + case INT64: + long[] longValues = (long[]) column; + for (int j = 0; j < rowSize; j++) { + ReadWriteIOUtils.write(longValues[j], stream); + } + break; + case FLOAT: + float[] floatValues = (float[]) column; + for (int j = 0; j < rowSize; j++) { + ReadWriteIOUtils.write(floatValues[j], stream); + } + break; + case DOUBLE: + double[] doubleValues = (double[]) column; + for (int j = 0; j < rowSize; j++) { + ReadWriteIOUtils.write(doubleValues[j], stream); + } + break; + case BOOLEAN: + boolean[] boolValues = (boolean[]) column; + for (int j = 0; j < rowSize; j++) { + ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream); + } + break; + case TEXT: + Binary[] binaryValues = (Binary[]) column; + for (int j = 0; j < rowSize; j++) { + ReadWriteIOUtils.write(BytesUtils.boolToByte(binaryValues[j] != null), stream); + if (binaryValues[j] != null) { + ReadWriteIOUtils.write(binaryValues[j], stream); + } + } + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataType)); + } } } @@ -468,33 +475,43 @@ public class Tablet { int rowSize = ReadWriteIOUtils.readInt(byteBuffer); // deserialize schemas - int schemaSize = ReadWriteIOUtils.readInt(byteBuffer); + int schemaSize = 0; List schemas = new ArrayList<>(); - for (int i = 0; i < schemaSize; i++) { - boolean hasSchema = BytesUtils.byteToBool(byteBuffer.get()); - if (hasSchema) { - schemas.add(MeasurementSchema.deserializeFrom(byteBuffer)); + boolean isSchemasNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isSchemasNotNull) { + schemaSize = ReadWriteIOUtils.readInt(byteBuffer); + for (int i = 0; i < schemaSize; i++) { + boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (hasSchema) { + schemas.add(MeasurementSchema.deserializeFrom(byteBuffer)); + } } } // deserialize times - int timesSize = ReadWriteIOUtils.readInt(byteBuffer); - long[] times = new long[timesSize]; - for (int i = 0; i < timesSize; i++) { - times[i] = ReadWriteIOUtils.readLong(byteBuffer); + long[] times = new long[rowSize]; + boolean isTimesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isTimesNotNull) { + for (int i = 0; i < rowSize; i++) { + times[i] = ReadWriteIOUtils.readLong(byteBuffer); + } } // deserialize bitmaps - boolean hasBitMaps = (ReadWriteIOUtils.readInt(byteBuffer) == 1); - BitMap[] bitMaps = null; - if (hasBitMaps) { + BitMap[] bitMaps = new BitMap[schemaSize]; + boolean isBitMapsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isBitMapsNotNull) { bitMaps = readBitMapsFromBuffer(byteBuffer, schemaSize); } // deserialize values TSDataType[] dataTypes = schemas.stream().map(MeasurementSchema::getType).toArray(TSDataType[]::new); - Object[] values = readTabletValuesFromBuffer(byteBuffer, dataTypes, schemaSize); + Object[] values = new Object[schemaSize]; + boolean isValuesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isValuesNotNull) { + values = readTabletValuesFromBuffer(byteBuffer, dataTypes, schemaSize, rowSize); + } Tablet tablet = new Tablet(deviceId, schemas, times, values, bitMaps, rowSize); tablet.constructMeasurementIndexMap(); @@ -502,78 +519,82 @@ public class Tablet { } /** deserialize bitmaps */ - public static BitMap[] readBitMapsFromBuffer(ByteBuffer buffer, int columns) { + public static BitMap[] readBitMapsFromBuffer(ByteBuffer byteBuffer, int columns) { BitMap[] bitMaps = new BitMap[columns]; for (int i = 0; i < columns; i++) { - boolean hasBitMap = (ReadWriteIOUtils.readInt(buffer) == 1); + boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); if (hasBitMap) { - int bitMapSize = ReadWriteIOUtils.readInt(buffer); - byte[] bytes = ReadWriteIOUtils.readBinary(buffer).getValues(); - bitMaps[i] = new BitMap(bitMapSize, bytes); + final int size = ReadWriteIOUtils.readInt(byteBuffer); + final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer); + bitMaps[i] = new BitMap(size, valueBinary.getValues()); } } return bitMaps; } /** - * @param buffer data values + * @param byteBuffer data values * @param columns column number */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public static Object[] readTabletValuesFromBuffer( - ByteBuffer buffer, TSDataType[] types, int columns) { + ByteBuffer byteBuffer, TSDataType[] types, int columns, int rowSize) { Object[] values = new Object[columns]; for (int i = 0; i < columns; i++) { - int arraySize = ReadWriteIOUtils.readInt(buffer); - switch (types[i]) { - case BOOLEAN: - boolean[] boolValues = new boolean[arraySize]; - for (int index = 0; index < arraySize; index++) { - boolValues[index] = ReadWriteIOUtils.readInt(buffer) == 1; - } - values[i] = boolValues; - break; - case INT32: - int[] intValues = new int[arraySize]; - for (int index = 0; index < arraySize; index++) { - intValues[index] = ReadWriteIOUtils.readInt(buffer); - } - values[i] = intValues; - break; - case INT64: - long[] longValues = new long[arraySize]; - for (int index = 0; index < arraySize; index++) { - longValues[index] = ReadWriteIOUtils.readLong(buffer); - } - values[i] = longValues; - break; - case FLOAT: - float[] floatValues = new float[arraySize]; - for (int index = 0; index < arraySize; index++) { - floatValues[index] = ReadWriteIOUtils.readFloat(buffer); - } - values[i] = floatValues; - break; - case DOUBLE: - double[] doubleValues = new double[arraySize]; - for (int index = 0; index < arraySize; index++) { - doubleValues[index] = ReadWriteIOUtils.readDouble(buffer); - } - values[i] = doubleValues; - break; - case TEXT: - Binary[] binaryValues = new Binary[arraySize]; - for (int index = 0; index < arraySize; index++) { - boolean isNull = (ReadWriteIOUtils.readInt(buffer) == 1); - if (!isNull) { - binaryValues[index] = ReadWriteIOUtils.readBinary(buffer); + boolean isValueColumnsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + + if (isValueColumnsNotNull) { + switch (types[i]) { + case BOOLEAN: + boolean[] boolValues = new boolean[rowSize]; + for (int index = 0; index < rowSize; index++) { + boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); } - } - values[i] = binaryValues; - break; - default: - throw new UnSupportedDataTypeException( - String.format("data type %s is not supported when convert data at client", types[i])); + values[i] = boolValues; + break; + case INT32: + int[] intValues = new int[rowSize]; + for (int index = 0; index < rowSize; index++) { + intValues[index] = ReadWriteIOUtils.readInt(byteBuffer); + } + values[i] = intValues; + break; + case INT64: + long[] longValues = new long[rowSize]; + for (int index = 0; index < rowSize; index++) { + longValues[index] = ReadWriteIOUtils.readLong(byteBuffer); + } + values[i] = longValues; + break; + case FLOAT: + float[] floatValues = new float[rowSize]; + for (int index = 0; index < rowSize; index++) { + floatValues[index] = ReadWriteIOUtils.readFloat(byteBuffer); + } + values[i] = floatValues; + break; + case DOUBLE: + double[] doubleValues = new double[rowSize]; + for (int index = 0; index < rowSize; index++) { + doubleValues[index] = ReadWriteIOUtils.readDouble(byteBuffer); + } + values[i] = doubleValues; + break; + case TEXT: + Binary[] binaryValues = new Binary[rowSize]; + for (int index = 0; index < rowSize; index++) { + boolean isNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isNotNull) { + binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer); + } + } + values[i] = binaryValues; + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "data type %s is not supported when convert data at client", types[i])); + } } } return values; @@ -584,58 +605,121 @@ public class Tablet { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (o == null || !getClass().equals(o.getClass())) { return false; } Tablet that = (Tablet) o; boolean flag = that.rowSize == rowSize - && Arrays.equals(that.timestamps, timestamps) - && Arrays.equals(that.bitMaps, bitMaps) + && Objects.equals(that.deviceId, deviceId) && Objects.equals(that.schemas, schemas) - && Objects.equals(that.measurementIndex, measurementIndex) - && Objects.equals(that.deviceId, deviceId); + && Objects.equals(that.measurementIndex, measurementIndex); if (!flag) { return false; } + // assert timestamps and bitmaps + int columns = (schemas == null ? 0 : schemas.size()); + if (!isTimestampsEqual(this.timestamps, that.timestamps, rowSize) + || !isBitMapsEqual(this.bitMaps, that.bitMaps, columns)) { + return false; + } + // assert values Object[] thatValues = that.values; + if (thatValues == values) { + return true; + } + if (thatValues == null || values == null) { + return false; + } if (thatValues.length != values.length) { return false; } for (int i = 0, n = values.length; i < n; i++) { + if (thatValues[i] == values[i]) { + continue; + } + if (thatValues[i] == null || values[i] == null) { + return false; + } + if (!thatValues[i].getClass().equals(values[i].getClass())) { + return false; + } + switch (schemas.get(i).getType()) { case INT32: - if (!Arrays.equals((int[]) thatValues[i], (int[]) values[i])) { + int[] thisIntValues = (int[]) values[i]; + int[] thatIntValues = (int[]) thatValues[i]; + if (thisIntValues.length < rowSize || thatIntValues.length < rowSize) { return false; } + for (int j = 0; j < rowSize; j++) { + if (thisIntValues[j] != thatIntValues[j]) { + return false; + } + } break; case INT64: - if (!Arrays.equals((long[]) thatValues[i], (long[]) values[i])) { + long[] thisLongValues = (long[]) values[i]; + long[] thatLongValues = (long[]) thatValues[i]; + if (thisLongValues.length < rowSize || thatLongValues.length < rowSize) { return false; } + for (int j = 0; j < rowSize; j++) { + if (thisLongValues[j] != thatLongValues[j]) { + return false; + } + } break; case FLOAT: - if (!Arrays.equals((float[]) thatValues[i], (float[]) values[i])) { + float[] thisFloatValues = (float[]) values[i]; + float[] thatFloatValues = (float[]) thatValues[i]; + if (thisFloatValues.length < rowSize || thatFloatValues.length < rowSize) { return false; } + for (int j = 0; j < rowSize; j++) { + if (thisFloatValues[j] != thatFloatValues[j]) { + return false; + } + } break; case DOUBLE: - if (!Arrays.equals((double[]) thatValues[i], (double[]) values[i])) { + double[] thisDoubleValues = (double[]) values[i]; + double[] thatDoubleValues = (double[]) thatValues[i]; + if (thisDoubleValues.length < rowSize || thatDoubleValues.length < rowSize) { return false; } + for (int j = 0; j < rowSize; j++) { + if (thisDoubleValues[j] != thatDoubleValues[j]) { + return false; + } + } break; case BOOLEAN: - if (!Arrays.equals((boolean[]) thatValues[i], (boolean[]) values[i])) { + boolean[] thisBooleanValues = (boolean[]) values[i]; + boolean[] thatBooleanValues = (boolean[]) thatValues[i]; + if (thisBooleanValues.length < rowSize || thatBooleanValues.length < rowSize) { return false; } + for (int j = 0; j < rowSize; j++) { + if (thisBooleanValues[j] != thatBooleanValues[j]) { + return false; + } + } break; case TEXT: - if (!Arrays.equals((Binary[]) thatValues[i], (Binary[]) values[i])) { + Binary[] thisBinaryValues = (Binary[]) values[i]; + Binary[] thatBinaryValues = (Binary[]) thatValues[i]; + if (thisBinaryValues.length < rowSize || thatBinaryValues.length < rowSize) { return false; } + for (int j = 0; j < rowSize; j++) { + if (!thisBinaryValues[j].equals(thatBinaryValues[j])) { + return false; + } + } break; default: throw new UnSupportedDataTypeException( @@ -645,4 +729,36 @@ public class Tablet { return true; } + + private boolean isTimestampsEqual(long[] thisTimestamps, long[] thatTimestamps, int rowSize) { + if (thisTimestamps == thatTimestamps) { + return true; + } + if (thisTimestamps == null || thatTimestamps == null) { + return false; + } + + for (int i = 0; i < rowSize; i++) { + if (thisTimestamps[i] != thatTimestamps[i]) { + return false; + } + } + return true; + } + + private boolean isBitMapsEqual(BitMap[] thisBitMaps, BitMap[] thatBitMaps, int columns) { + if (thisBitMaps == thatBitMaps) { + return true; + } + if (thisBitMaps == null || thatBitMaps == null) { + return false; + } + + for (int i = 0; i < columns; i++) { + if (!thisBitMaps[i].equals(thatBitMaps[i])) { + return false; + } + } + return true; + } } diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java index 34ad5a5a7d9e57576989706f2f454d48a08ec3cd..f91172501f75998ef2d70debf8317eb1ffc47516 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -61,6 +62,43 @@ public class TabletTest { values, new BitMap[] {new BitMap(1024), new BitMap(1024)}, rowSize); + try { + ByteBuffer byteBuffer = tablet.serialize(); + Tablet newTablet = Tablet.deserialize(byteBuffer); + assertEquals(newTablet, tablet); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testSerializationAndDeSerializationWithMoreData() { + String deviceId = "root.sg"; + List measurementSchemas = new ArrayList<>(); + measurementSchemas.add(new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s3", TSDataType.DOUBLE, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s4", TSDataType.BOOLEAN, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)); + + int rowSize = 1000; + Tablet tablet = new Tablet(deviceId, measurementSchemas); + tablet.rowSize = rowSize; + tablet.initBitMaps(); + for (int i = 0; i < rowSize; i++) { + tablet.addTimestamp(i, i); + tablet.addValue(measurementSchemas.get(0).getMeasurementId(), i, i); + tablet.addValue(measurementSchemas.get(1).getMeasurementId(), i, (long) i); + tablet.addValue(measurementSchemas.get(2).getMeasurementId(), i, (float) i); + tablet.addValue(measurementSchemas.get(3).getMeasurementId(), i, (double) i); + tablet.addValue(measurementSchemas.get(4).getMeasurementId(), i, (i % 2) == 0); + tablet.addValue(measurementSchemas.get(5).getMeasurementId(), i, String.valueOf(i)); + + tablet.bitMaps[i % measurementSchemas.size()].mark(i); + } + try { ByteBuffer byteBuffer = tablet.serialize(); Tablet newTablet = Tablet.deserialize(byteBuffer);