未验证 提交 c9e280dc 编写于 作者: Z Zhiyu Yang 提交者: GitHub

[TD-13353]<fix>: fix CI failed cases of JDBC (#10123)

上级 240cbaa1
......@@ -22,7 +22,7 @@ ELSEIF (TD_WINDOWS)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.36-dist.jar DESTINATION connector/jdbc)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.37-dist.jar DESTINATION connector/jdbc)
ENDIF ()
ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
......
......@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
POST_BUILD
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.36-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.37-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMENT "build jdbc driver")
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
......
......@@ -5,7 +5,7 @@
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.36</version>
<version>2.0.37</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
......
......@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.36</version>
<version>2.0.37</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
......
......@@ -63,7 +63,6 @@ public class TSDBConnection extends AbstractConnection {
if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
}
return new TSDBPreparedStatement(this, sql);
}
......@@ -71,7 +70,6 @@ public class TSDBConnection extends AbstractConnection {
if (isClosed) {
return;
}
this.connector.closeConnection();
this.isClosed = true;
}
......
......@@ -28,6 +28,8 @@ public class TSDBJNIConnector {
System.loadLibrary("taos");
}
/***********************************************************************/
//NOTE: JDBC
public static void init(Properties props) throws SQLWarning {
synchronized (LOCK) {
if (!isInitialized) {
......@@ -242,6 +244,9 @@ public class TSDBJNIConnector {
private native int closeConnectionImp(long connection);
/*****************************************************************************************/
// NOTE: subscribe
/**
* Create a subscription
*/
......@@ -269,6 +274,8 @@ public class TSDBJNIConnector {
private native void unsubscribeImp(long subscription, boolean isKeep);
/******************************************************************************************************/
// NOTE: parameter binding
public long prepareStmt(String sql) throws SQLException {
long stmt = prepareStmtImp(sql.getBytes(), this.taos);
......@@ -293,16 +300,19 @@ public class TSDBJNIConnector {
public void setBindTableName(long stmt, String tableName) throws SQLException {
int code = setBindTableNameImp(stmt, tableName, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to set table name");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to set table name, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
private native int setBindTableNameImp(long stmt, String name, long conn);
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags,
ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(), nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to bind table name and corresponding tags, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
......@@ -311,7 +321,8 @@ public class TSDBJNIConnector {
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows, int columnIndex) throws SQLException {
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind column data");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to bind column data, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
......@@ -320,10 +331,20 @@ public class TSDBJNIConnector {
public void executeBatch(long stmt) throws SQLException {
int code = executeBatchImp(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to execute batch bind");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to execute batch bind, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
public void addBatch(long stmt) throws SQLException {
int code = addBatchImp(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS){
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, stmtErrorMsgImp(stmt, this.taos));
}
}
private native int addBatchImp(long stmt, long con);
private native int executeBatchImp(long stmt, long con);
public void closeBatch(long stmt) throws SQLException {
......@@ -335,6 +356,10 @@ public class TSDBJNIConnector {
private native int closeStmt(long stmt, long con);
private native String stmtErrorMsgImp(long stmt, long con);
/*************************************************************************************************/
// NOTE: schemaless-lines
public void insertLines(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
int code = insertLinesImp(lines, this.taos, protocolType.ordinal(), timestampType.ordinal());
if (code != TSDBConstants.JNI_SUCCESS) {
......
......@@ -40,25 +40,28 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private String rawSql;
private Object[] parameters;
// for parameter binding
private long nativeStmtHandle = 0;
private long nativeStmtHandle;
private String tableName;
private ArrayList<TableTagInfo> tableTags;
private int tagValueLength;
private ArrayList<ColumnInfo> colData;
TSDBPreparedStatement(TSDBConnection connection, String sql) {
TSDBPreparedStatement(TSDBConnection connection, String sql) throws SQLException {
super(connection);
init(sql);
int parameterCnt = 0;
if (sql.contains("?")) {
for (int i = 0; i < sql.length(); i++) {
if ('?' == sql.charAt(i)) {
parameterCnt++;
}
if (!sql.contains("?"))
return;
for (int i = 0; i < sql.length(); i++) {
if ('?' == sql.charAt(i)) {
parameterCnt++;
}
}
parameters = new Object[parameterCnt];
// for parameter-binding
// TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
// this.nativeStmtHandle = connector.prepareStmt(rawSql);
if (parameterCnt > 1) {
// the table name is also a parameter, so ignore it.
this.colData = new ArrayList<>();
......@@ -686,7 +689,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (rawSql == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "sql statement not set yet");
}
// table name is not set yet, abort
if (this.tableName == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "table name not set yet");
......@@ -696,24 +698,25 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (numOfCols == 0) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
}
if (nativeStmtHandle == 0) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "stmt is null");
}
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtHandle = connector.prepareStmt(rawSql);
if (this.tableTags == null) {
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
} else {
int num = this.tableTags.size();
int tagSize = this.tableTags.size();
ByteBuffer tagDataList = ByteBuffer.allocate(this.tagValueLength);
tagDataList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer typeList = ByteBuffer.allocate(num);
ByteBuffer typeList = ByteBuffer.allocate(tagSize);
typeList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer lengthList = ByteBuffer.allocate(num * Long.BYTES);
ByteBuffer lengthList = ByteBuffer.allocate(tagSize * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer isNullList = ByteBuffer.allocate(num * Integer.BYTES);
ByteBuffer isNullList = ByteBuffer.allocate(tagSize * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
for (TableTagInfo tag : this.tableTags) {
......@@ -737,53 +740,42 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
lengthList.putLong(Byte.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
Boolean val = (Boolean) tag.value;
tagDataList.put((byte) (val ? 1 : 0));
lengthList.putLong(Byte.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
Short val = (Short) tag.value;
tagDataList.putShort(val);
lengthList.putLong(Short.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
Long val = (Long) tag.value;
tagDataList.putLong(val == null ? 0 : val);
lengthList.putLong(Long.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
Float val = (Float) tag.value;
tagDataList.putFloat(val == null ? 0 : val);
lengthList.putLong(Float.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
Double val = (Double) tag.value;
tagDataList.putDouble(val == null ? 0 : val);
lengthList.putLong(Double.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
String charset = TaosGlobalConfig.getCharset();
String val = (String) tag.value;
byte[] b = null;
byte[] b;
try {
if (tag.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) {
b = val.getBytes();
......@@ -793,12 +785,10 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e.getMessage());
}
tagDataList.put(b);
lengthList.putLong(b.length);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT:
......@@ -806,13 +796,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types");
}
}
typeList.put((byte) tag.type);
isNullList.putInt(tag.isNull ? 1 : 0);
}
connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(), tagDataList,
typeList, lengthList, isNullList);
connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(),
tagDataList, typeList, lengthList, isNullList);
}
ColumnInfo colInfo = this.colData.get(0);
......@@ -826,7 +815,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (col1 == null || !col1.isTypeSet()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
}
if (rows != col1.data.size()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "the rows in column data not identical");
}
......@@ -943,7 +931,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
break;
}
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT:
......@@ -954,6 +941,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
connector.bindColumnDataArray(this.nativeStmtHandle, colDataList, lengthList, isNullList, col1.type, col1.bytes, rows, i);
}
connector.addBatch(this.nativeStmtHandle);
this.columnDataClearBatchInternal();
}
public void columnDataExecuteBatch() throws SQLException {
......@@ -971,12 +960,11 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
this.tableName = null;
if (this.tableTags != null)
this.tableTags.clear();
this.tagValueLength = 0;
tagValueLength = 0;
if (this.colData != null)
this.colData.clear();
}
public void columnDataCloseBatch() throws SQLException {
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
connector.closeBatch(this.nativeStmtHandle);
......
package com.taosdata.jdbc;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.*;
import java.sql.*;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -21,13 +20,17 @@ public class ParameterBindTest {
private final Random random = new Random(System.currentTimeMillis());
@Test
public void test() {
public void one_batch_multi_table() throws SQLException {
// given
String[] tbnames = {"t1", "t2", "t3"};
int rows = 10;
// when
insertIntoTables(tbnames, 10);
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
long current = System.currentTimeMillis();
insertIntoTables(pstmt, tbnames, current, 10);
}
// then
assertRows(stable, tbnames.length * rows);
......@@ -37,13 +40,48 @@ public class ParameterBindTest {
}
@Test
public void testMultiThreads() {
public void multi_batch_multi_table() throws SQLException {
// given
int rows = 10;
int batchSize = 10;
String[] tbnames = {"t1", "t2", "t3"};
// when
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
long current = System.currentTimeMillis();
for (int i = 0; i < batchSize; i++) {
insertIntoTables(pstmt, tbnames, current + 1000 * i * rows, rows);
}
}
// then
assertRows(stable, tbnames.length * batchSize * rows);
for (String t : tbnames) {
assertRows(t, rows * batchSize);
}
}
@Test
public void multiThreads() {
// given
String[][] tables = {{"t1", "t2", "t3"}, {"t4", "t5", "t6"}, {"t7", "t8", "t9"}, {"t10"}};
int rows = 10;
// when
List<Thread> threads = Arrays.stream(tables).map(tbnames -> new Thread(() -> insertIntoTables(tbnames, rows))).collect(Collectors.toList());
List<Thread> threads = Arrays.stream(tables).map(tbnames -> new Thread(() -> {
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
long current = System.currentTimeMillis();
insertIntoTables(pstmt, tbnames, current, 10);
} catch (SQLException throwables) {
throwables.printStackTrace();
}
})).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
try {
......@@ -59,9 +97,26 @@ public class ParameterBindTest {
assertRows(t, rows);
}
}
}
@Ignore
@Test
public void testOOM() throws SQLException {
String[] tbnames = {"t1", "t2", "t3", "t4", "t5", "t6", "t7", "t8", "t9", "t10"};
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
int rows = 1000;
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
long ts = Instant.now().minus(5 * 365, ChronoUnit.DAYS).getEpochSecond() * 1000;
while (true) {
insertIntoTables(pstmt, tbnames, ts, rows);
ts += 1000 * rows;
}
}
}
private void assertRows(String tbname, int rows) {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select count(*) from " + tbname);
......@@ -74,40 +129,36 @@ public class ParameterBindTest {
}
}
private void insertIntoTables(String[] tbnames, int rowsEachTable) {
long current = System.currentTimeMillis();
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 0; i < tbnames.length; i++) {
pstmt.setTableName(tbnames[i]);
pstmt.setTagInt(0, random.nextInt(100));
pstmt.setTagInt(1, random.nextInt(100));
ArrayList<Long> timestampList = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
timestampList.add(current + i * 1000 + j);
}
pstmt.setTimestamp(0, timestampList);
ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f1List.add(random.nextInt(100));
}
pstmt.setInt(1, f1List);
ArrayList<Integer> f2List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f2List.add(random.nextInt(100));
}
pstmt.setInt(2, f2List);
pstmt.columnDataAddBatch();
private void insertIntoTables(TSDBPreparedStatement pstmt, String[] tbnames, long ts_start, int rowsEachTable) throws SQLException {
for (int i = 0; i < tbnames.length; i++) {
// set table name
pstmt.setTableName(tbnames[i]);
// set tags
pstmt.setTagInt(0, random.nextInt(100));
pstmt.setTagInt(1, random.nextInt(100));
// set column: ts
ArrayList<Long> timestampList = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
timestampList.add(ts_start + j * 1000L);
}
pstmt.columnDataExecuteBatch();
} catch (SQLException e) {
e.printStackTrace();
pstmt.setTimestamp(0, timestampList);
// set column: f1
ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f1List.add(random.nextInt(100));
}
pstmt.setInt(1, f1List);
// set column: f2
ArrayList<Integer> f2List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f2List.add(random.nextInt(100));
}
pstmt.setInt(2, f2List);
// add batch
pstmt.columnDataAddBatch();
}
// execute batch
pstmt.columnDataExecuteBatch();
}
@Before
......
......@@ -4,38 +4,25 @@ import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import org.junit.Test;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TSDBJNIConnectorTest {
private static final String host = "127.0.0.1";
private static TSDBResultSetRowData rowData;
@Test
public void test() throws SQLException {
try {
//change sleepSeconds when debugging with attach to process to find PID
int sleepSeconds = -1;
if (sleepSeconds > 0) {
RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean();
String jvmName = runtimeBean.getName();
long pid = Long.valueOf(jvmName.split("@")[0]);
System.out.println("JVM PID = " + pid);
Thread.sleep(sleepSeconds * 1000);
}
} catch (Exception e) {
e.printStackTrace();
}
// init
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR, "/etc/taos");
......@@ -43,7 +30,7 @@ public class TSDBJNIConnectorTest {
// connect
TSDBJNIConnector connector = new TSDBJNIConnector();
connector.connect("127.0.0.1", 6030, null, "root", "taosdata");
connector.connect(host, 6030, null, "root", "taosdata");
// setup
String setupSqlStrs[] = {"create database if not exists d precision \"us\"",
......@@ -141,4 +128,128 @@ public class TSDBJNIConnectorTest {
} else return code != TSDBConstants.JNI_FETCH_END;
}
@Test
public void param_bind_one_batch_multi_table() throws SQLException {
TSDBJNIConnector connector = new TSDBJNIConnector();
connector.connect(host, 6030, null, "root", "taosdata");
connector.executeQuery("drop database if exists test");
connector.executeQuery("create database if not exists test");
connector.executeQuery("use test");
connector.executeQuery("create table weather(ts timestamp, f1 int) tags(t1 int)");
// 1. init + prepare
long stmt = connector.prepareStmt("insert into ? using weather tags(?) values(?,?)");
for (int i = 0; i < 10; i++) {
// 2. set_tbname_tags
stmt_set_table_tags(connector, stmt, "t" + i);
// 3. bind_single_param_batch
// bind timestamp
long ts = System.currentTimeMillis();
bind_col_timestamp(connector, stmt, ts, 100);
// bind int
bind_col_integer(connector, stmt, 100);
// 4. add_batch
connector.addBatch(stmt);
}
connector.executeBatch(stmt);
connector.closeBatch(stmt);
connector.executeQuery("drop database if exists test");
connector.closeConnection();
}
@Test
public void param_bind_multi_batch_multi_table() throws SQLException {
TSDBJNIConnector connector = new TSDBJNIConnector();
connector.connect(host, 6030, null, "root", "taosdata");
connector.executeQuery("drop database if exists test");
connector.executeQuery("create database if not exists test");
connector.executeQuery("use test");
connector.executeQuery("create table weather(ts timestamp, f1 int) tags(t1 int)");
// 1. init + prepare
long stmt = connector.prepareStmt("insert into ? using weather tags(?) values(?,?)");
long ts = System.currentTimeMillis();
for (int ind_batch = 0; ind_batch < 10; ind_batch++) {
ts += ind_batch * 1000 * 1000;
System.out.println("batch: " + ind_batch + ", ts: " + ts);
for (int i = 0; i < 10; i++) {
// 2. set_tbname_tags
stmt_set_table_tags(connector, stmt, "t" + i);
// 3. bind_single_param_batch
// bind timestamp
bind_col_timestamp(connector, stmt, ts, 100);
// bind int
bind_col_integer(connector, stmt, 100);
// 4. add_batch
connector.addBatch(stmt);
}
connector.executeBatch(stmt);
}
connector.closeBatch(stmt);
connector.executeQuery("drop database if exists test");
connector.closeConnection();
}
private void bind_col_timestamp(TSDBJNIConnector connector, long stmt, long ts_start, int numOfRows) throws SQLException {
ByteBuffer colDataList = ByteBuffer.allocate(numOfRows * Long.BYTES);
colDataList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> colDataList.putLong(ts_start + ind * 1000L));
ByteBuffer lengthList = ByteBuffer.allocate(numOfRows * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> lengthList.putLong(Integer.BYTES));
ByteBuffer isNullList = ByteBuffer.allocate(numOfRows * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> isNullList.putInt(0));
connector.bindColumnDataArray(stmt, colDataList, lengthList, isNullList, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP, Long.BYTES, numOfRows, 0);
}
private void bind_col_integer(TSDBJNIConnector connector, long stmt, int numOfRows) throws SQLException {
ByteBuffer colDataList = ByteBuffer.allocate(numOfRows * Integer.BYTES);
colDataList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> colDataList.putInt(new Random().nextInt(100)));
ByteBuffer lengthList = ByteBuffer.allocate(numOfRows * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> lengthList.putLong(Integer.BYTES));
ByteBuffer isNullList = ByteBuffer.allocate(numOfRows * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> isNullList.putInt(0));
connector.bindColumnDataArray(stmt, colDataList, lengthList, isNullList, TSDBConstants.TSDB_DATA_TYPE_INT, Integer.BYTES, numOfRows, 1);
}
private void stmt_set_table_tags(TSDBJNIConnector connector, long stmt, String tbname) throws SQLException {
ByteBuffer tagDataList = ByteBuffer.allocate(Integer.BYTES);
tagDataList.order(ByteOrder.LITTLE_ENDIAN);
tagDataList.putInt(new Random().nextInt(100));
ByteBuffer typeList = ByteBuffer.allocate(1);
typeList.order(ByteOrder.LITTLE_ENDIAN);
typeList.put((byte) TSDBConstants.TSDB_DATA_TYPE_INT);
ByteBuffer lengthList = ByteBuffer.allocate(1 * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
lengthList.putLong(Integer.BYTES);
ByteBuffer isNullList = ByteBuffer.allocate(1 * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
isNullList.putInt(0);
connector.setBindTableNameAndTags(stmt, tbname, 1, tagDataList, typeList, lengthList, isNullList);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册