提交 15a4d199 编写于 作者: Y yschengzi 提交者: Steve Yurong Su

[IOTDB-5966] Pipe: Revision of Tablet serialize and deserialize (#10045)

Co-authored-by: NSteve Yurong Su <rong@apache.org>
(cherry picked from commit 562048d4)
上级 56255be4
......@@ -31,7 +31,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -349,11 +348,12 @@ public class Tablet {
/** serialize Tablet */
public ByteBuffer serialize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
serialize(outputStream);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
}
public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(deviceId, stream);
......@@ -366,6 +366,8 @@ public class Tablet {
/** Serialize measurement schemas */
private void writeMeasurementSchemas(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(schemas != null), stream);
if (schemas != null) {
ReadWriteIOUtils.write(schemas.size(), stream);
for (MeasurementSchema schema : schemas) {
if (schema == null) {
......@@ -376,25 +378,29 @@ public class Tablet {
}
}
}
}
private void writeTimes(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(timestamps.length, stream);
for (long time : timestamps) {
ReadWriteIOUtils.write(time, stream);
ReadWriteIOUtils.write(BytesUtils.boolToByte(timestamps != null), stream);
if (timestamps != null) {
for (int i = 0; i < rowSize; i++) {
ReadWriteIOUtils.write(timestamps[i], stream);
}
}
}
/** Serialize bitmaps */
private void writeBitMaps(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(bitMaps != null ? 1 : 0, stream);
ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
if (bitMaps != null) {
for (BitMap bitMap : bitMaps) {
if (bitMap == null) {
ReadWriteIOUtils.write(0, stream);
int size = (schemas == null ? 0 : schemas.size());
for (int i = 0; i < size; i++) {
if (bitMaps[i] == null) {
ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream);
} else {
ReadWriteIOUtils.write(1, stream);
ReadWriteIOUtils.write(bitMap.getSize(), stream);
ReadWriteIOUtils.write(new Binary(bitMap.getByteArray()), stream);
ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream);
ReadWriteIOUtils.write(bitMaps[i].getSize(), stream);
ReadWriteIOUtils.write(new Binary(bitMaps[i].getByteArray()), stream);
}
}
}
......@@ -402,56 +408,56 @@ public class Tablet {
/** Serialize values */
private void writeValues(DataOutputStream stream) throws IOException {
for (int i = 0; i < values.length; i++) {
ReadWriteIOUtils.write(BytesUtils.boolToByte(values != null), stream);
if (values != null) {
int size = (schemas == null ? 0 : schemas.size());
for (int i = 0; i < size; i++) {
serializeColumn(schemas.get(i).getType(), values[i], stream);
}
}
}
private void serializeColumn(TSDataType dataType, Object column, DataOutputStream stream)
throws IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(column != null), stream);
if (column != null) {
switch (dataType) {
case INT32:
int[] intValues = (int[]) column;
ReadWriteIOUtils.write(intValues.length, stream);
for (int j = 0; j < intValues.length; j++) {
for (int j = 0; j < rowSize; j++) {
ReadWriteIOUtils.write(intValues[j], stream);
}
break;
case INT64:
long[] longValues = (long[]) column;
ReadWriteIOUtils.write(longValues.length, stream);
for (int j = 0; j < longValues.length; j++) {
for (int j = 0; j < rowSize; j++) {
ReadWriteIOUtils.write(longValues[j], stream);
}
break;
case FLOAT:
float[] floatValues = (float[]) column;
ReadWriteIOUtils.write(floatValues.length, stream);
for (int j = 0; j < floatValues.length; j++) {
for (int j = 0; j < rowSize; j++) {
ReadWriteIOUtils.write(floatValues[j], stream);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) column;
ReadWriteIOUtils.write(doubleValues.length, stream);
for (int j = 0; j < doubleValues.length; j++) {
for (int j = 0; j < rowSize; j++) {
ReadWriteIOUtils.write(doubleValues[j], stream);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) column;
ReadWriteIOUtils.write(boolValues.length, stream);
for (int j = 0; j < boolValues.length; j++) {
ReadWriteIOUtils.write(boolValues[j] ? 1 : 0, stream);
for (int j = 0; j < rowSize; j++) {
ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream);
}
break;
case TEXT:
Binary[] binaryValues = (Binary[]) column;
ReadWriteIOUtils.write(binaryValues.length, stream);
for (int j = 0; j < binaryValues.length; j++) {
boolean isNull = (binaryValues[j] == null);
ReadWriteIOUtils.write(isNull ? 1 : 0, stream);
if (!isNull) {
for (int j = 0; j < rowSize; j++) {
ReadWriteIOUtils.write(BytesUtils.boolToByte(binaryValues[j] != null), stream);
if (binaryValues[j] != null) {
ReadWriteIOUtils.write(binaryValues[j], stream);
}
}
......@@ -461,6 +467,7 @@ public class Tablet {
String.format("Data type %s is not supported.", dataType));
}
}
}
/** Deserialize Tablet */
public static Tablet deserialize(ByteBuffer byteBuffer) {
......@@ -468,33 +475,43 @@ public class Tablet {
int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
// deserialize schemas
int schemaSize = ReadWriteIOUtils.readInt(byteBuffer);
int schemaSize = 0;
List<MeasurementSchema> schemas = new ArrayList<>();
boolean isSchemasNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isSchemasNotNull) {
schemaSize = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < schemaSize; i++) {
boolean hasSchema = BytesUtils.byteToBool(byteBuffer.get());
boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (hasSchema) {
schemas.add(MeasurementSchema.deserializeFrom(byteBuffer));
}
}
}
// deserialize times
int timesSize = ReadWriteIOUtils.readInt(byteBuffer);
long[] times = new long[timesSize];
for (int i = 0; i < timesSize; i++) {
long[] times = new long[rowSize];
boolean isTimesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isTimesNotNull) {
for (int i = 0; i < rowSize; i++) {
times[i] = ReadWriteIOUtils.readLong(byteBuffer);
}
}
// deserialize bitmaps
boolean hasBitMaps = (ReadWriteIOUtils.readInt(byteBuffer) == 1);
BitMap[] bitMaps = null;
if (hasBitMaps) {
BitMap[] bitMaps = new BitMap[schemaSize];
boolean isBitMapsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isBitMapsNotNull) {
bitMaps = readBitMapsFromBuffer(byteBuffer, schemaSize);
}
// deserialize values
TSDataType[] dataTypes =
schemas.stream().map(MeasurementSchema::getType).toArray(TSDataType[]::new);
Object[] values = readTabletValuesFromBuffer(byteBuffer, dataTypes, schemaSize);
Object[] values = new Object[schemaSize];
boolean isValuesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isValuesNotNull) {
values = readTabletValuesFromBuffer(byteBuffer, dataTypes, schemaSize, rowSize);
}
Tablet tablet = new Tablet(deviceId, schemas, times, values, bitMaps, rowSize);
tablet.constructMeasurementIndexMap();
......@@ -502,78 +519,82 @@ public class Tablet {
}
/** deserialize bitmaps */
public static BitMap[] readBitMapsFromBuffer(ByteBuffer buffer, int columns) {
public static BitMap[] readBitMapsFromBuffer(ByteBuffer byteBuffer, int columns) {
BitMap[] bitMaps = new BitMap[columns];
for (int i = 0; i < columns; i++) {
boolean hasBitMap = (ReadWriteIOUtils.readInt(buffer) == 1);
boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (hasBitMap) {
int bitMapSize = ReadWriteIOUtils.readInt(buffer);
byte[] bytes = ReadWriteIOUtils.readBinary(buffer).getValues();
bitMaps[i] = new BitMap(bitMapSize, bytes);
final int size = ReadWriteIOUtils.readInt(byteBuffer);
final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
bitMaps[i] = new BitMap(size, valueBinary.getValues());
}
}
return bitMaps;
}
/**
* @param buffer data values
* @param byteBuffer data values
* @param columns column number
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static Object[] readTabletValuesFromBuffer(
ByteBuffer buffer, TSDataType[] types, int columns) {
ByteBuffer byteBuffer, TSDataType[] types, int columns, int rowSize) {
Object[] values = new Object[columns];
for (int i = 0; i < columns; i++) {
int arraySize = ReadWriteIOUtils.readInt(buffer);
boolean isValueColumnsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isValueColumnsNotNull) {
switch (types[i]) {
case BOOLEAN:
boolean[] boolValues = new boolean[arraySize];
for (int index = 0; index < arraySize; index++) {
boolValues[index] = ReadWriteIOUtils.readInt(buffer) == 1;
boolean[] boolValues = new boolean[rowSize];
for (int index = 0; index < rowSize; index++) {
boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
}
values[i] = boolValues;
break;
case INT32:
int[] intValues = new int[arraySize];
for (int index = 0; index < arraySize; index++) {
intValues[index] = ReadWriteIOUtils.readInt(buffer);
int[] intValues = new int[rowSize];
for (int index = 0; index < rowSize; index++) {
intValues[index] = ReadWriteIOUtils.readInt(byteBuffer);
}
values[i] = intValues;
break;
case INT64:
long[] longValues = new long[arraySize];
for (int index = 0; index < arraySize; index++) {
longValues[index] = ReadWriteIOUtils.readLong(buffer);
long[] longValues = new long[rowSize];
for (int index = 0; index < rowSize; index++) {
longValues[index] = ReadWriteIOUtils.readLong(byteBuffer);
}
values[i] = longValues;
break;
case FLOAT:
float[] floatValues = new float[arraySize];
for (int index = 0; index < arraySize; index++) {
floatValues[index] = ReadWriteIOUtils.readFloat(buffer);
float[] floatValues = new float[rowSize];
for (int index = 0; index < rowSize; index++) {
floatValues[index] = ReadWriteIOUtils.readFloat(byteBuffer);
}
values[i] = floatValues;
break;
case DOUBLE:
double[] doubleValues = new double[arraySize];
for (int index = 0; index < arraySize; index++) {
doubleValues[index] = ReadWriteIOUtils.readDouble(buffer);
double[] doubleValues = new double[rowSize];
for (int index = 0; index < rowSize; index++) {
doubleValues[index] = ReadWriteIOUtils.readDouble(byteBuffer);
}
values[i] = doubleValues;
break;
case TEXT:
Binary[] binaryValues = new Binary[arraySize];
for (int index = 0; index < arraySize; index++) {
boolean isNull = (ReadWriteIOUtils.readInt(buffer) == 1);
if (!isNull) {
binaryValues[index] = ReadWriteIOUtils.readBinary(buffer);
Binary[] binaryValues = new Binary[rowSize];
for (int index = 0; index < rowSize; index++) {
boolean isNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isNotNull) {
binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer);
}
}
values[i] = binaryValues;
break;
default:
throw new UnSupportedDataTypeException(
String.format("data type %s is not supported when convert data at client", types[i]));
String.format(
"data type %s is not supported when convert data at client", types[i]));
}
}
}
return values;
......@@ -584,58 +605,121 @@ public class Tablet {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
if (o == null || !getClass().equals(o.getClass())) {
return false;
}
Tablet that = (Tablet) o;
boolean flag =
that.rowSize == rowSize
&& Arrays.equals(that.timestamps, timestamps)
&& Arrays.equals(that.bitMaps, bitMaps)
&& Objects.equals(that.deviceId, deviceId)
&& Objects.equals(that.schemas, schemas)
&& Objects.equals(that.measurementIndex, measurementIndex)
&& Objects.equals(that.deviceId, deviceId);
&& Objects.equals(that.measurementIndex, measurementIndex);
if (!flag) {
return false;
}
// assert timestamps and bitmaps
int columns = (schemas == null ? 0 : schemas.size());
if (!isTimestampsEqual(this.timestamps, that.timestamps, rowSize)
|| !isBitMapsEqual(this.bitMaps, that.bitMaps, columns)) {
return false;
}
// assert values
Object[] thatValues = that.values;
if (thatValues == values) {
return true;
}
if (thatValues == null || values == null) {
return false;
}
if (thatValues.length != values.length) {
return false;
}
for (int i = 0, n = values.length; i < n; i++) {
if (thatValues[i] == values[i]) {
continue;
}
if (thatValues[i] == null || values[i] == null) {
return false;
}
if (!thatValues[i].getClass().equals(values[i].getClass())) {
return false;
}
switch (schemas.get(i).getType()) {
case INT32:
if (!Arrays.equals((int[]) thatValues[i], (int[]) values[i])) {
int[] thisIntValues = (int[]) values[i];
int[] thatIntValues = (int[]) thatValues[i];
if (thisIntValues.length < rowSize || thatIntValues.length < rowSize) {
return false;
}
for (int j = 0; j < rowSize; j++) {
if (thisIntValues[j] != thatIntValues[j]) {
return false;
}
}
break;
case INT64:
if (!Arrays.equals((long[]) thatValues[i], (long[]) values[i])) {
long[] thisLongValues = (long[]) values[i];
long[] thatLongValues = (long[]) thatValues[i];
if (thisLongValues.length < rowSize || thatLongValues.length < rowSize) {
return false;
}
for (int j = 0; j < rowSize; j++) {
if (thisLongValues[j] != thatLongValues[j]) {
return false;
}
}
break;
case FLOAT:
if (!Arrays.equals((float[]) thatValues[i], (float[]) values[i])) {
float[] thisFloatValues = (float[]) values[i];
float[] thatFloatValues = (float[]) thatValues[i];
if (thisFloatValues.length < rowSize || thatFloatValues.length < rowSize) {
return false;
}
for (int j = 0; j < rowSize; j++) {
if (thisFloatValues[j] != thatFloatValues[j]) {
return false;
}
}
break;
case DOUBLE:
if (!Arrays.equals((double[]) thatValues[i], (double[]) values[i])) {
double[] thisDoubleValues = (double[]) values[i];
double[] thatDoubleValues = (double[]) thatValues[i];
if (thisDoubleValues.length < rowSize || thatDoubleValues.length < rowSize) {
return false;
}
for (int j = 0; j < rowSize; j++) {
if (thisDoubleValues[j] != thatDoubleValues[j]) {
return false;
}
}
break;
case BOOLEAN:
if (!Arrays.equals((boolean[]) thatValues[i], (boolean[]) values[i])) {
boolean[] thisBooleanValues = (boolean[]) values[i];
boolean[] thatBooleanValues = (boolean[]) thatValues[i];
if (thisBooleanValues.length < rowSize || thatBooleanValues.length < rowSize) {
return false;
}
for (int j = 0; j < rowSize; j++) {
if (thisBooleanValues[j] != thatBooleanValues[j]) {
return false;
}
}
break;
case TEXT:
if (!Arrays.equals((Binary[]) thatValues[i], (Binary[]) values[i])) {
Binary[] thisBinaryValues = (Binary[]) values[i];
Binary[] thatBinaryValues = (Binary[]) thatValues[i];
if (thisBinaryValues.length < rowSize || thatBinaryValues.length < rowSize) {
return false;
}
for (int j = 0; j < rowSize; j++) {
if (!thisBinaryValues[j].equals(thatBinaryValues[j])) {
return false;
}
}
break;
default:
throw new UnSupportedDataTypeException(
......@@ -645,4 +729,36 @@ public class Tablet {
return true;
}
private boolean isTimestampsEqual(long[] thisTimestamps, long[] thatTimestamps, int rowSize) {
if (thisTimestamps == thatTimestamps) {
return true;
}
if (thisTimestamps == null || thatTimestamps == null) {
return false;
}
for (int i = 0; i < rowSize; i++) {
if (thisTimestamps[i] != thatTimestamps[i]) {
return false;
}
}
return true;
}
private boolean isBitMapsEqual(BitMap[] thisBitMaps, BitMap[] thatBitMaps, int columns) {
if (thisBitMaps == thatBitMaps) {
return true;
}
if (thisBitMaps == null || thatBitMaps == null) {
return false;
}
for (int i = 0; i < columns; i++) {
if (!thisBitMaps[i].equals(thatBitMaps[i])) {
return false;
}
}
return true;
}
}
......@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -61,6 +62,43 @@ public class TabletTest {
values,
new BitMap[] {new BitMap(1024), new BitMap(1024)},
rowSize);
try {
ByteBuffer byteBuffer = tablet.serialize();
Tablet newTablet = Tablet.deserialize(byteBuffer);
assertEquals(newTablet, tablet);
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
public void testSerializationAndDeSerializationWithMoreData() {
String deviceId = "root.sg";
List<MeasurementSchema> measurementSchemas = new ArrayList<>();
measurementSchemas.add(new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s3", TSDataType.DOUBLE, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s4", TSDataType.BOOLEAN, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN));
int rowSize = 1000;
Tablet tablet = new Tablet(deviceId, measurementSchemas);
tablet.rowSize = rowSize;
tablet.initBitMaps();
for (int i = 0; i < rowSize; i++) {
tablet.addTimestamp(i, i);
tablet.addValue(measurementSchemas.get(0).getMeasurementId(), i, i);
tablet.addValue(measurementSchemas.get(1).getMeasurementId(), i, (long) i);
tablet.addValue(measurementSchemas.get(2).getMeasurementId(), i, (float) i);
tablet.addValue(measurementSchemas.get(3).getMeasurementId(), i, (double) i);
tablet.addValue(measurementSchemas.get(4).getMeasurementId(), i, (i % 2) == 0);
tablet.addValue(measurementSchemas.get(5).getMeasurementId(), i, String.valueOf(i));
tablet.bitMaps[i % measurementSchemas.size()].mark(i);
}
try {
ByteBuffer byteBuffer = tablet.serialize();
Tablet newTablet = Tablet.deserialize(byteBuffer);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册