未验证 提交 8b3b6af5 编写于 作者: H huolibo 提交者: GitHub

[TD-10751]<feature>: jdbc support schemaless insert (#8511)

* [TD-10751]<feature>:jdbc support schemaless input

* add schemaless test case
上级 abcf7842
...@@ -239,6 +239,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv ...@@ -239,6 +239,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong); (JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: insertLinesImp
* Signature: ([Ljava/lang/String;JII)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp
(JNIEnv *, jobject, jobjectArray, jlong, jint, jint);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -988,7 +988,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI ...@@ -988,7 +988,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj,
jobjectArray lines, jlong conn, jobjectArray lines, jlong conn,
jint protocol, jint precision) { jint protocol, jint precision) {
TAOS *taos = (TAOS *)conn; TAOS *taos = (TAOS *)conn;
...@@ -1019,9 +1019,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(J ...@@ -1019,9 +1019,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(J
tfree(c_lines); tfree(c_lines);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, taos, tstrerror(code), taos_errstr(result)); jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, taos, tstrerror(code), taos_errstr(result));
taos_free_result((void *)result);
return JNI_TDENGINE_ERROR; return JNI_TDENGINE_ERROR;
} }
taos_free_result((void *)result);
return (jlong)result; return JNI_SUCCESS;
} }
package com.taosdata.jdbc;
import java.sql.*;
public class AbstractStatementWrapper extends AbstractStatement{
protected Statement statement;
public AbstractStatementWrapper(Statement statement) {
this.statement = statement;
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
return statement.executeQuery(sql);
}
@Override
public int executeUpdate(String sql) throws SQLException {
return statement.executeUpdate(sql);
}
@Override
public void close() throws SQLException {
statement.close();
}
@Override
public boolean execute(String sql) throws SQLException {
return statement.execute(sql);
}
@Override
public ResultSet getResultSet() throws SQLException {
return statement.getResultSet();
}
@Override
public int getUpdateCount() throws SQLException {
return statement.getUpdateCount();
}
@Override
public Connection getConnection() throws SQLException {
return statement.getConnection();
}
@Override
public boolean isClosed() throws SQLException {
return statement.isClosed();
}
}
package com.taosdata.jdbc;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import com.taosdata.jdbc.rs.RestfulConnection;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class SchemalessStatement extends AbstractStatementWrapper {
public SchemalessStatement(Statement statement) {
super(statement);
}
public void executeSchemaless(String[] strings, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
Connection connection = this.getConnection();
if (connection instanceof TSDBConnection) {
TSDBConnection tsdbConnection = (TSDBConnection) connection;
tsdbConnection.getConnector().insertLines(strings, protocolType, timestampType);
} else if (connection instanceof RestfulConnection) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD, "restful connection is not supported currently");
} else {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "unknown connection:" + connection.getMetaData().getURL());
}
}
public void executeSchemaless(String sql, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
executeSchemaless(new String[]{sql}, protocolType, timestampType);
}
}
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package com.taosdata.jdbc; package com.taosdata.jdbc;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import com.taosdata.jdbc.utils.TaosInfo; import com.taosdata.jdbc.utils.TaosInfo;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
...@@ -359,14 +361,14 @@ public class TSDBJNIConnector { ...@@ -359,14 +361,14 @@ public class TSDBJNIConnector {
private native int closeStmt(long stmt, long con); private native int closeStmt(long stmt, long con);
public void insertLines(String[] lines) throws SQLException { public void insertLines(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
int code = insertLinesImp(lines, this.taos); int code = insertLinesImp(lines, this.taos, protocolType.ordinal(), timestampType.ordinal());
if (code != TSDBConstants.JNI_SUCCESS) { if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to insertLines"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to insertLines");
} }
} }
private native int insertLinesImp(String[] lines, long conn); private native int insertLinesImp(String[] lines, long conn, int type, int precision);
} }
package com.taosdata.jdbc.enums;
public enum SchemalessProtocolType {
UNKNOWN,
LINE,
TELNET,
JSON,
;
}
package com.taosdata.jdbc.enums;
public enum SchemalessTimestampType {
NOT_CONFIGURED,
HOURS,
MINUTES,
SECONDS,
MILLI_SECONDS,
MICRO_SECONDS,
NANO_SECONDS,
;
}
package com.taosdata.jdbc;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
public class SchemalessInsertTest {
private String host = "127.0.0.1";
private String dbname = "test_schemaless_insert";
private Connection conn;
@Test
public void schemalessInsert() throws SQLException {
// given
String[] lines = new String[]{
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"};
// when
try (Statement statement = conn.createStatement();
SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) {
schemalessStatement.executeSchemaless(lines, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
}
// then
Statement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("show tables");
Assert.assertNotNull(rs);
ResultSetMetaData metaData = rs.getMetaData();
Assert.assertTrue(metaData.getColumnCount() > 0);
int rowCnt = 0;
while (rs.next()) {
rowCnt++;
}
Assert.assertEquals(lines.length, rowCnt);
rs.close();
statement.close();
}
@Test
public void telnetInsert() throws SQLException {
// given
String[] lines = new String[]{
"stb0_0 1626006833 4 host=host0 interface=eth0",
"stb0_1 1626006833 4 host=host0 interface=eth0",
"stb0_2 1626006833 4 host=host0 interface=eth0 id=\"special_name\"",
};
// when
try (Statement statement = conn.createStatement();
SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) {
schemalessStatement.executeSchemaless(lines, SchemalessProtocolType.TELNET, SchemalessTimestampType.NOT_CONFIGURED);
}
// then
Statement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("show tables");
Assert.assertNotNull(rs);
ResultSetMetaData metaData = rs.getMetaData();
Assert.assertTrue(metaData.getColumnCount() > 0);
int rowCnt = 0;
while (rs.next()) {
rowCnt++;
}
Assert.assertEquals(lines.length, rowCnt);
rs.close();
statement.close();
}
@Test
public void jsonInsert() throws SQLException {
// given
String json = "[\n" +
" {\n" +
" \"metric\": \"cpu_load_1\",\n" +
" \"timestamp\": 1626006833,\n" +
" \"value\": 55.5,\n" +
" \"tags\": {\n" +
" \"host\": \"ubuntu\",\n" +
" \"interface\": \"eth1\",\n" +
" \"Id\": \"tb1\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"metric\": \"cpu_load_2\",\n" +
" \"timestamp\": 1626006834,\n" +
" \"value\": 55.5,\n" +
" \"tags\": {\n" +
" \"host\": \"ubuntu\",\n" +
" \"interface\": \"eth2\",\n" +
" \"Id\": \"tb2\"\n" +
" }\n" +
" }\n" +
"]";
// when
try (Statement statement = conn.createStatement();
SchemalessStatement schemalessStatement = new SchemalessStatement(statement)) {
schemalessStatement.executeSchemaless(json, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
}
// then
Statement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("show tables");
Assert.assertNotNull(rs);
ResultSetMetaData metaData = rs.getMetaData();
Assert.assertTrue(metaData.getColumnCount() > 0);
int rowCnt = 0;
while (rs.next()) {
rowCnt++;
}
// Assert.assertEquals(json.length, rowCnt);
rs.close();
statement.close();
}
@Before
public void before() {
final String url = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
try {
conn = DriverManager.getConnection(url);
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists " + dbname);
stmt.execute("create database if not exists " + dbname + " precision 'ns'");
stmt.execute("use " + dbname);
} catch (SQLException e) {
e.printStackTrace();
}
}
@After
public void after() {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists " + dbname);
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package com.taosdata.jdbc; package com.taosdata.jdbc;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import org.junit.Test; import org.junit.Test;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
...@@ -115,9 +117,10 @@ public class TSDBJNIConnectorTest { ...@@ -115,9 +117,10 @@ public class TSDBJNIConnectorTest {
} }
// close statement // close statement
connector.executeQuery("use d"); connector.executeQuery("use d");
String[] lines = new String[]{"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", String[] lines = new String[]{
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"}; "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
connector.insertLines(lines); "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000"};
connector.insertLines(lines, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
// close connection // close connection
connector.closeConnection(); connector.closeConnection();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册