未验证 提交 d09ecf61 编写于 作者: Z Zhiyu Yang 提交者: GitHub

[TS-1063]<feature>(connector): bind multi tables in one batch (#9784)

* change jdbc version

* stmt_add_batch and stmt_errstr in jni

* change

* [TS-1063]<feature>(connector): bind multi tables in one batch for parameter-binding

* [TS-1063]<feature>(connector): fix null pointer exception

* [TS-1063]<feature>(connector): init and prepared stmt until setTableName
上级 d455c95d
......@@ -46,7 +46,7 @@ ELSEIF (TD_WINDOWS)
#INSTALL(TARGETS taos RUNTIME DESTINATION driver)
#INSTALL(TARGETS shell RUNTIME DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.36-dist.jar DESTINATION connector/jdbc)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.37-dist.jar DESTINATION connector/jdbc)
ENDIF ()
ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
......
......@@ -209,6 +209,15 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp
(JNIEnv *, jobject, jlong, jstring, jlong);
/**
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: bindColDataImp
......@@ -217,6 +226,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp
(JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: stmt_add_batch
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeBatchImp
......@@ -231,13 +248,12 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/**
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
* Method: stmt_errstr
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_stmtErrorMsgImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
......
......@@ -805,6 +805,78 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(
JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList,
jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char *curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for (int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t *)&lengthArray[i];
curTags += lengthArray[i];
}
int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind);
int32_t nTags = (int32_t)numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
tfree(tagsBind);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
JNIEnv *env, jobject jobj, jlong stmt, jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList,
jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
......@@ -872,8 +944,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
......@@ -886,19 +958,18 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
return JNI_SQL_NULL;
}
taos_stmt_add_batch(pStmt);
int32_t code = taos_stmt_execute(pStmt);
int32_t code = taos_stmt_add_batch(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon);
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
......@@ -911,91 +982,63 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
return JNI_SQL_NULL;
}
int32_t code = taos_stmt_close(pStmt);
int32_t code = taos_stmt_execute(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(
JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList,
jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
int32_t code = taos_stmt_close(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray);
if ((*env)->ExceptionCheck(env)) {
}
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS;
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_stmtErrorMsgImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
char errMsg[128];
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
sprintf(errMsg, "jobj:%p, connection already closed", jobj);
return (*env)->NewStringUTF(env, errMsg);
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char *curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for (int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t *)&lengthArray[i];
curTags += lengthArray[i];
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
sprintf(errMsg, "jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return (*env)->NewStringUTF(env, errMsg);
}
int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind);
int32_t nTags = (int32_t)numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
tfree(tagsBind);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
return (*env)->NewStringUTF(env, taos_stmt_errstr((TAOS_STMT *)stmt));
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj,
jobjectArray lines, jlong conn,
jint protocol, jint precision) {
jobjectArray lines, jlong conn,
jint protocol, jint precision) {
TAOS *taos = (TAOS *)conn;
if (taos == NULL) {
jniError("jobj:%p, connection already closed", jobj);
......@@ -1013,8 +1056,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JN
c_lines[i] = (char *)(*env)->GetStringUTFChars(env, line, 0);
}
SSqlObj* result = (SSqlObj*)taos_schemaless_insert(taos, c_lines, numLines, protocol, precision);
int code = taos_errno(result);
SSqlObj *result = (SSqlObj *)taos_schemaless_insert(taos, c_lines, numLines, protocol, precision);
int code = taos_errno(result);
for (int i = 0; i < numLines; ++i) {
jstring line = (jstring)((*env)->GetObjectArrayElement(env, lines, i));
......
......@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
POST_BUILD
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.36-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.37-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMENT "build jdbc driver")
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
......
......@@ -5,7 +5,7 @@
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.36</version>
<version>2.0.37</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
......
......@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.36</version>
<version>2.0.37</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
......
......@@ -63,7 +63,6 @@ public class TSDBConnection extends AbstractConnection {
if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
}
return new TSDBPreparedStatement(this, sql);
}
......@@ -71,7 +70,6 @@ public class TSDBConnection extends AbstractConnection {
if (isClosed) {
return;
}
this.connector.closeConnection();
this.isClosed = true;
}
......
......@@ -28,6 +28,8 @@ public class TSDBJNIConnector {
System.loadLibrary("taos");
}
/***********************************************************************/
//NOTE: JDBC
public static void init(Properties props) throws SQLWarning {
synchronized (LOCK) {
if (!isInitialized) {
......@@ -242,6 +244,9 @@ public class TSDBJNIConnector {
private native int closeConnectionImp(long connection);
/*****************************************************************************************/
// NOTE: subscribe
/**
* Create a subscription
*/
......@@ -269,6 +274,8 @@ public class TSDBJNIConnector {
private native void unsubscribeImp(long subscription, boolean isKeep);
/******************************************************************************************************/
// NOTE: parameter binding
public long prepareStmt(String sql) throws SQLException {
long stmt = prepareStmtImp(sql.getBytes(), this.taos);
......@@ -293,16 +300,19 @@ public class TSDBJNIConnector {
public void setBindTableName(long stmt, String tableName) throws SQLException {
int code = setBindTableNameImp(stmt, tableName, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to set table name");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to set table name, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
private native int setBindTableNameImp(long stmt, String name, long conn);
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags,
ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(), nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to bind table name and corresponding tags, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
......@@ -311,7 +321,8 @@ public class TSDBJNIConnector {
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows, int columnIndex) throws SQLException {
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind column data");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to bind column data, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
......@@ -320,10 +331,20 @@ public class TSDBJNIConnector {
public void executeBatch(long stmt) throws SQLException {
int code = executeBatchImp(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to execute batch bind");
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to execute batch bind, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
public void addBatch(long stmt) throws SQLException {
int code = addBatchImp(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS){
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, stmtErrorMsgImp(stmt, this.taos));
}
}
private native int addBatchImp(long stmt, long con);
private native int executeBatchImp(long stmt, long con);
public void closeBatch(long stmt) throws SQLException {
......@@ -335,6 +356,10 @@ public class TSDBJNIConnector {
private native int closeStmt(long stmt, long con);
private native String stmtErrorMsgImp(long stmt, long con);
/*************************************************************************************************/
// NOTE: schemaless-lines
public void insertLines(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
int code = insertLinesImp(lines, this.taos, protocolType.ordinal(), timestampType.ordinal());
if (code != TSDBConstants.JNI_SUCCESS) {
......
......@@ -40,25 +40,27 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private String rawSql;
private Object[] parameters;
// for parameter binding
private long nativeStmtHandle = 0;
private long nativeStmtHandle;
private String tableName;
private ArrayList<TableTagInfo> tableTags;
private int tagValueLength;
private ArrayList<ColumnInfo> colData;
TSDBPreparedStatement(TSDBConnection connection, String sql) {
TSDBPreparedStatement(TSDBConnection connection, String sql) throws SQLException {
super(connection);
init(sql);
int parameterCnt = 0;
if (sql.contains("?")) {
for (int i = 0; i < sql.length(); i++) {
if ('?' == sql.charAt(i)) {
parameterCnt++;
}
if (!sql.contains("?"))
return;
for (int i = 0; i < sql.length(); i++) {
if ('?' == sql.charAt(i)) {
parameterCnt++;
}
}
parameters = new Object[parameterCnt];
// for parameter-binding
// TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
// this.nativeStmtHandle = connector.prepareStmt(rawSql);
if (parameterCnt > 1) {
// the table name is also a parameter, so ignore it.
......@@ -530,8 +532,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
public void setTableName(String name) throws SQLException {
if (this.nativeStmtHandle == 0) {
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtHandle = connector.prepareStmt(rawSql);
}
if (this.tableName != null) {
this.columnDataExecuteBatch();
this.columnDataAddBatch();
this.columnDataClearBatchInternal();
}
this.tableName = name;
......@@ -693,7 +701,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (rawSql == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "sql statement not set yet");
}
// table name is not set yet, abort
if (this.tableName == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "table name not set yet");
......@@ -703,24 +710,25 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (numOfCols == 0) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
}
if (nativeStmtHandle == 0) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "stmt is null");
}
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtHandle = connector.prepareStmt(rawSql);
if (this.tableTags == null) {
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
} else {
int num = this.tableTags.size();
int tagSize = this.tableTags.size();
ByteBuffer tagDataList = ByteBuffer.allocate(this.tagValueLength);
tagDataList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer typeList = ByteBuffer.allocate(num);
ByteBuffer typeList = ByteBuffer.allocate(tagSize);
typeList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer lengthList = ByteBuffer.allocate(num * Long.BYTES);
ByteBuffer lengthList = ByteBuffer.allocate(tagSize * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer isNullList = ByteBuffer.allocate(num * Integer.BYTES);
ByteBuffer isNullList = ByteBuffer.allocate(tagSize * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
for (TableTagInfo tag : this.tableTags) {
......@@ -744,54 +752,43 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
lengthList.putLong(Byte.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
Boolean val = (Boolean) tag.value;
tagDataList.put((byte) (val ? 1 : 0));
lengthList.putLong(Byte.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
Short val = (Short) tag.value;
tagDataList.putShort(val);
lengthList.putLong(Short.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
Long val = (Long) tag.value;
tagDataList.putLong(val == null ? 0 : val);
lengthList.putLong(Long.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
Float val = (Float) tag.value;
tagDataList.putFloat(val == null ? 0 : val);
lengthList.putLong(Float.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
Double val = (Double) tag.value;
tagDataList.putDouble(val == null ? 0 : val);
lengthList.putLong(Double.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_JSON:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
String charset = TaosGlobalConfig.getCharset();
String val = (String) tag.value;
byte[] b = null;
byte[] b;
try {
if (tag.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) {
b = val.getBytes();
......@@ -801,12 +798,10 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e.getMessage());
}
tagDataList.put(b);
lengthList.putLong(b.length);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT:
......@@ -814,13 +809,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types");
}
}
typeList.put((byte) tag.type);
isNullList.putInt(tag.isNull ? 1 : 0);
}
connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(), tagDataList,
typeList, lengthList, isNullList);
connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(),
tagDataList, typeList, lengthList, isNullList);
}
ColumnInfo colInfo = this.colData.get(0);
......@@ -834,7 +828,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (col1 == null || !col1.isTypeSet()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
}
if (rows != col1.data.size()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "the rows in column data not identical");
}
......@@ -951,7 +944,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
break;
}
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT:
......@@ -962,6 +954,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
connector.bindColumnDataArray(this.nativeStmtHandle, colDataList, lengthList, isNullList, col1.type, col1.bytes, rows, i);
}
connector.addBatch(this.nativeStmtHandle);
this.columnDataClearBatchInternal();
}
public void columnDataExecuteBatch() throws SQLException {
......@@ -976,13 +970,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
private void columnDataClearBatchInternal() {
int size = this.colData.size();
this.colData.clear();
this.colData.addAll(Collections.nCopies(size, null));
this.tableName = null; // clear the table name
this.tableName = null;
if (this.tableTags != null)
this.tableTags.clear();
tagValueLength = 0;
if (this.colData != null)
this.colData.clear();
}
public void columnDataCloseBatch() throws SQLException {
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
connector.closeBatch(this.nativeStmtHandle);
......
package com.taosdata.jdbc;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.*;
import java.sql.*;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -21,13 +20,17 @@ public class ParameterBindTest {
private final Random random = new Random(System.currentTimeMillis());
@Test
public void test() {
public void one_batch_multi_table() throws SQLException {
// given
String[] tbnames = {"t1", "t2", "t3"};
int rows = 10;
// when
insertIntoTables(tbnames, 10);
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
long current = System.currentTimeMillis();
insertIntoTables(pstmt, tbnames, current, 10);
}
// then
assertRows(stable, tbnames.length * rows);
......@@ -37,13 +40,48 @@ public class ParameterBindTest {
}
@Test
public void testMultiThreads() {
public void multi_batch_multi_table() throws SQLException {
// given
int rows = 10;
int batchSize = 10;
String[] tbnames = {"t1", "t2", "t3"};
// when
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
long current = System.currentTimeMillis();
for (int i = 0; i < batchSize; i++) {
insertIntoTables(pstmt, tbnames, current + 1000 * i * rows, rows);
}
}
// then
assertRows(stable, tbnames.length * batchSize * rows);
for (String t : tbnames) {
assertRows(t, rows * batchSize);
}
}
@Test
public void multiThreads() {
// given
String[][] tables = {{"t1", "t2", "t3"}, {"t4", "t5", "t6"}, {"t7", "t8", "t9"}, {"t10"}};
int rows = 10;
// when
List<Thread> threads = Arrays.stream(tables).map(tbnames -> new Thread(() -> insertIntoTables(tbnames, rows))).collect(Collectors.toList());
List<Thread> threads = Arrays.stream(tables).map(tbnames -> new Thread(() -> {
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
long current = System.currentTimeMillis();
insertIntoTables(pstmt, tbnames, current, 10);
} catch (SQLException throwables) {
throwables.printStackTrace();
}
})).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
try {
......@@ -59,9 +97,26 @@ public class ParameterBindTest {
assertRows(t, rows);
}
}
}
@Ignore
@Test
public void testOOM() throws SQLException {
String[] tbnames = {"t1", "t2", "t3", "t4", "t5", "t6", "t7", "t8", "t9", "t10"};
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
int rows = 1000;
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
long ts = Instant.now().minus(5 * 365, ChronoUnit.DAYS).getEpochSecond() * 1000;
while (true) {
insertIntoTables(pstmt, tbnames, ts, rows);
ts += 1000 * rows;
}
}
}
private void assertRows(String tbname, int rows) {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select count(*) from " + tbname);
......@@ -74,40 +129,36 @@ public class ParameterBindTest {
}
}
private void insertIntoTables(String[] tbnames, int rowsEachTable) {
long current = System.currentTimeMillis();
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 0; i < tbnames.length; i++) {
pstmt.setTableName(tbnames[i]);
pstmt.setTagInt(0, random.nextInt(100));
pstmt.setTagInt(1, random.nextInt(100));
ArrayList<Long> timestampList = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
timestampList.add(current + i * 1000 + j);
}
pstmt.setTimestamp(0, timestampList);
ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f1List.add(random.nextInt(100));
}
pstmt.setInt(1, f1List);
ArrayList<Integer> f2List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f2List.add(random.nextInt(100));
}
pstmt.setInt(2, f2List);
pstmt.columnDataAddBatch();
private void insertIntoTables(TSDBPreparedStatement pstmt, String[] tbnames, long ts_start, int rowsEachTable) throws SQLException {
for (int i = 0; i < tbnames.length; i++) {
// set table name
pstmt.setTableName(tbnames[i]);
// set tags
pstmt.setTagInt(0, random.nextInt(100));
pstmt.setTagInt(1, random.nextInt(100));
// set column: ts
ArrayList<Long> timestampList = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
timestampList.add(ts_start + j * 1000L);
}
pstmt.columnDataExecuteBatch();
} catch (SQLException e) {
e.printStackTrace();
pstmt.setTimestamp(0, timestampList);
// set column: f1
ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f1List.add(random.nextInt(100));
}
pstmt.setInt(1, f1List);
// set column: f2
ArrayList<Integer> f2List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f2List.add(random.nextInt(100));
}
pstmt.setInt(2, f2List);
// add batch
pstmt.columnDataAddBatch();
}
// execute batch
pstmt.columnDataExecuteBatch();
}
@Before
......
......@@ -4,38 +4,25 @@ import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import org.junit.Test;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TSDBJNIConnectorTest {
private static final String host = "127.0.0.1";
private static TSDBResultSetRowData rowData;
@Test
public void test() throws SQLException {
try {
//change sleepSeconds when debugging with attach to process to find PID
int sleepSeconds = -1;
if (sleepSeconds > 0) {
RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean();
String jvmName = runtimeBean.getName();
long pid = Long.valueOf(jvmName.split("@")[0]);
System.out.println("JVM PID = " + pid);
Thread.sleep(sleepSeconds * 1000);
}
} catch (Exception e) {
e.printStackTrace();
}
// init
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR, "/etc/taos");
......@@ -43,7 +30,7 @@ public class TSDBJNIConnectorTest {
// connect
TSDBJNIConnector connector = new TSDBJNIConnector();
connector.connect("127.0.0.1", 6030, null, "root", "taosdata");
connector.connect(host, 6030, null, "root", "taosdata");
// setup
String setupSqlStrs[] = {"create database if not exists d precision \"us\"",
......@@ -141,4 +128,128 @@ public class TSDBJNIConnectorTest {
} else return code != TSDBConstants.JNI_FETCH_END;
}
@Test
public void param_bind_one_batch_multi_table() throws SQLException {
TSDBJNIConnector connector = new TSDBJNIConnector();
connector.connect(host, 6030, null, "root", "taosdata");
connector.executeQuery("drop database if exists test");
connector.executeQuery("create database if not exists test");
connector.executeQuery("use test");
connector.executeQuery("create table weather(ts timestamp, f1 int) tags(t1 int)");
// 1. init + prepare
long stmt = connector.prepareStmt("insert into ? using weather tags(?) values(?,?)");
for (int i = 0; i < 10; i++) {
// 2. set_tbname_tags
stmt_set_table_tags(connector, stmt, "t" + i);
// 3. bind_single_param_batch
// bind timestamp
long ts = System.currentTimeMillis();
bind_col_timestamp(connector, stmt, ts, 100);
// bind int
bind_col_integer(connector, stmt, 100);
// 4. add_batch
connector.addBatch(stmt);
}
connector.executeBatch(stmt);
connector.closeBatch(stmt);
connector.executeQuery("drop database if exists test");
connector.closeConnection();
}
@Test
public void param_bind_multi_batch_multi_table() throws SQLException {
TSDBJNIConnector connector = new TSDBJNIConnector();
connector.connect(host, 6030, null, "root", "taosdata");
connector.executeQuery("drop database if exists test");
connector.executeQuery("create database if not exists test");
connector.executeQuery("use test");
connector.executeQuery("create table weather(ts timestamp, f1 int) tags(t1 int)");
// 1. init + prepare
long stmt = connector.prepareStmt("insert into ? using weather tags(?) values(?,?)");
long ts = System.currentTimeMillis();
for (int ind_batch = 0; ind_batch < 10; ind_batch++) {
ts += ind_batch * 1000 * 1000;
System.out.println("batch: " + ind_batch + ", ts: " + ts);
for (int i = 0; i < 10; i++) {
// 2. set_tbname_tags
stmt_set_table_tags(connector, stmt, "t" + i);
// 3. bind_single_param_batch
// bind timestamp
bind_col_timestamp(connector, stmt, ts, 100);
// bind int
bind_col_integer(connector, stmt, 100);
// 4. add_batch
connector.addBatch(stmt);
}
connector.executeBatch(stmt);
}
connector.closeBatch(stmt);
connector.executeQuery("drop database if exists test");
connector.closeConnection();
}
private void bind_col_timestamp(TSDBJNIConnector connector, long stmt, long ts_start, int numOfRows) throws SQLException {
ByteBuffer colDataList = ByteBuffer.allocate(numOfRows * Long.BYTES);
colDataList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> colDataList.putLong(ts_start + ind * 1000L));
ByteBuffer lengthList = ByteBuffer.allocate(numOfRows * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> lengthList.putLong(Integer.BYTES));
ByteBuffer isNullList = ByteBuffer.allocate(numOfRows * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> isNullList.putInt(0));
connector.bindColumnDataArray(stmt, colDataList, lengthList, isNullList, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP, Long.BYTES, numOfRows, 0);
}
private void bind_col_integer(TSDBJNIConnector connector, long stmt, int numOfRows) throws SQLException {
ByteBuffer colDataList = ByteBuffer.allocate(numOfRows * Integer.BYTES);
colDataList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> colDataList.putInt(new Random().nextInt(100)));
ByteBuffer lengthList = ByteBuffer.allocate(numOfRows * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> lengthList.putLong(Integer.BYTES));
ByteBuffer isNullList = ByteBuffer.allocate(numOfRows * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
IntStream.range(0, numOfRows).forEach(ind -> isNullList.putInt(0));
connector.bindColumnDataArray(stmt, colDataList, lengthList, isNullList, TSDBConstants.TSDB_DATA_TYPE_INT, Integer.BYTES, numOfRows, 1);
}
private void stmt_set_table_tags(TSDBJNIConnector connector, long stmt, String tbname) throws SQLException {
ByteBuffer tagDataList = ByteBuffer.allocate(Integer.BYTES);
tagDataList.order(ByteOrder.LITTLE_ENDIAN);
tagDataList.putInt(new Random().nextInt(100));
ByteBuffer typeList = ByteBuffer.allocate(1);
typeList.order(ByteOrder.LITTLE_ENDIAN);
typeList.put((byte) TSDBConstants.TSDB_DATA_TYPE_INT);
ByteBuffer lengthList = ByteBuffer.allocate(1 * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
lengthList.putLong(Integer.BYTES);
ByteBuffer isNullList = ByteBuffer.allocate(1 * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
isNullList.putInt(0);
connector.setBindTableNameAndTags(stmt, tbname, 1, tagDataList, typeList, lengthList, isNullList);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册