From b3b7417e3b46017e0a87b4e683db6092c12dc3b1 Mon Sep 17 00:00:00 2001 From: qiaojialin <646274302@qq.com> Date: Mon, 2 Sep 2019 16:20:05 +0800 Subject: [PATCH] add insertRow interface --- .../UserGuide/7-Session API/1-Session API.md | 46 +++++++++++++----- .../UserGuide/7-Session API/1-Session API.md | 47 ++++++++++++++----- .../java/org/apache/iotdb/SessionExample.java | 31 ++++++++++-- .../iotdb/db/service/TSServiceImpl.java | 21 +++++++++ service-rpc/src/main/thrift/rpc.thrift | 9 ++++ .../org/apache/iotdb/session/Session.java | 16 +++++++ 6 files changed, 144 insertions(+), 26 deletions(-) diff --git a/docs/Documentation-CHN/UserGuide/7-Session API/1-Session API.md b/docs/Documentation-CHN/UserGuide/7-Session API/1-Session API.md index 6e733351b2..4e9f0eef9d 100644 --- a/docs/Documentation-CHN/UserGuide/7-Session API/1-Session API.md +++ b/docs/Documentation-CHN/UserGuide/7-Session API/1-Session API.md @@ -49,10 +49,11 @@ In root directory: ```Java -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.session.IoTDBSessionException; import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.record.RowBatch; @@ -61,15 +62,39 @@ import org.apache.iotdb.tsfile.write.schema.Schema; public class SessionExample { - public static void main(String[] args) throws ClassNotFoundException, IoTDBSessionException { - Session session = new Session("127.0.0.1", 6667, "root", "root"); + private static Session session; + + public static void main(String[] args) throws IoTDBSessionException { + session = new Session("127.0.0.1", 6667, "root", "root"); session.open(); - + session.setStorageGroup("root.sg1"); - session.createTimeseriesResp("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE); - session.createTimeseriesResp("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE); - session.createTimeseriesResp("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE); + session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + + insert(); +// insertRowBatch(); + + session.close(); + } + private static void insert() throws IoTDBSessionException { + String deviceId = "root.sg1.d1"; + List measurements = new ArrayList<>(); + measurements.add("s1"); + measurements.add("s2"); + measurements.add("s3"); + for (long time = 0; time < 30000; time++) { + List values = new ArrayList<>(); + values.add("1"); + values.add("2"); + values.add("3"); + session.insert(deviceId, time, measurements, values); + } + } + + private static void insertRowBatch() throws IoTDBSessionException { Schema schema = new Schema(); schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE)); schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE)); @@ -85,7 +110,7 @@ public class SessionExample { timestamps[row] = time; for (int i = 0; i < 3; i++) { long[] sensor = (long[]) values[i]; - sensor[row] = time; + sensor[row] = i; } if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) { session.insertBatch(rowBatch); @@ -97,7 +122,6 @@ public class SessionExample { session.insertBatch(rowBatch); rowBatch.reset(); } - session.close(); } } ``` diff --git a/docs/Documentation/UserGuide/7-Session API/1-Session API.md b/docs/Documentation/UserGuide/7-Session API/1-Session API.md index 04e35d040a..6312a5c17d 100644 --- a/docs/Documentation/UserGuide/7-Session API/1-Session API.md +++ b/docs/Documentation/UserGuide/7-Session API/1-Session API.md @@ -58,10 +58,11 @@ This chapter provides an example of how to open an IoTDB session, execute a batc Requires that you include the packages containing the Client classes needed for database programming. ```Java -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.session.IoTDBSessionException; import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.record.RowBatch; @@ -70,15 +71,39 @@ import org.apache.iotdb.tsfile.write.schema.Schema; public class SessionExample { - public static void main(String[] args) throws ClassNotFoundException, IoTDBSessionException { - Session session = new Session("127.0.0.1", 6667, "root", "root"); + private static Session session; + + public static void main(String[] args) throws IoTDBSessionException { + session = new Session("127.0.0.1", 6667, "root", "root"); session.open(); - + session.setStorageGroup("root.sg1"); - session.createTimeseriesResp("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE); - session.createTimeseriesResp("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE); - session.createTimeseriesResp("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE); + session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + + insert(); +// insertRowBatch(); + + session.close(); + } + private static void insert() throws IoTDBSessionException { + String deviceId = "root.sg1.d1"; + List measurements = new ArrayList<>(); + measurements.add("s1"); + measurements.add("s2"); + measurements.add("s3"); + for (long time = 0; time < 30000; time++) { + List values = new ArrayList<>(); + values.add("1"); + values.add("2"); + values.add("3"); + session.insert(deviceId, time, measurements, values); + } + } + + private static void insertRowBatch() throws IoTDBSessionException { Schema schema = new Schema(); schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE)); schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE)); @@ -94,7 +119,7 @@ public class SessionExample { timestamps[row] = time; for (int i = 0; i < 3; i++) { long[] sensor = (long[]) values[i]; - sensor[row] = time; + sensor[row] = i; } if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) { session.insertBatch(rowBatch); @@ -106,8 +131,6 @@ public class SessionExample { session.insertBatch(rowBatch); rowBatch.reset(); } - - session.close(); } } ``` diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index d6bc905f5e..1b22ebba0b 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb; +import java.util.ArrayList; +import java.util.List; import org.apache.iotdb.session.IoTDBSessionException; import org.apache.iotdb.session.Session; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -29,8 +31,10 @@ import org.apache.iotdb.tsfile.write.schema.Schema; public class SessionExample { + private static Session session; + public static void main(String[] args) throws IoTDBSessionException { - Session session = new Session("127.0.0.1", 6667, "root", "root"); + session = new Session("127.0.0.1", 6667, "root", "root"); session.open(); session.setStorageGroup("root.sg1"); @@ -38,6 +42,28 @@ public class SessionExample { session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + insert(); +// insertRowBatch(); + + session.close(); + } + + private static void insert() throws IoTDBSessionException { + String deviceId = "root.sg1.d1"; + List measurements = new ArrayList<>(); + measurements.add("s1"); + measurements.add("s2"); + measurements.add("s3"); + for (long time = 0; time < 30000; time++) { + List values = new ArrayList<>(); + values.add("1"); + values.add("2"); + values.add("3"); + session.insert(deviceId, time, measurements, values); + } + } + + private static void insertRowBatch() throws IoTDBSessionException { Schema schema = new Schema(); schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE)); schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE)); @@ -53,7 +79,7 @@ public class SessionExample { timestamps[row] = time; for (int i = 0; i < 3; i++) { long[] sensor = (long[]) values[i]; - sensor[row] = time; + sensor[row] = i; } if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) { session.insertBatch(rowBatch); @@ -65,6 +91,5 @@ public class SessionExample { session.insertBatch(rowBatch); rowBatch.reset(); } - session.close(); } } \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 493331cd41..15dc8b1feb 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -930,6 +930,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSExecuteStatementResp insert(TSInsertionReq req) { + // TODO need to refactor this when implementing PreparedStatement if (!checkLogin()) { logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); return getTSExecuteStatementResp(getStatus(TSStatusType.NOT_LOGIN_ERROR)); @@ -960,6 +961,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } + @Override + public TSRPCResp insertRow(TSInsertReq req) throws TException { + if (!checkLogin()) { + logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); + return new TSRPCResp(getStatus(TSStatusType.NOT_LOGIN_ERROR)); + } + + InsertPlan plan = new InsertPlan(); + plan.setDeviceId(req.getDeviceId()); + plan.setTime(req.getTimestamp()); + plan.setMeasurements(req.getMeasurements().toArray(new String[0])); + plan.setValues(req.getValues().toArray(new String[0])); + + TS_Status status = checkAuthority(plan); + if (status != null) { + return new TSRPCResp(status); + } + return new TSRPCResp(executePlan(plan)); + } + @Override public TSExecuteBatchStatementResp insertBatch(TSBatchInsertionReq req) { long t1 = System.currentTimeMillis(); diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift index ec05a2757c..c4947e797e 100644 --- a/service-rpc/src/main/thrift/rpc.thrift +++ b/service-rpc/src/main/thrift/rpc.thrift @@ -236,6 +236,13 @@ struct TSBatchInsertionReq { 6: required i32 size } +struct TSInsertReq { + 1: required string deviceId + 2: required list measurements + 3: required list values + 4: required i64 timestamp +} + struct TSSetStorageGroupReq { 1: required string storageGroupId } @@ -292,5 +299,7 @@ service TSIService { TSRPCResp createTimeseries(1:TSCreateTimeseriesReq req); + TSRPCResp insertRow(1:TSInsertReq req); + i64 requestStatementId(); } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 771b2d186f..f0d6dcd4e7 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.session; +import java.util.List; import org.apache.iotdb.rpc.IoTDBRPCException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.service.rpc.thrift.*; @@ -156,6 +157,21 @@ public class Session { } } + public TSRPCResp insert(String deviceId, long time, List measurements, List values) + throws IoTDBSessionException { + TSInsertReq request = new TSInsertReq(); + request.setDeviceId(deviceId); + request.setTimestamp(time); + request.setMeasurements(measurements); + request.setValues(values); + + try { + return client.insertRow(request); + } catch (TException e) { + throw new IoTDBSessionException(e); + } + } + public TSRPCResp setStorageGroup(String storageGroupId) throws IoTDBSessionException { TSSetStorageGroupReq request = new TSSetStorageGroupReq(); request.setStorageGroupId(storageGroupId); -- GitLab