未验证 提交 afafc790 编写于 作者: W wyh-ztf 提交者: GitHub

[IOTDB-839] Make Tablet api more friendly (#1658)

上级 55c2e208
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
......@@ -247,6 +248,29 @@ public class SessionExample {
Tablet tablet = new Tablet("root.sg1.d1", schemaList, 100);
//Method 1 to add tablet data
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp);
for (int s = 0; s < 3; s++) {
long value = new Random().nextLong();
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
tablet.reset();
}
timestamp++;
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
//Method 2 to add tablet data
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
......@@ -286,6 +310,38 @@ public class SessionExample {
tabletMap.put("root.sg1.d2", tablet2);
tabletMap.put("root.sg1.d3", tablet3);
//Method 1 to add tablet data
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
int row1 = tablet1.rowSize++;
int row2 = tablet2.rowSize++;
int row3 = tablet3.rowSize++;
tablet1.addTimestamp(row1, timestamp);
tablet2.addTimestamp(row2, timestamp);
tablet3.addTimestamp(row3, timestamp);
for (int i = 0; i < 3; i++) {
long value = new Random().nextLong();
tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
}
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
session.insertTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
timestamp++;
}
if (tablet1.rowSize != 0) {
session.insertTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
//Method 2 to add tablet data
long[] timestamps1 = tablet1.timestamps;
Object[] values1 = tablet1.values;
long[] timestamps2 = tablet2.timestamps;
......
......@@ -611,7 +611,7 @@ public class IoTDBSessionIT {
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schemaList.add(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
Tablet tablet = new Tablet(deviceId, schemaList, 100);
Tablet tablet = new Tablet(deviceId, schemaList, 200);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
for (int time = 1; time <= 100; time++) {
......@@ -623,6 +623,17 @@ public class IoTDBSessionIT {
tablet.rowSize++;
}
for (int time = 101; time <= 200; time++) {
int rowIndex = time - 1;
tablet.addTimestamp(rowIndex, time);
long value = 0;
for (int s = 0; s < 3; s++) {
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
value++;
}
tablet.rowSize++;
}
session.insertTablet(tablet);
IoTDBDescriptor.getInstance().getConfig().setEnableWal(isEnableWAL);
session.close();
......@@ -813,6 +824,25 @@ public class IoTDBSessionIT {
Tablet tablet = new Tablet(deviceId, schemaList, 100);
for (long time = 0; time < 100; time++) {
int rowIndex = tablet.rowSize++;
long value = 0;
tablet.addTimestamp(rowIndex, time);
for (int s = 0; s < 3; s++) {
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
value++;
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet);
tablet.reset();
}
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
......@@ -892,7 +922,7 @@ public class IoTDBSessionIT {
}
Assert.assertEquals("root.sg1.d1,\'11\',0,\'11\',", sb.toString());
}
Assert.assertEquals(1000, count);
Assert.assertEquals(2000, count);
sessionDataSet.closeOperationHandle();
}
......@@ -911,7 +941,7 @@ public class IoTDBSessionIT {
}
Assert.assertEquals("root.sg1.d1,'11',0,'11',null,0,null,", sb.toString());
}
Assert.assertEquals(1000, count);
Assert.assertEquals(2000, count);
sessionDataSet.closeOperationHandle();
}
......@@ -1088,6 +1118,25 @@ public class IoTDBSessionIT {
Tablet tablet = new Tablet(deviceId, schemaList, 256);
for (long time = 1000; time < 2000; time++) {
int rowIndex = tablet.rowSize++;
long value = 0;
tablet.addTimestamp(rowIndex, time);
for (int s = 0; s < 3; s++) {
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
value++;
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet);
tablet.reset();
}
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
......@@ -1120,6 +1169,25 @@ public class IoTDBSessionIT {
Tablet tablet = new Tablet(deviceId, schemaList, 200);
for (long time = 500; time < 1500; time++) {
int rowIndex = tablet.rowSize++;
long value = 0;
tablet.addTimestamp(rowIndex, time);
for (int s = 0; s < 3; s++) {
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
value++;
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet);
tablet.reset();
}
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
......@@ -1157,6 +1225,25 @@ public class IoTDBSessionIT {
Tablet tablet = new Tablet(deviceId, schemaList, 1000);
for (long time = begin; time < count + begin; time++) {
int rowIndex = tablet.rowSize++;
long value = 0;
tablet.addTimestamp(rowIndex, time);
for (int i = 0; i < 6; i++) {
tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, value);
value++;
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet);
tablet.reset();
}
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
......@@ -1177,7 +1264,6 @@ public class IoTDBSessionIT {
session.insertTablet(tablet);
tablet.reset();
}
}
private void queryForBatch() throws ClassNotFoundException, SQLException {
......@@ -1204,7 +1290,7 @@ public class IoTDBSessionIT {
}
Assert.assertEquals(standard, resultStr.toString());
// d1 and d2 will align
Assert.assertEquals(7000, count);
Assert.assertEquals(14000, count);
}
}
......@@ -1272,7 +1358,7 @@ public class IoTDBSessionIT {
}
Assert.assertEquals(standard, resultStr.toString());
// d1 and d2 will align
Assert.assertEquals(10500, count);
Assert.assertEquals(14000, count);
}
}
......
......@@ -18,7 +18,9 @@
*/
package org.apache.iotdb.tsfile.write.record;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
......@@ -52,6 +54,11 @@ public class Tablet {
*/
private List<MeasurementSchema> schemas;
/**
* measurementId->indexOf(measurementSchema)
*/
private Map<String, Integer> measurementIndex;
/**
* timestamps in this tablet
*/
......@@ -95,12 +102,62 @@ public class Tablet {
this.deviceId = deviceId;
this.schemas = schemas;
this.maxRowNumber = maxRowNumber;
measurementIndex = new HashMap<>();
for (int i = 0; i < schemas.size(); i++) {
measurementIndex.put(schemas.get(i).getMeasurementId(), i);
}
createColumns();
reset();
}
public void addTimestamp(int rowIndex, long timestamp) {
timestamps[rowIndex] = timestamp;
}
public void addValue(String measurementId, int rowIndex, Object value) {
int indexOfValue = measurementIndex.get(measurementId);
MeasurementSchema measurementSchema = schemas.get(indexOfValue);
switch (measurementSchema.getType()) {
case TEXT: {
Binary[] sensor = (Binary[]) values[indexOfValue];
sensor[rowIndex] = (Binary) value;
break;
}
case FLOAT: {
float[] sensor = (float[]) values[indexOfValue];
sensor[rowIndex] = (float) value;
break;
}
case INT32: {
int[] sensor = (int[]) values[indexOfValue];
sensor[rowIndex] = (int) value;
break;
}
case INT64: {
long[] sensor = (long[]) values[indexOfValue];
sensor[rowIndex] = (long) value;
break;
}
case DOUBLE: {
double[] sensor = (double[]) values[indexOfValue];
sensor[rowIndex] = (double) value;
break;
}
case BOOLEAN: {
boolean[] sensor = (boolean[]) values[indexOfValue];
sensor[rowIndex] = (boolean) value;
break;
}
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", measurementSchema.getType()));
}
}
public List<MeasurementSchema> getSchemas() {
return schemas;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册