未验证 提交 2f5f4722 编写于 作者: Z Zhiyu Yang 提交者: GitHub

[TS-1063]<feature>(connector): one batch bind multi tables' data (#9785)

* [TS-1063]<feature>(connector): one batch bind multi tables' data

* [TS-1063]<feature>(connector): jdbc change

* [TS-1063]<feature>(connector): fix null pointer exception

* [TS-1063]<feature>(connector): init and prepared stmt until setTableName
上级 afe03dc9
...@@ -209,6 +209,15 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp ...@@ -209,6 +209,15 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp
(JNIEnv *, jobject, jlong, jstring, jlong); (JNIEnv *, jobject, jlong, jstring, jlong);
/**
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
/* /*
* Class: com_taosdata_jdbc_TSDBJNIConnector * Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: bindColDataImp * Method: bindColDataImp
...@@ -217,6 +226,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI ...@@ -217,6 +226,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp
(JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong); (JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: stmt_add_batch
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/* /*
* Class: com_taosdata_jdbc_TSDBJNIConnector * Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeBatchImp * Method: executeBatchImp
...@@ -231,13 +248,12 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J ...@@ -231,13 +248,12 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
*/ */
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con); JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/** /*
* Class: com_taosdata_jdbc_TSDBJNIConnector * Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp * Method: stmt_errstr
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I * Signature: (JJ)I
*/ */
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_stmtErrorMsgImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
/* /*
* Class: com_taosdata_jdbc_TSDBJNIConnector * Class: com_taosdata_jdbc_TSDBJNIConnector
......
...@@ -370,7 +370,7 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J ...@@ -370,7 +370,7 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con, JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) { jlong tres) {
TAOS * tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, tres); int32_t code = check_for_params(jobj, con, tres);
if (code != JNI_SUCCESS) { if (code != JNI_SUCCESS) {
return code; return code;
...@@ -413,7 +413,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp( ...@@ -413,7 +413,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) { jlong res) {
TAOS * tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, res); int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) { if (code != JNI_SUCCESS) {
return code; return code;
...@@ -429,13 +429,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsIm ...@@ -429,13 +429,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsIm
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaDataImp(JNIEnv *env, jobject jobj, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaDataImp(JNIEnv *env, jobject jobj,
jlong con, jlong res, jlong con, jlong res,
jobject arrayListObj) { jobject arrayListObj) {
TAOS * tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, res); int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) { if (code != JNI_SUCCESS) {
return code; return code;
} }
TAOS_RES * tres = (TAOS_RES *)res; TAOS_RES *tres = (TAOS_RES *)res;
TAOS_FIELD *fields = taos_fetch_fields(tres); TAOS_FIELD *fields = taos_fetch_fields(tres);
int32_t num_fields = taos_num_fields(tres); int32_t num_fields = taos_num_fields(tres);
...@@ -572,13 +572,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn ...@@ -572,13 +572,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNIEnv *env, jobject jobj, jlong con, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNIEnv *env, jobject jobj, jlong con,
jlong res, jobject rowobj) { jlong res, jobject rowobj) {
TAOS * tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, res); int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) { if (code != JNI_SUCCESS) {
return code; return code;
} }
TAOS_RES * tres = (TAOS_RES *)res; TAOS_RES *tres = (TAOS_RES *)res;
TAOS_FIELD *fields = taos_fetch_fields(tres); TAOS_FIELD *fields = taos_fetch_fields(tres);
int32_t numOfFields = taos_num_fields(tres); int32_t numOfFields = taos_num_fields(tres);
...@@ -810,6 +810,78 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI ...@@ -810,6 +810,78 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(
JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList,
jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char *curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for (int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t *)&lengthArray[i];
curTags += lengthArray[i];
}
int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind);
int32_t nTags = (int32_t)numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
tfree(tagsBind);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp( JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
JNIEnv *env, jobject jobj, jlong stmt, jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList, JNIEnv *env, jobject jobj, jlong stmt, jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList,
jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) { jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
...@@ -877,7 +949,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp( ...@@ -877,7 +949,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) { jlong con) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { if (tscon == NULL) {
...@@ -891,18 +963,17 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J ...@@ -891,18 +963,17 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
return JNI_SQL_NULL; return JNI_SQL_NULL;
} }
taos_stmt_add_batch(pStmt); int32_t code = taos_stmt_add_batch(pStmt);
int32_t code = taos_stmt_execute(pStmt);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR; return JNI_TDENGINE_ERROR;
} }
jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon); jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) { jlong con) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { if (tscon == NULL) {
...@@ -916,90 +987,61 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv ...@@ -916,90 +987,61 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
return JNI_SQL_NULL; return JNI_SQL_NULL;
} }
int32_t code = taos_stmt_close(pStmt); int32_t code = taos_stmt_execute(pStmt);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR; return JNI_TDENGINE_ERROR;
} }
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon); jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon);
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp( JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt,
JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, jlong con) {
jbyteArray lengthList, jbyteArray nullList, jlong conn) { TAOS *tscon = (TAOS *)con;
TAOS *tsconn = (TAOS *)conn; if (tscon == NULL) {
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj); jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL; return JNI_CONNECTION_NULL;
} }
TAOS_STMT *pStmt = (TAOS_STMT *)stmt; TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) { if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn); jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL; return JNI_SQL_NULL;
} }
jsize len = (*env)->GetArrayLength(env, tags); int32_t code = taos_stmt_close(pStmt);
char *tagsData = (char *)calloc(1, len); if (code != TSDB_CODE_SUCCESS) {
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData); jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
if ((*env)->ExceptionCheck(env)) { return JNI_TDENGINE_ERROR;
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
} }
len = (*env)->GetArrayLength(env, typeList); jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
char *typeArray = (char *)calloc(1, len); return JNI_SUCCESS;
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray); }
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList); JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_stmtErrorMsgImp(JNIEnv *env, jobject jobj, jlong stmt,
int32_t *nullArray = (int32_t *)calloc(1, len); jlong con) {
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray); char errMsg[128];
if ((*env)->ExceptionCheck(env)) { TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
sprintf(errMsg, "jobj:%p, connection already closed", jobj);
return (*env)->NewStringUTF(env, errMsg);
} }
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL); TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
char * curTags = tagsData; if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND)); sprintf(errMsg, "jobj:%p, conn:%p, invalid stmt", jobj, tscon);
for (int32_t i = 0; i < numOfTags; ++i) { return (*env)->NewStringUTF(env, errMsg);
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t *)&lengthArray[i];
curTags += lengthArray[i];
} }
int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind); return (*env)->NewStringUTF(env, taos_stmt_errstr((TAOS_STMT *)stmt));
int32_t nTags = (int32_t)numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
tfree(tagsBind);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp (JNIEnv *env, jobject jobj, jobjectArray lines, jlong conn, jint protocol, jint precision){
(JNIEnv *env, jobject jobj, jobjectArray lines, jlong conn, jint protocol, jint precision){
TAOS *taos = (TAOS *)conn; TAOS *taos = (TAOS *)conn;
if (taos == NULL) { if (taos == NULL) {
jniError("jobj:%p, connection already closed", jobj); jniError("jobj:%p, connection already closed", jobj);
......
...@@ -29,6 +29,8 @@ public class TSDBJNIConnector { ...@@ -29,6 +29,8 @@ public class TSDBJNIConnector {
System.loadLibrary("taos"); System.loadLibrary("taos");
} }
/***********************************************************************/
//NOTE: JDBC
public static void init(Properties props) throws SQLWarning { public static void init(Properties props) throws SQLWarning {
synchronized (LOCK) { synchronized (LOCK) {
if (!isInitialized) { if (!isInitialized) {
...@@ -243,6 +245,9 @@ public class TSDBJNIConnector { ...@@ -243,6 +245,9 @@ public class TSDBJNIConnector {
private native int closeConnectionImp(long connection); private native int closeConnectionImp(long connection);
/*****************************************************************************************/
// NOTE: subscribe
/** /**
* Create a subscription * Create a subscription
*/ */
...@@ -270,6 +275,8 @@ public class TSDBJNIConnector { ...@@ -270,6 +275,8 @@ public class TSDBJNIConnector {
private native void unsubscribeImp(long subscription, boolean isKeep); private native void unsubscribeImp(long subscription, boolean isKeep);
/******************************************************************************************************/
// NOTE: parameter binding
public long prepareStmt(String sql) throws SQLException { public long prepareStmt(String sql) throws SQLException {
long stmt = prepareStmtImp(sql.getBytes(), this.taos); long stmt = prepareStmtImp(sql.getBytes(), this.taos);
...@@ -294,16 +301,19 @@ public class TSDBJNIConnector { ...@@ -294,16 +301,19 @@ public class TSDBJNIConnector {
public void setBindTableName(long stmt, String tableName) throws SQLException { public void setBindTableName(long stmt, String tableName) throws SQLException {
int code = setBindTableNameImp(stmt, tableName, this.taos); int code = setBindTableNameImp(stmt, tableName, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) { if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "table name: " + tableName + ", failed to set table name"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to set table name, reason: " + stmtErrorMsgImp(stmt, this.taos));
} }
} }
private native int setBindTableNameImp(long stmt, String name, long conn); private native int setBindTableNameImp(long stmt, String name, long conn);
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException { public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags,
ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(), nullList.array(), this.taos); int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(), nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) { if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to bind table name and corresponding tags, reason: " + stmtErrorMsgImp(stmt, this.taos));
} }
} }
...@@ -312,7 +322,8 @@ public class TSDBJNIConnector { ...@@ -312,7 +322,8 @@ public class TSDBJNIConnector {
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows, int columnIndex) throws SQLException { public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows, int columnIndex) throws SQLException {
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos); int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) { if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind column data"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to bind column data, reason: " + stmtErrorMsgImp(stmt, this.taos));
} }
} }
...@@ -321,10 +332,20 @@ public class TSDBJNIConnector { ...@@ -321,10 +332,20 @@ public class TSDBJNIConnector {
public void executeBatch(long stmt) throws SQLException { public void executeBatch(long stmt) throws SQLException {
int code = executeBatchImp(stmt, this.taos); int code = executeBatchImp(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) { if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to execute batch bind"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN,
"failed to execute batch bind, reason: " + stmtErrorMsgImp(stmt, this.taos));
}
}
public void addBatch(long stmt) throws SQLException {
int code = addBatchImp(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS){
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, stmtErrorMsgImp(stmt, this.taos));
} }
} }
private native int addBatchImp(long stmt, long con);
private native int executeBatchImp(long stmt, long con); private native int executeBatchImp(long stmt, long con);
public void closeBatch(long stmt) throws SQLException { public void closeBatch(long stmt) throws SQLException {
...@@ -336,6 +357,10 @@ public class TSDBJNIConnector { ...@@ -336,6 +357,10 @@ public class TSDBJNIConnector {
private native int closeStmt(long stmt, long con); private native int closeStmt(long stmt, long con);
private native String stmtErrorMsgImp(long stmt, long con);
/*************************************************************************************************/
// NOTE: schemaless-lines
public void insertLines(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException { public void insertLines(String[] lines, SchemalessProtocolType protocolType, SchemalessTimestampType timestampType) throws SQLException {
int code = insertLinesImp(lines, this.taos, protocolType.ordinal(), timestampType.ordinal()); int code = insertLinesImp(lines, this.taos, protocolType.ordinal(), timestampType.ordinal());
if (code != TSDBConstants.JNI_SUCCESS) { if (code != TSDBConstants.JNI_SUCCESS) {
......
...@@ -40,25 +40,27 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -40,25 +40,27 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private String rawSql; private String rawSql;
private Object[] parameters; private Object[] parameters;
// for parameter binding // for parameter binding
private long nativeStmtHandle = 0; private long nativeStmtHandle;
private String tableName; private String tableName;
private ArrayList<TableTagInfo> tableTags; private ArrayList<TableTagInfo> tableTags;
private int tagValueLength; private int tagValueLength;
private ArrayList<ColumnInfo> colData; private ArrayList<ColumnInfo> colData;
TSDBPreparedStatement(TSDBConnection connection, String sql) { TSDBPreparedStatement(TSDBConnection connection, String sql) throws SQLException {
super(connection); super(connection);
init(sql); init(sql);
int parameterCnt = 0; int parameterCnt = 0;
if (sql.contains("?")) { if (!sql.contains("?"))
return;
for (int i = 0; i < sql.length(); i++) { for (int i = 0; i < sql.length(); i++) {
if ('?' == sql.charAt(i)) { if ('?' == sql.charAt(i)) {
parameterCnt++; parameterCnt++;
} }
} }
}
parameters = new Object[parameterCnt]; parameters = new Object[parameterCnt];
// for parameter-binding
// TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
// this.nativeStmtHandle = connector.prepareStmt(rawSql);
if (parameterCnt > 1) { if (parameterCnt > 1) {
// the table name is also a parameter, so ignore it. // the table name is also a parameter, so ignore it.
...@@ -530,8 +532,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -530,8 +532,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
} }
public void setTableName(String name) throws SQLException { public void setTableName(String name) throws SQLException {
if (this.nativeStmtHandle == 0) {
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtHandle = connector.prepareStmt(rawSql);
}
if (this.tableName != null) { if (this.tableName != null) {
this.columnDataExecuteBatch(); this.columnDataAddBatch();
this.columnDataClearBatchInternal(); this.columnDataClearBatchInternal();
} }
this.tableName = name; this.tableName = name;
...@@ -681,7 +689,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -681,7 +689,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (rawSql == null) { if (rawSql == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "sql statement not set yet"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "sql statement not set yet");
} }
// table name is not set yet, abort // table name is not set yet, abort
if (this.tableName == null) { if (this.tableName == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "table name not set yet"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "table name not set yet");
...@@ -691,24 +698,25 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -691,24 +698,25 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (numOfCols == 0) { if (numOfCols == 0) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
} }
if (nativeStmtHandle == 0) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "stmt is null");
}
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector(); TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtHandle = connector.prepareStmt(rawSql);
if (this.tableTags == null) { if (this.tableTags == null) {
connector.setBindTableName(this.nativeStmtHandle, this.tableName); connector.setBindTableName(this.nativeStmtHandle, this.tableName);
} else { } else {
int num = this.tableTags.size(); int tagSize = this.tableTags.size();
ByteBuffer tagDataList = ByteBuffer.allocate(this.tagValueLength); ByteBuffer tagDataList = ByteBuffer.allocate(this.tagValueLength);
tagDataList.order(ByteOrder.LITTLE_ENDIAN); tagDataList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer typeList = ByteBuffer.allocate(num); ByteBuffer typeList = ByteBuffer.allocate(tagSize);
typeList.order(ByteOrder.LITTLE_ENDIAN); typeList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer lengthList = ByteBuffer.allocate(num * Long.BYTES); ByteBuffer lengthList = ByteBuffer.allocate(tagSize * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN); lengthList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer isNullList = ByteBuffer.allocate(num * Integer.BYTES); ByteBuffer isNullList = ByteBuffer.allocate(tagSize * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN); isNullList.order(ByteOrder.LITTLE_ENDIAN);
for (TableTagInfo tag : this.tableTags) { for (TableTagInfo tag : this.tableTags) {
...@@ -732,53 +740,42 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -732,53 +740,42 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
lengthList.putLong(Byte.BYTES); lengthList.putLong(Byte.BYTES);
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_BOOL: { case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
Boolean val = (Boolean) tag.value; Boolean val = (Boolean) tag.value;
tagDataList.put((byte) (val ? 1 : 0)); tagDataList.put((byte) (val ? 1 : 0));
lengthList.putLong(Byte.BYTES); lengthList.putLong(Byte.BYTES);
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: { case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
Short val = (Short) tag.value; Short val = (Short) tag.value;
tagDataList.putShort(val); tagDataList.putShort(val);
lengthList.putLong(Short.BYTES); lengthList.putLong(Short.BYTES);
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: { case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
Long val = (Long) tag.value; Long val = (Long) tag.value;
tagDataList.putLong(val == null ? 0 : val); tagDataList.putLong(val == null ? 0 : val);
lengthList.putLong(Long.BYTES); lengthList.putLong(Long.BYTES);
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: { case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
Float val = (Float) tag.value; Float val = (Float) tag.value;
tagDataList.putFloat(val == null ? 0 : val); tagDataList.putFloat(val == null ? 0 : val);
lengthList.putLong(Float.BYTES); lengthList.putLong(Float.BYTES);
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: { case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
Double val = (Double) tag.value; Double val = (Double) tag.value;
tagDataList.putDouble(val == null ? 0 : val); tagDataList.putDouble(val == null ? 0 : val);
lengthList.putLong(Double.BYTES); lengthList.putLong(Double.BYTES);
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_NCHAR: case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: { case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
String charset = TaosGlobalConfig.getCharset(); String charset = TaosGlobalConfig.getCharset();
String val = (String) tag.value; String val = (String) tag.value;
byte[] b;
byte[] b = null;
try { try {
if (tag.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) { if (tag.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) {
b = val.getBytes(); b = val.getBytes();
...@@ -788,12 +785,10 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -788,12 +785,10 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
throw new RuntimeException(e.getMessage()); throw new RuntimeException(e.getMessage());
} }
tagDataList.put(b); tagDataList.put(b);
lengthList.putLong(b.length); lengthList.putLong(b.length);
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT: case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT: case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT: case TSDBConstants.TSDB_DATA_TYPE_UINT:
...@@ -801,13 +796,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -801,13 +796,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types");
} }
} }
typeList.put((byte) tag.type); typeList.put((byte) tag.type);
isNullList.putInt(tag.isNull ? 1 : 0); isNullList.putInt(tag.isNull ? 1 : 0);
} }
connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(), tagDataList, connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(),
typeList, lengthList, isNullList); tagDataList, typeList, lengthList, isNullList);
} }
ColumnInfo colInfo = this.colData.get(0); ColumnInfo colInfo = this.colData.get(0);
...@@ -821,7 +815,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -821,7 +815,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (col1 == null || !col1.isTypeSet()) { if (col1 == null || !col1.isTypeSet()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data not bind");
} }
if (rows != col1.data.size()) { if (rows != col1.data.size()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "the rows in column data not identical"); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "the rows in column data not identical");
} }
...@@ -938,7 +931,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -938,7 +931,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
} }
break; break;
} }
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT: case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT: case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT: case TSDBConstants.TSDB_DATA_TYPE_UINT:
...@@ -949,6 +941,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -949,6 +941,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
connector.bindColumnDataArray(this.nativeStmtHandle, colDataList, lengthList, isNullList, col1.type, col1.bytes, rows, i); connector.bindColumnDataArray(this.nativeStmtHandle, colDataList, lengthList, isNullList, col1.type, col1.bytes, rows, i);
} }
connector.addBatch(this.nativeStmtHandle);
this.columnDataClearBatchInternal();
} }
public void columnDataExecuteBatch() throws SQLException { public void columnDataExecuteBatch() throws SQLException {
...@@ -963,13 +957,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -963,13 +957,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
} }
private void columnDataClearBatchInternal() { private void columnDataClearBatchInternal() {
int size = this.colData.size(); this.tableName = null;
if (this.tableTags != null)
this.tableTags.clear();
tagValueLength = 0;
if (this.colData != null)
this.colData.clear(); this.colData.clear();
this.colData.addAll(Collections.nCopies(size, null));
this.tableName = null; // clear the table name
} }
public void columnDataCloseBatch() throws SQLException { public void columnDataCloseBatch() throws SQLException {
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector(); TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
connector.closeBatch(this.nativeStmtHandle); connector.closeBatch(this.nativeStmtHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册