diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 038b62a65c78cf7cc30f8fe84d8239526106e58a..a819e3a19f2e113426c058c5c2d5aff5bf576ba8 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import org.apache.iotdb.rpc.BatchExecutionException; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; @@ -247,6 +248,29 @@ public class SessionExample { Tablet tablet = new Tablet("root.sg1.d1", schemaList, 100); + //Method 1 to add tablet data + long timestamp = System.currentTimeMillis(); + + for (long row = 0; row < 100; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + for (int s = 0; s < 3; s++) { + long value = new Random().nextLong(); + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet, true); + tablet.reset(); + } + timestamp++; + } + + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + + //Method 2 to add tablet data long[] timestamps = tablet.timestamps; Object[] values = tablet.values; @@ -286,6 +310,38 @@ public class SessionExample { tabletMap.put("root.sg1.d2", tablet2); tabletMap.put("root.sg1.d3", tablet3); + //Method 1 to add tablet data + long timestamp = System.currentTimeMillis(); + for (long row = 0; row < 100; row++) { + int row1 = tablet1.rowSize++; + int row2 = tablet2.rowSize++; + int row3 = tablet3.rowSize++; + tablet1.addTimestamp(row1, timestamp); + tablet2.addTimestamp(row2, timestamp); + tablet3.addTimestamp(row3, timestamp); + for (int i = 0; i < 3; i++) { + long value = new Random().nextLong(); + tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value); + tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value); + tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value); + } + if (tablet1.rowSize == tablet1.getMaxRowNumber()) { + session.insertTablets(tabletMap, true); + tablet1.reset(); + tablet2.reset(); + tablet3.reset(); + } + timestamp++; + } + + if (tablet1.rowSize != 0) { + session.insertTablets(tabletMap, true); + tablet1.reset(); + tablet2.reset(); + tablet3.reset(); + } + + //Method 2 to add tablet data long[] timestamps1 = tablet1.timestamps; Object[] values1 = tablet1.values; long[] timestamps2 = tablet2.timestamps; diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java index 5986089cb3d69dab6afac71868f925a5a12edcdc..89fc3a3839328c22b0f1a9c6063c4c26769cf615 100644 --- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java +++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java @@ -611,7 +611,7 @@ public class IoTDBSessionIT { schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE)); schemaList.add(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE)); - Tablet tablet = new Tablet(deviceId, schemaList, 100); + Tablet tablet = new Tablet(deviceId, schemaList, 200); long[] timestamps = tablet.timestamps; Object[] values = tablet.values; for (int time = 1; time <= 100; time++) { @@ -623,6 +623,17 @@ public class IoTDBSessionIT { tablet.rowSize++; } + for (int time = 101; time <= 200; time++) { + int rowIndex = time - 1; + tablet.addTimestamp(rowIndex, time); + long value = 0; + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); + value++; + } + tablet.rowSize++; + } + session.insertTablet(tablet); IoTDBDescriptor.getInstance().getConfig().setEnableWal(isEnableWAL); session.close(); @@ -813,6 +824,25 @@ public class IoTDBSessionIT { Tablet tablet = new Tablet(deviceId, schemaList, 100); + for (long time = 0; time < 100; time++) { + int rowIndex = tablet.rowSize++; + long value = 0; + tablet.addTimestamp(rowIndex, time); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); + value++; + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet); + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + long[] timestamps = tablet.timestamps; Object[] values = tablet.values; @@ -892,7 +922,7 @@ public class IoTDBSessionIT { } Assert.assertEquals("root.sg1.d1,\'11\',0,\'11\',", sb.toString()); } - Assert.assertEquals(1000, count); + Assert.assertEquals(2000, count); sessionDataSet.closeOperationHandle(); } @@ -911,7 +941,7 @@ public class IoTDBSessionIT { } Assert.assertEquals("root.sg1.d1,'11',0,'11',null,0,null,", sb.toString()); } - Assert.assertEquals(1000, count); + Assert.assertEquals(2000, count); sessionDataSet.closeOperationHandle(); } @@ -1088,6 +1118,25 @@ public class IoTDBSessionIT { Tablet tablet = new Tablet(deviceId, schemaList, 256); + for (long time = 1000; time < 2000; time++) { + int rowIndex = tablet.rowSize++; + long value = 0; + tablet.addTimestamp(rowIndex, time); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); + value++; + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet); + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + long[] timestamps = tablet.timestamps; Object[] values = tablet.values; @@ -1120,6 +1169,25 @@ public class IoTDBSessionIT { Tablet tablet = new Tablet(deviceId, schemaList, 200); + for (long time = 500; time < 1500; time++) { + int rowIndex = tablet.rowSize++; + long value = 0; + tablet.addTimestamp(rowIndex, time); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); + value++; + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet); + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + long[] timestamps = tablet.timestamps; Object[] values = tablet.values; @@ -1157,6 +1225,25 @@ public class IoTDBSessionIT { Tablet tablet = new Tablet(deviceId, schemaList, 1000); + for (long time = begin; time < count + begin; time++) { + int rowIndex = tablet.rowSize++; + long value = 0; + tablet.addTimestamp(rowIndex, time); + for (int i = 0; i < 6; i++) { + tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, value); + value++; + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet); + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + long[] timestamps = tablet.timestamps; Object[] values = tablet.values; @@ -1177,7 +1264,6 @@ public class IoTDBSessionIT { session.insertTablet(tablet); tablet.reset(); } - } private void queryForBatch() throws ClassNotFoundException, SQLException { @@ -1204,7 +1290,7 @@ public class IoTDBSessionIT { } Assert.assertEquals(standard, resultStr.toString()); // d1 and d2 will align - Assert.assertEquals(7000, count); + Assert.assertEquals(14000, count); } } @@ -1272,7 +1358,7 @@ public class IoTDBSessionIT { } Assert.assertEquals(standard, resultStr.toString()); // d1 and d2 will align - Assert.assertEquals(10500, count); + Assert.assertEquals(14000, count); } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java index d159f632c2097432b8e23218f48a7aa7a52fd09a..05f37bd1f38130bc4dc3a3b209c51ca033a9e614 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java @@ -18,7 +18,9 @@ */ package org.apache.iotdb.tsfile.write.record; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; @@ -52,6 +54,11 @@ public class Tablet { */ private List schemas; + /** + * measurementId->indexOf(measurementSchema) + */ + private Map measurementIndex; + /** * timestamps in this tablet */ @@ -95,12 +102,62 @@ public class Tablet { this.deviceId = deviceId; this.schemas = schemas; this.maxRowNumber = maxRowNumber; + measurementIndex = new HashMap<>(); + + for (int i = 0; i < schemas.size(); i++) { + measurementIndex.put(schemas.get(i).getMeasurementId(), i); + } createColumns(); reset(); } + public void addTimestamp(int rowIndex, long timestamp) { + timestamps[rowIndex] = timestamp; + } + + public void addValue(String measurementId, int rowIndex, Object value) { + int indexOfValue = measurementIndex.get(measurementId); + MeasurementSchema measurementSchema = schemas.get(indexOfValue); + + switch (measurementSchema.getType()) { + case TEXT: { + Binary[] sensor = (Binary[]) values[indexOfValue]; + sensor[rowIndex] = (Binary) value; + break; + } + case FLOAT: { + float[] sensor = (float[]) values[indexOfValue]; + sensor[rowIndex] = (float) value; + break; + } + case INT32: { + int[] sensor = (int[]) values[indexOfValue]; + sensor[rowIndex] = (int) value; + break; + } + case INT64: { + long[] sensor = (long[]) values[indexOfValue]; + sensor[rowIndex] = (long) value; + break; + } + case DOUBLE: { + double[] sensor = (double[]) values[indexOfValue]; + sensor[rowIndex] = (double) value; + break; + } + case BOOLEAN: { + boolean[] sensor = (boolean[]) values[indexOfValue]; + sensor[rowIndex] = (boolean) value; + break; + } + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", measurementSchema.getType())); + } + } + public List getSchemas() { return schemas; }