diff --git a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h index 61ae5082f31cd9129a3cec1eaa1e0552ada7993b..1038af5abb1d00b14b1c54d2f96522647b71178b 100644 --- a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h +++ b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h @@ -239,6 +239,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp (JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong); +/* + * Class: com_taosdata_jdbc_TSDBJNIConnector + * Method: insertLinesImp + * Signature: ([Ljava/lang/String;JII)I + */ +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp + (JNIEnv *, jobject, jobjectArray, jlong, jint, jint); + #ifdef __cplusplus } #endif diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index ef7387f9760b7d710f77a97b52fcafe9686bd335..5127aaf665b8059a12ef0985140c2a01ea328bfa 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -988,7 +988,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI return JNI_SUCCESS; } -JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj, +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj, jobjectArray lines, jlong conn, jint protocol, jint precision) { TAOS *taos = (TAOS *)conn; @@ -1019,9 +1019,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(J tfree(c_lines); if (code != TSDB_CODE_SUCCESS) { jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, taos, tstrerror(code), taos_errstr(result)); - + taos_free_result((void *)result); return JNI_TDENGINE_ERROR; } + taos_free_result((void *)result); - return (jlong)result; + return JNI_SUCCESS; } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/AbstractStatementWrapper.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/AbstractStatementWrapper.java new file mode 100644 index 0000000000000000000000000000000000000000..0b46226d1113b82d9333204427eaad074d3572cb --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/AbstractStatementWrapper.java @@ -0,0 +1,51 @@ +package com.taosdata.jdbc; + +import java.sql.*; + +public class AbstractStatementWrapper extends AbstractStatement{ + protected Statement statement; + + public AbstractStatementWrapper(Statement statement) { + this.statement = statement; + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + return statement.executeQuery(sql); + } + + @Override + public int executeUpdate(String sql) throws SQLException { + return statement.executeUpdate(sql); + } + + @Override + public void close() throws SQLException { + statement.close(); + } + + @Override + public boolean execute(String sql) throws SQLException { + return statement.execute(sql); + } + + @Override + public ResultSet getResultSet() throws SQLException { + return statement.getResultSet(); + } + + @Override + public int getUpdateCount() throws SQLException { + return statement.getUpdateCount(); + } + + @Override + public Connection getConnection() throws SQLException { + return statement.getConnection(); + } + + @Override + public boolean isClosed() throws SQLException { + return statement.isClosed(); + } +} diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/SchemalessStatement.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/SchemalessStatement.java new file mode 100644 index 0000000000000000000000000000000000000000..f90fa43fa26288943b5fa6c500ace6b92feb8429 --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/SchemalessStatement.java @@ -0,0 +1,31 @@ +package com.taosdata.jdbc; + +import com.taosdata.jdbc.enums.SchemalessProtocolType; +import com.taosdata.jdbc.enums.SchemalessTimestampType; +import com.taosdata.jdbc.rs.RestfulConnection; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +public class SchemalessStatement extends AbstractStatementWrapper { + public SchemalessStatement(Statement statement) { + super(statement); + } + + public void executeSchemaless(String[] strings, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException { + Connection connection = this.getConnection(); + if (connection instanceof TSDBConnection) { + TSDBConnection tsdbConnection = (TSDBConnection) connection; + tsdbConnection.getConnector().insertLines(strings, protocolType, timestampType); + } else if (connection instanceof RestfulConnection) { + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD, "restful connection is not supported currently"); + } else { + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "unknown connection:" + connection.getMetaData().getURL()); + } + } + + public void executeSchemaless(String sql, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException { + executeSchemaless(new String[]{sql}, protocolType, timestampType); + } +} diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index aaada2e78ec284f4019b29465a38db109cf9d80a..a5c7f26a266f81e3a7915503d2983efe077765c2 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -17,6 +17,8 @@ package com.taosdata.jdbc; import com.alibaba.fastjson.JSONObject; +import com.taosdata.jdbc.enums.SchemalessProtocolType; +import com.taosdata.jdbc.enums.SchemalessTimestampType; import com.taosdata.jdbc.utils.TaosInfo; import java.nio.ByteBuffer; @@ -359,14 +361,14 @@ public class TSDBJNIConnector { private native int closeStmt(long stmt, long con); - public void insertLines(String[] lines) throws SQLException { - int code = insertLinesImp(lines, this.taos); + public void insertLines(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException { + int code = insertLinesImp(lines, this.taos, protocolType.ordinal(), timestampType.ordinal()); if (code != TSDBConstants.JNI_SUCCESS) { throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to insertLines"); } } - private native int insertLinesImp(String[] lines, long conn); + private native int insertLinesImp(String[] lines, long conn, int type, int precision); } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/enums/SchemalessProtocolType.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/enums/SchemalessProtocolType.java new file mode 100644 index 0000000000000000000000000000000000000000..d5bd1bde5eb3d73ebd419652ca1fbbe3485d95c5 --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/enums/SchemalessProtocolType.java @@ -0,0 +1,10 @@ +package com.taosdata.jdbc.enums; + +public enum SchemalessProtocolType { + UNKNOWN, + LINE, + TELNET, + JSON, + ; + +} diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/enums/SchemalessTimestampType.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/enums/SchemalessTimestampType.java new file mode 100644 index 0000000000000000000000000000000000000000..159714262e5386e00e74bb6154a20165a735c174 --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/enums/SchemalessTimestampType.java @@ -0,0 +1,13 @@ +package com.taosdata.jdbc.enums; + +public enum SchemalessTimestampType { + + NOT_CONFIGURED, + HOURS, + MINUTES, + SECONDS, + MILLI_SECONDS, + MICRO_SECONDS, + NANO_SECONDS, + ; +} diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SchemalessInsertTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SchemalessInsertTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c706704f67e75ce61f6f96def26c6895e8805a7a --- /dev/null +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SchemalessInsertTest.java @@ -0,0 +1,145 @@ +package com.taosdata.jdbc; + +import com.taosdata.jdbc.enums.SchemalessProtocolType; +import com.taosdata.jdbc.enums.SchemalessTimestampType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; + +public class SchemalessInsertTest { + private String host = "127.0.0.1"; + private String dbname = "test_schemaless_insert"; + private Connection conn; + + @Test + public void schemalessInsert() throws SQLException { + // given + String[] lines = new String[]{ + "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000", + "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"}; + // when + try (Statement statement = conn.createStatement(); + SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) { + schemalessStatement.executeSchemaless(lines, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS); + } + + // then + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery("show tables"); + Assert.assertNotNull(rs); + ResultSetMetaData metaData = rs.getMetaData(); + Assert.assertTrue(metaData.getColumnCount() > 0); + int rowCnt = 0; + while (rs.next()) { + rowCnt++; + } + Assert.assertEquals(lines.length, rowCnt); + rs.close(); + statement.close(); + } + + @Test + public void telnetInsert() throws SQLException { + // given + String[] lines = new String[]{ + "stb0_0 1626006833 4 host=host0 interface=eth0", + "stb0_1 1626006833 4 host=host0 interface=eth0", + "stb0_2 1626006833 4 host=host0 interface=eth0 id=\"special_name\"", + }; + + // when + try (Statement statement = conn.createStatement(); + SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) { + schemalessStatement.executeSchemaless(lines, SchemalessProtocolType.TELNET, SchemalessTimestampType.NOT_CONFIGURED); + } + + // then + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery("show tables"); + Assert.assertNotNull(rs); + ResultSetMetaData metaData = rs.getMetaData(); + Assert.assertTrue(metaData.getColumnCount() > 0); + int rowCnt = 0; + while (rs.next()) { + rowCnt++; + } + Assert.assertEquals(lines.length, rowCnt); + rs.close(); + statement.close(); + } + + @Test + public void jsonInsert() throws SQLException { + // given + String json = "[\n" + + " {\n" + + " \"metric\": \"cpu_load_1\",\n" + + " \"timestamp\": 1626006833,\n" + + " \"value\": 55.5,\n" + + " \"tags\": {\n" + + " \"host\": \"ubuntu\",\n" + + " \"interface\": \"eth1\",\n" + + " \"Id\": \"tb1\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"metric\": \"cpu_load_2\",\n" + + " \"timestamp\": 1626006834,\n" + + " \"value\": 55.5,\n" + + " \"tags\": {\n" + + " \"host\": \"ubuntu\",\n" + + " \"interface\": \"eth2\",\n" + + " \"Id\": \"tb2\"\n" + + " }\n" + + " }\n" + + "]"; + + // when + try (Statement statement = conn.createStatement(); + SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) { + schemalessStatement.executeSchemaless(json, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED); + } + + // then + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery("show tables"); + Assert.assertNotNull(rs); + ResultSetMetaData metaData = rs.getMetaData(); + Assert.assertTrue(metaData.getColumnCount() > 0); + int rowCnt = 0; + while (rs.next()) { + rowCnt++; + } +// Assert.assertEquals(json.length, rowCnt); + rs.close(); + statement.close(); + } + + @Before + public void before() { + final String url = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata"; + try { + conn = DriverManager.getConnection(url); + Statement stmt = conn.createStatement(); + stmt.execute("drop database if exists " + dbname); + stmt.execute("create database if not exists " + dbname + " precision 'ns'"); + stmt.execute("use " + dbname); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + @After + public void after() { + try (Statement stmt = conn.createStatement()) { + stmt.execute("drop database if exists " + dbname); + stmt.close(); + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } +} diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java index 8be6ae6b1c566abcd7ec398e7df3f5308e29e1b1..f44d647595e99ae00a355ca25f702cf2e0c1cc36 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java @@ -1,5 +1,7 @@ package com.taosdata.jdbc; +import com.taosdata.jdbc.enums.SchemalessProtocolType; +import com.taosdata.jdbc.enums.SchemalessTimestampType; import org.junit.Test; import java.lang.management.ManagementFactory; @@ -115,9 +117,10 @@ public class TSDBJNIConnectorTest { } // close statement connector.executeQuery("use d"); - String[] lines = new String[]{"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", - "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"}; - connector.insertLines(lines); + String[] lines = new String[]{ + "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000", + "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"}; + connector.insertLines(lines, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS); // close connection connector.closeConnection();