diff --git a/cmake/install.inc b/cmake/install.inc index 111efdae2dc3d186db16114ef238ebaddc5e5924..283d6a9c045c2a14dd18cd82d4fabb47f24466ee 100755 --- a/cmake/install.inc +++ b/cmake/install.inc @@ -46,7 +46,7 @@ ELSEIF (TD_WINDOWS) #INSTALL(TARGETS taos RUNTIME DESTINATION driver) #INSTALL(TARGETS shell RUNTIME DESTINATION .) IF (TD_MVN_INSTALLED) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.36-dist.jar DESTINATION connector/jdbc) + INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.37-dist.jar DESTINATION connector/jdbc) ENDIF () ELSEIF (TD_DARWIN) SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") diff --git a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h index 1038af5abb1d00b14b1c54d2f96522647b71178b..4c999b710a62d1e620064af4d5647ee46d9a570e 100644 --- a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h +++ b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h @@ -209,6 +209,15 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp (JNIEnv *, jobject, jlong, jstring, jlong); + +/** + * Class: com_taosdata_jdbc_TSDBJNIConnector + * Method: setTableNameTagsImp + * Signature: (JLjava/lang/String;I[B[B[B[BJ)I + */ +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp + (JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong); + /* * Class: com_taosdata_jdbc_TSDBJNIConnector * Method: bindColDataImp @@ -217,6 +226,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp (JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong); +/* + * Class: com_taosdata_jdbc_TSDBJNIConnector + * Method: stmt_add_batch + * Signature: (JJ)I + */ +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con); + + /* * Class: com_taosdata_jdbc_TSDBJNIConnector * Method: executeBatchImp @@ -231,13 +248,12 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J */ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con); -/** +/* * Class: com_taosdata_jdbc_TSDBJNIConnector - * Method: setTableNameTagsImp - * Signature: (JLjava/lang/String;I[B[B[B[BJ)I + * Method: stmt_errstr + * Signature: (JJ)I */ -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp - (JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong); +JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_stmtErrorMsgImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con); /* * Class: com_taosdata_jdbc_TSDBJNIConnector diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 32a07b3aad20d8399620b13bf8c4fdb440a8e106..67a08fa4fac39e2497a3cc0447b73f2a93d0c4ee 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -805,6 +805,78 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI return JNI_SUCCESS; } +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp( + JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, + jbyteArray lengthList, jbyteArray nullList, jlong conn) { + TAOS *tsconn = (TAOS *)conn; + if (tsconn == NULL) { + jniError("jobj:%p, connection already closed", jobj); + return JNI_CONNECTION_NULL; + } + + TAOS_STMT *pStmt = (TAOS_STMT *)stmt; + if (pStmt == NULL) { + jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn); + return JNI_SQL_NULL; + } + + jsize len = (*env)->GetArrayLength(env, tags); + char *tagsData = (char *)calloc(1, len); + (*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData); + if ((*env)->ExceptionCheck(env)) { + // todo handle error + } + + len = (*env)->GetArrayLength(env, lengthList); + int64_t *lengthArray = (int64_t *)calloc(1, len); + (*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray); + if ((*env)->ExceptionCheck(env)) { + } + + len = (*env)->GetArrayLength(env, typeList); + char *typeArray = (char *)calloc(1, len); + (*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray); + if ((*env)->ExceptionCheck(env)) { + } + + len = (*env)->GetArrayLength(env, nullList); + int32_t *nullArray = (int32_t *)calloc(1, len); + (*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray); + if ((*env)->ExceptionCheck(env)) { + } + + const char *name = (*env)->GetStringUTFChars(env, tableName, NULL); + char *curTags = tagsData; + + TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND)); + for (int32_t i = 0; i < numOfTags; ++i) { + tagsBind[i].buffer_type = typeArray[i]; + tagsBind[i].buffer = curTags; + tagsBind[i].is_null = &nullArray[i]; + tagsBind[i].length = (uintptr_t *)&lengthArray[i]; + + curTags += lengthArray[i]; + } + + int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind); + + int32_t nTags = (int32_t)numOfTags; + jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags); + + tfree(tagsData); + tfree(lengthArray); + tfree(typeArray); + tfree(nullArray); + tfree(tagsBind); + (*env)->ReleaseStringUTFChars(env, tableName, name); + + if (code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code)); + return JNI_TDENGINE_ERROR; + } + return JNI_SUCCESS; +} + JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp( JNIEnv *env, jobject jobj, jlong stmt, jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList, jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) { @@ -872,8 +944,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp( return JNI_SUCCESS; } -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, - jlong con) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEnv *env, jobject jobj, jlong stmt, + jlong con) { TAOS *tscon = (TAOS *)con; if (tscon == NULL) { jniError("jobj:%p, connection already closed", jobj); @@ -886,19 +958,18 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J return JNI_SQL_NULL; } - taos_stmt_add_batch(pStmt); - int32_t code = taos_stmt_execute(pStmt); + int32_t code = taos_stmt_add_batch(pStmt); if (code != TSDB_CODE_SUCCESS) { jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); return JNI_TDENGINE_ERROR; } - jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon); + jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon); return JNI_SUCCESS; } -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, - jlong con) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, + jlong con) { TAOS *tscon = (TAOS *)con; if (tscon == NULL) { jniError("jobj:%p, connection already closed", jobj); @@ -911,91 +982,63 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv return JNI_SQL_NULL; } - int32_t code = taos_stmt_close(pStmt); + int32_t code = taos_stmt_execute(pStmt); if (code != TSDB_CODE_SUCCESS) { jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); return JNI_TDENGINE_ERROR; } - jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon); + jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon); return JNI_SUCCESS; } -JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp( - JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, - jbyteArray lengthList, jbyteArray nullList, jlong conn) { - TAOS *tsconn = (TAOS *)conn; - if (tsconn == NULL) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, + jlong con) { + TAOS *tscon = (TAOS *)con; + if (tscon == NULL) { jniError("jobj:%p, connection already closed", jobj); return JNI_CONNECTION_NULL; } TAOS_STMT *pStmt = (TAOS_STMT *)stmt; if (pStmt == NULL) { - jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn); + jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon); return JNI_SQL_NULL; } - jsize len = (*env)->GetArrayLength(env, tags); - char *tagsData = (char *)calloc(1, len); - (*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData); - if ((*env)->ExceptionCheck(env)) { - // todo handle error - } - - len = (*env)->GetArrayLength(env, lengthList); - int64_t *lengthArray = (int64_t *)calloc(1, len); - (*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray); - if ((*env)->ExceptionCheck(env)) { + int32_t code = taos_stmt_close(pStmt); + if (code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); + return JNI_TDENGINE_ERROR; } - len = (*env)->GetArrayLength(env, typeList); - char *typeArray = (char *)calloc(1, len); - (*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray); - if ((*env)->ExceptionCheck(env)) { - } + jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon); + return JNI_SUCCESS; +} - len = (*env)->GetArrayLength(env, nullList); - int32_t *nullArray = (int32_t *)calloc(1, len); - (*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray); - if ((*env)->ExceptionCheck(env)) { +JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_stmtErrorMsgImp(JNIEnv *env, jobject jobj, jlong stmt, + jlong con) { + char errMsg[128]; + TAOS *tscon = (TAOS *)con; + if (tscon == NULL) { + jniError("jobj:%p, connection already closed", jobj); + sprintf(errMsg, "jobj:%p, connection already closed", jobj); + return (*env)->NewStringUTF(env, errMsg); } - const char *name = (*env)->GetStringUTFChars(env, tableName, NULL); - char *curTags = tagsData; - - TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND)); - for (int32_t i = 0; i < numOfTags; ++i) { - tagsBind[i].buffer_type = typeArray[i]; - tagsBind[i].buffer = curTags; - tagsBind[i].is_null = &nullArray[i]; - tagsBind[i].length = (uintptr_t *)&lengthArray[i]; - - curTags += lengthArray[i]; + TAOS_STMT *pStmt = (TAOS_STMT *)stmt; + if (pStmt == NULL) { + jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon); + sprintf(errMsg, "jobj:%p, conn:%p, invalid stmt", jobj, tscon); + return (*env)->NewStringUTF(env, errMsg); } - int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind); - - int32_t nTags = (int32_t)numOfTags; - jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags); - - tfree(tagsData); - tfree(lengthArray); - tfree(typeArray); - tfree(nullArray); - tfree(tagsBind); - (*env)->ReleaseStringUTFChars(env, tableName, name); - - if (code != TSDB_CODE_SUCCESS) { - jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code)); - return JNI_TDENGINE_ERROR; - } - return JNI_SUCCESS; + return (*env)->NewStringUTF(env, taos_stmt_errstr((TAOS_STMT *)stmt)); } JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj, - jobjectArray lines, jlong conn, - jint protocol, jint precision) { + jobjectArray lines, jlong conn, + jint protocol, jint precision) { TAOS *taos = (TAOS *)conn; if (taos == NULL) { jniError("jobj:%p, connection already closed", jobj); @@ -1013,8 +1056,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JN c_lines[i] = (char *)(*env)->GetStringUTFChars(env, line, 0); } - SSqlObj* result = (SSqlObj*)taos_schemaless_insert(taos, c_lines, numLines, protocol, precision); - int code = taos_errno(result); + SSqlObj *result = (SSqlObj *)taos_schemaless_insert(taos, c_lines, numLines, protocol, precision); + int code = taos_errno(result); for (int i = 0; i < numLines; ++i) { jstring line = (jstring)((*env)->GetObjectArrayElement(env, lines, i)); diff --git a/src/connector/jdbc/CMakeLists.txt b/src/connector/jdbc/CMakeLists.txt index c5b59baefedc38fa4bf558526a8c4a1777bfb7bb..42dc541a3107dbcb82caea5e2d96b08155766cca 100644 --- a/src/connector/jdbc/CMakeLists.txt +++ b/src/connector/jdbc/CMakeLists.txt @@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED) ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME} POST_BUILD COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml - COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.36-dist.jar ${LIBRARY_OUTPUT_PATH} + COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.37-dist.jar ${LIBRARY_OUTPUT_PATH} COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml COMMENT "build jdbc driver") ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME}) diff --git a/src/connector/jdbc/deploy-pom.xml b/src/connector/jdbc/deploy-pom.xml index 926a5ef483d9f1da07dbfdeb796567d3ea077c87..e482dd97de336cb1108d40b4e14ccd946fc1425e 100755 --- a/src/connector/jdbc/deploy-pom.xml +++ b/src/connector/jdbc/deploy-pom.xml @@ -5,7 +5,7 @@ com.taosdata.jdbc taos-jdbcdriver - 2.0.36 + 2.0.37 jar JDBCDriver diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml index 04115e2a0ebc5924a51862cd9a49a5352cf6a5b6..4b5bcdee67e7d75f25f694e7e05c1b95c33acc65 100644 --- a/src/connector/jdbc/pom.xml +++ b/src/connector/jdbc/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.taosdata.jdbc taos-jdbcdriver - 2.0.36 + 2.0.37 jar JDBCDriver https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java index 77a97d644ca3da3a51bce021ab7904883ed885f4..af036e6025e071cd39d3dac38de62bb8a2689c50 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java @@ -63,7 +63,6 @@ public class TSDBConnection extends AbstractConnection { if (isClosed()) { throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED); } - return new TSDBPreparedStatement(this, sql); } @@ -71,7 +70,6 @@ public class TSDBConnection extends AbstractConnection { if (isClosed) { return; } - this.connector.closeConnection(); this.isClosed = true; } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index 247ae929dabc9aba4d50309433a9b1866125909d..093baef3cac5b33e6be74248a289addbc1e18e9d 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -28,6 +28,8 @@ public class TSDBJNIConnector { System.loadLibrary("taos"); } + /***********************************************************************/ + //NOTE: JDBC public static void init(Properties props) throws SQLWarning { synchronized (LOCK) { if (!isInitialized) { @@ -242,6 +244,9 @@ public class TSDBJNIConnector { private native int closeConnectionImp(long connection); + /*****************************************************************************************/ + // NOTE: subscribe + /** * Create a subscription */ @@ -269,6 +274,8 @@ public class TSDBJNIConnector { private native void unsubscribeImp(long subscription, boolean isKeep); + /******************************************************************************************************/ + // NOTE: parameter binding public long prepareStmt(String sql) throws SQLException { long stmt = prepareStmtImp(sql.getBytes(), this.taos); @@ -293,16 +300,19 @@ public class TSDBJNIConnector { public void setBindTableName(long stmt, String tableName) throws SQLException { int code = setBindTableNameImp(stmt, tableName, this.taos); if (code != TSDBConstants.JNI_SUCCESS) { - throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to set table name"); + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, + "failed to set table name, reason: " + stmtErrorMsgImp(stmt, this.taos)); } } private native int setBindTableNameImp(long stmt, String name, long conn); - public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException { + public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, + ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException { int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(), nullList.array(), this.taos); if (code != TSDBConstants.JNI_SUCCESS) { - throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags"); + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, + "failed to bind table name and corresponding tags, reason: " + stmtErrorMsgImp(stmt, this.taos)); } } @@ -311,7 +321,8 @@ public class TSDBJNIConnector { public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows, int columnIndex) throws SQLException { int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos); if (code != TSDBConstants.JNI_SUCCESS) { - throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind column data"); + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, + "failed to bind column data, reason: " + stmtErrorMsgImp(stmt, this.taos)); } } @@ -320,10 +331,20 @@ public class TSDBJNIConnector { public void executeBatch(long stmt) throws SQLException { int code = executeBatchImp(stmt, this.taos); if (code != TSDBConstants.JNI_SUCCESS) { - throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to execute batch bind"); + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, + "failed to execute batch bind, reason: " + stmtErrorMsgImp(stmt, this.taos)); + } + } + + public void addBatch(long stmt) throws SQLException { + int code = addBatchImp(stmt, this.taos); + if (code != TSDBConstants.JNI_SUCCESS){ + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, stmtErrorMsgImp(stmt, this.taos)); } } + private native int addBatchImp(long stmt, long con); + private native int executeBatchImp(long stmt, long con); public void closeBatch(long stmt) throws SQLException { @@ -335,6 +356,10 @@ public class TSDBJNIConnector { private native int closeStmt(long stmt, long con); + private native String stmtErrorMsgImp(long stmt, long con); + + /*************************************************************************************************/ + // NOTE: schemaless-lines public void insertLines(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException { int code = insertLinesImp(lines, this.taos, protocolType.ordinal(), timestampType.ordinal()); if (code != TSDBConstants.JNI_SUCCESS) { diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBPreparedStatement.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBPreparedStatement.java index 5ec28779b2fab98ddd0ea22fe84285a4394bc336..ac1e91b51d2b3ae857100036e430f92366b181d7 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBPreparedStatement.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBPreparedStatement.java @@ -40,25 +40,27 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat private String rawSql; private Object[] parameters; // for parameter binding - private long nativeStmtHandle = 0; + private long nativeStmtHandle; private String tableName; private ArrayList tableTags; private int tagValueLength; private ArrayList colData; - TSDBPreparedStatement(TSDBConnection connection, String sql) { + TSDBPreparedStatement(TSDBConnection connection, String sql) throws SQLException { super(connection); init(sql); - int parameterCnt = 0; - if (sql.contains("?")) { - for (int i = 0; i < sql.length(); i++) { - if ('?' == sql.charAt(i)) { - parameterCnt++; - } + if (!sql.contains("?")) + return; + for (int i = 0; i < sql.length(); i++) { + if ('?' == sql.charAt(i)) { + parameterCnt++; } } parameters = new Object[parameterCnt]; + // for parameter-binding +// TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector(); +// this.nativeStmtHandle = connector.prepareStmt(rawSql); if (parameterCnt > 1) { // the table name is also a parameter, so ignore it. @@ -530,8 +532,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat } public void setTableName(String name) throws SQLException { + + if (this.nativeStmtHandle == 0) { + TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector(); + this.nativeStmtHandle = connector.prepareStmt(rawSql); + } + if (this.tableName != null) { - this.columnDataExecuteBatch(); + this.columnDataAddBatch(); this.columnDataClearBatchInternal(); } this.tableName = name; @@ -693,7 +701,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat if (rawSql == null) { throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "sql statement not set yet"); } - // table name is not set yet, abort if (this.tableName == null) { throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "table name not set yet"); @@ -703,24 +710,25 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat if (numOfCols == 0) { throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind"); } + if (nativeStmtHandle == 0) { + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "stmt is null"); + } TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector(); - this.nativeStmtHandle = connector.prepareStmt(rawSql); - if (this.tableTags == null) { connector.setBindTableName(this.nativeStmtHandle, this.tableName); } else { - int num = this.tableTags.size(); + int tagSize = this.tableTags.size(); ByteBuffer tagDataList = ByteBuffer.allocate(this.tagValueLength); tagDataList.order(ByteOrder.LITTLE_ENDIAN); - ByteBuffer typeList = ByteBuffer.allocate(num); + ByteBuffer typeList = ByteBuffer.allocate(tagSize); typeList.order(ByteOrder.LITTLE_ENDIAN); - ByteBuffer lengthList = ByteBuffer.allocate(num * Long.BYTES); + ByteBuffer lengthList = ByteBuffer.allocate(tagSize * Long.BYTES); lengthList.order(ByteOrder.LITTLE_ENDIAN); - ByteBuffer isNullList = ByteBuffer.allocate(num * Integer.BYTES); + ByteBuffer isNullList = ByteBuffer.allocate(tagSize * Integer.BYTES); isNullList.order(ByteOrder.LITTLE_ENDIAN); for (TableTagInfo tag : this.tableTags) { @@ -744,54 +752,43 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat lengthList.putLong(Byte.BYTES); break; } - case TSDBConstants.TSDB_DATA_TYPE_BOOL: { Boolean val = (Boolean) tag.value; tagDataList.put((byte) (val ? 1 : 0)); lengthList.putLong(Byte.BYTES); break; } - case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: { Short val = (Short) tag.value; tagDataList.putShort(val); lengthList.putLong(Short.BYTES); - break; } - case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: case TSDBConstants.TSDB_DATA_TYPE_BIGINT: { Long val = (Long) tag.value; tagDataList.putLong(val == null ? 0 : val); lengthList.putLong(Long.BYTES); - break; } - case TSDBConstants.TSDB_DATA_TYPE_FLOAT: { Float val = (Float) tag.value; tagDataList.putFloat(val == null ? 0 : val); lengthList.putLong(Float.BYTES); - break; } - case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: { Double val = (Double) tag.value; tagDataList.putDouble(val == null ? 0 : val); lengthList.putLong(Double.BYTES); - break; } - case TSDBConstants.TSDB_DATA_TYPE_NCHAR: case TSDBConstants.TSDB_DATA_TYPE_JSON: case TSDBConstants.TSDB_DATA_TYPE_BINARY: { String charset = TaosGlobalConfig.getCharset(); String val = (String) tag.value; - - byte[] b = null; + byte[] b; try { if (tag.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) { b = val.getBytes(); @@ -801,12 +798,10 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat } catch (UnsupportedEncodingException e) { throw new RuntimeException(e.getMessage()); } - tagDataList.put(b); lengthList.putLong(b.length); break; } - case TSDBConstants.TSDB_DATA_TYPE_UTINYINT: case TSDBConstants.TSDB_DATA_TYPE_USMALLINT: case TSDBConstants.TSDB_DATA_TYPE_UINT: @@ -814,13 +809,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types"); } } - typeList.put((byte) tag.type); isNullList.putInt(tag.isNull ? 1 : 0); } - connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(), tagDataList, - typeList, lengthList, isNullList); + connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(), + tagDataList, typeList, lengthList, isNullList); } ColumnInfo colInfo = this.colData.get(0); @@ -834,7 +828,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat if (col1 == null || !col1.isTypeSet()) { throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind"); } - if (rows != col1.data.size()) { throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "the rows in column data not identical"); } @@ -951,7 +944,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat } break; } - case TSDBConstants.TSDB_DATA_TYPE_UTINYINT: case TSDBConstants.TSDB_DATA_TYPE_USMALLINT: case TSDBConstants.TSDB_DATA_TYPE_UINT: @@ -962,6 +954,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat connector.bindColumnDataArray(this.nativeStmtHandle, colDataList, lengthList, isNullList, col1.type, col1.bytes, rows, i); } + connector.addBatch(this.nativeStmtHandle); + this.columnDataClearBatchInternal(); } public void columnDataExecuteBatch() throws SQLException { @@ -976,13 +970,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat } private void columnDataClearBatchInternal() { - int size = this.colData.size(); - this.colData.clear(); - this.colData.addAll(Collections.nCopies(size, null)); - this.tableName = null; // clear the table name + this.tableName = null; + if (this.tableTags != null) + this.tableTags.clear(); + tagValueLength = 0; + if (this.colData != null) + this.colData.clear(); } - public void columnDataCloseBatch() throws SQLException { TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector(); connector.closeBatch(this.nativeStmtHandle); diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/ParameterBindTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/ParameterBindTest.java index 63c3a6318a611f7159c0ac16dc85cd5e05de47c0..f06480bc68bfd52790b4ebb27a09dc3bb90c4d41 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/ParameterBindTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/ParameterBindTest.java @@ -1,11 +1,10 @@ package com.taosdata.jdbc; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import java.sql.*; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -21,13 +20,17 @@ public class ParameterBindTest { private final Random random = new Random(System.currentTimeMillis()); @Test - public void test() { + public void one_batch_multi_table() throws SQLException { // given String[] tbnames = {"t1", "t2", "t3"}; int rows = 10; // when - insertIntoTables(tbnames, 10); + String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)"; + try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { + long current = System.currentTimeMillis(); + insertIntoTables(pstmt, tbnames, current, 10); + } // then assertRows(stable, tbnames.length * rows); @@ -37,13 +40,48 @@ public class ParameterBindTest { } @Test - public void testMultiThreads() { + public void multi_batch_multi_table() throws SQLException { + // given + int rows = 10; + int batchSize = 10; + String[] tbnames = {"t1", "t2", "t3"}; + + // when + String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)"; + try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { + + long current = System.currentTimeMillis(); + + for (int i = 0; i < batchSize; i++) { + insertIntoTables(pstmt, tbnames, current + 1000 * i * rows, rows); + } + } + + // then + assertRows(stable, tbnames.length * batchSize * rows); + for (String t : tbnames) { + assertRows(t, rows * batchSize); + } + } + + @Test + public void multiThreads() { // given String[][] tables = {{"t1", "t2", "t3"}, {"t4", "t5", "t6"}, {"t7", "t8", "t9"}, {"t10"}}; int rows = 10; // when - List threads = Arrays.stream(tables).map(tbnames -> new Thread(() -> insertIntoTables(tbnames, rows))).collect(Collectors.toList()); + List threads = Arrays.stream(tables).map(tbnames -> new Thread(() -> { + + String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)"; + try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { + long current = System.currentTimeMillis(); + insertIntoTables(pstmt, tbnames, current, 10); + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + + })).collect(Collectors.toList()); threads.forEach(Thread::start); for (Thread thread : threads) { try { @@ -59,9 +97,26 @@ public class ParameterBindTest { assertRows(t, rows); } } + } + + @Ignore + @Test + public void testOOM() throws SQLException { + String[] tbnames = {"t1", "t2", "t3", "t4", "t5", "t6", "t7", "t8", "t9", "t10"}; + String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)"; + int rows = 1000; + try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { + + long ts = Instant.now().minus(5 * 365, ChronoUnit.DAYS).getEpochSecond() * 1000; + while (true) { + insertIntoTables(pstmt, tbnames, ts, rows); + ts += 1000 * rows; + } + } } + private void assertRows(String tbname, int rows) { try (Statement stmt = conn.createStatement()) { ResultSet rs = stmt.executeQuery("select count(*) from " + tbname); @@ -74,40 +129,36 @@ public class ParameterBindTest { } } - private void insertIntoTables(String[] tbnames, int rowsEachTable) { - long current = System.currentTimeMillis(); - String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)"; - try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { - for (int i = 0; i < tbnames.length; i++) { - pstmt.setTableName(tbnames[i]); - pstmt.setTagInt(0, random.nextInt(100)); - pstmt.setTagInt(1, random.nextInt(100)); - - ArrayList timestampList = new ArrayList<>(); - for (int j = 0; j < rowsEachTable; j++) { - timestampList.add(current + i * 1000 + j); - } - pstmt.setTimestamp(0, timestampList); - - ArrayList f1List = new ArrayList<>(); - for (int j = 0; j < rowsEachTable; j++) { - f1List.add(random.nextInt(100)); - } - pstmt.setInt(1, f1List); - - ArrayList f2List = new ArrayList<>(); - for (int j = 0; j < rowsEachTable; j++) { - f2List.add(random.nextInt(100)); - } - pstmt.setInt(2, f2List); - - pstmt.columnDataAddBatch(); + private void insertIntoTables(TSDBPreparedStatement pstmt, String[] tbnames, long ts_start, int rowsEachTable) throws SQLException { + for (int i = 0; i < tbnames.length; i++) { + // set table name + pstmt.setTableName(tbnames[i]); + // set tags + pstmt.setTagInt(0, random.nextInt(100)); + pstmt.setTagInt(1, random.nextInt(100)); + // set column: ts + ArrayList timestampList = new ArrayList<>(); + for (int j = 0; j < rowsEachTable; j++) { + timestampList.add(ts_start + j * 1000L); } - - pstmt.columnDataExecuteBatch(); - } catch (SQLException e) { - e.printStackTrace(); + pstmt.setTimestamp(0, timestampList); + // set column: f1 + ArrayList f1List = new ArrayList<>(); + for (int j = 0; j < rowsEachTable; j++) { + f1List.add(random.nextInt(100)); + } + pstmt.setInt(1, f1List); + // set column: f2 + ArrayList f2List = new ArrayList<>(); + for (int j = 0; j < rowsEachTable; j++) { + f2List.add(random.nextInt(100)); + } + pstmt.setInt(2, f2List); + // add batch + pstmt.columnDataAddBatch(); } + // execute batch + pstmt.columnDataExecuteBatch(); } @Before diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java index f508fbdeed5bf617cf81330985981b5715678472..1531966689b58d5c92c7cc79eedb7b95183a77a9 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBJNIConnectorTest.java @@ -4,38 +4,25 @@ import com.taosdata.jdbc.enums.SchemalessProtocolType; import com.taosdata.jdbc.enums.SchemalessTimestampType; import org.junit.Test; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.sql.SQLException; -import java.sql.SQLWarning; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.Random; +import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class TSDBJNIConnectorTest { + private static final String host = "127.0.0.1"; private static TSDBResultSetRowData rowData; @Test public void test() throws SQLException { - try { - //change sleepSeconds when debugging with attach to process to find PID - int sleepSeconds = -1; - if (sleepSeconds > 0) { - RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean(); - String jvmName = runtimeBean.getName(); - long pid = Long.valueOf(jvmName.split("@")[0]); - System.out.println("JVM PID = " + pid); - - Thread.sleep(sleepSeconds * 1000); - } - } catch (Exception e) { - e.printStackTrace(); - } - // init Properties properties = new Properties(); properties.setProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR, "/etc/taos"); @@ -43,7 +30,7 @@ public class TSDBJNIConnectorTest { // connect TSDBJNIConnector connector = new TSDBJNIConnector(); - connector.connect("127.0.0.1", 6030, null, "root", "taosdata"); + connector.connect(host, 6030, null, "root", "taosdata"); // setup String setupSqlStrs[] = {"create database if not exists d precision \"us\"", @@ -141,4 +128,128 @@ public class TSDBJNIConnectorTest { } else return code != TSDBConstants.JNI_FETCH_END; } + @Test + public void param_bind_one_batch_multi_table() throws SQLException { + TSDBJNIConnector connector = new TSDBJNIConnector(); + connector.connect(host, 6030, null, "root", "taosdata"); + connector.executeQuery("drop database if exists test"); + connector.executeQuery("create database if not exists test"); + connector.executeQuery("use test"); + connector.executeQuery("create table weather(ts timestamp, f1 int) tags(t1 int)"); + + // 1. init + prepare + long stmt = connector.prepareStmt("insert into ? using weather tags(?) values(?,?)"); + for (int i = 0; i < 10; i++) { + // 2. set_tbname_tags + stmt_set_table_tags(connector, stmt, "t" + i); + // 3. bind_single_param_batch + // bind timestamp + long ts = System.currentTimeMillis(); + bind_col_timestamp(connector, stmt, ts, 100); + // bind int + bind_col_integer(connector, stmt, 100); + // 4. add_batch + connector.addBatch(stmt); + } + connector.executeBatch(stmt); + connector.closeBatch(stmt); + + connector.executeQuery("drop database if exists test"); + + connector.closeConnection(); + } + + @Test + public void param_bind_multi_batch_multi_table() throws SQLException { + TSDBJNIConnector connector = new TSDBJNIConnector(); + connector.connect(host, 6030, null, "root", "taosdata"); + connector.executeQuery("drop database if exists test"); + connector.executeQuery("create database if not exists test"); + connector.executeQuery("use test"); + connector.executeQuery("create table weather(ts timestamp, f1 int) tags(t1 int)"); + + // 1. init + prepare + long stmt = connector.prepareStmt("insert into ? using weather tags(?) values(?,?)"); + + long ts = System.currentTimeMillis(); + + for (int ind_batch = 0; ind_batch < 10; ind_batch++) { + + ts += ind_batch * 1000 * 1000; + System.out.println("batch: " + ind_batch + ", ts: " + ts); + + for (int i = 0; i < 10; i++) { + // 2. set_tbname_tags + stmt_set_table_tags(connector, stmt, "t" + i); + // 3. bind_single_param_batch + // bind timestamp + + bind_col_timestamp(connector, stmt, ts, 100); + // bind int + bind_col_integer(connector, stmt, 100); + // 4. add_batch + connector.addBatch(stmt); + } + connector.executeBatch(stmt); + } + + connector.closeBatch(stmt); + + connector.executeQuery("drop database if exists test"); + + connector.closeConnection(); + } + + private void bind_col_timestamp(TSDBJNIConnector connector, long stmt, long ts_start, int numOfRows) throws SQLException { + ByteBuffer colDataList = ByteBuffer.allocate(numOfRows * Long.BYTES); + colDataList.order(ByteOrder.LITTLE_ENDIAN); + IntStream.range(0, numOfRows).forEach(ind -> colDataList.putLong(ts_start + ind * 1000L)); + + ByteBuffer lengthList = ByteBuffer.allocate(numOfRows * Long.BYTES); + lengthList.order(ByteOrder.LITTLE_ENDIAN); + IntStream.range(0, numOfRows).forEach(ind -> lengthList.putLong(Integer.BYTES)); + + ByteBuffer isNullList = ByteBuffer.allocate(numOfRows * Integer.BYTES); + isNullList.order(ByteOrder.LITTLE_ENDIAN); + IntStream.range(0, numOfRows).forEach(ind -> isNullList.putInt(0)); + + connector.bindColumnDataArray(stmt, colDataList, lengthList, isNullList, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP, Long.BYTES, numOfRows, 0); + } + + private void bind_col_integer(TSDBJNIConnector connector, long stmt, int numOfRows) throws SQLException { + ByteBuffer colDataList = ByteBuffer.allocate(numOfRows * Integer.BYTES); + colDataList.order(ByteOrder.LITTLE_ENDIAN); + IntStream.range(0, numOfRows).forEach(ind -> colDataList.putInt(new Random().nextInt(100))); + + ByteBuffer lengthList = ByteBuffer.allocate(numOfRows * Long.BYTES); + lengthList.order(ByteOrder.LITTLE_ENDIAN); + IntStream.range(0, numOfRows).forEach(ind -> lengthList.putLong(Integer.BYTES)); + + ByteBuffer isNullList = ByteBuffer.allocate(numOfRows * Integer.BYTES); + isNullList.order(ByteOrder.LITTLE_ENDIAN); + IntStream.range(0, numOfRows).forEach(ind -> isNullList.putInt(0)); + + connector.bindColumnDataArray(stmt, colDataList, lengthList, isNullList, TSDBConstants.TSDB_DATA_TYPE_INT, Integer.BYTES, numOfRows, 1); + } + + private void stmt_set_table_tags(TSDBJNIConnector connector, long stmt, String tbname) throws SQLException { + ByteBuffer tagDataList = ByteBuffer.allocate(Integer.BYTES); + tagDataList.order(ByteOrder.LITTLE_ENDIAN); + tagDataList.putInt(new Random().nextInt(100)); + + ByteBuffer typeList = ByteBuffer.allocate(1); + typeList.order(ByteOrder.LITTLE_ENDIAN); + typeList.put((byte) TSDBConstants.TSDB_DATA_TYPE_INT); + + ByteBuffer lengthList = ByteBuffer.allocate(1 * Long.BYTES); + lengthList.order(ByteOrder.LITTLE_ENDIAN); + lengthList.putLong(Integer.BYTES); + + ByteBuffer isNullList = ByteBuffer.allocate(1 * Integer.BYTES); + isNullList.order(ByteOrder.LITTLE_ENDIAN); + isNullList.putInt(0); + + connector.setBindTableNameAndTags(stmt, tbname, 1, tagDataList, typeList, lengthList, isNullList); + } + }