未验证 提交 822fca47 编写于 作者: Z Zhiyu Yang 提交者: GitHub

[TS-1063]<feature>(connector): one batch bind multi tables' data (#9786)

* [TS-1063]<feature>(connector): one batch bind multi tables' data

* [TS-1063]<feature>(connector): init and prepared stmt until setTableName

* [TS-1063]<feature>(connector): init and prepared stmt until setTableName
上级 a31f2e4f
......@@ -209,6 +209,15 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp
(JNIEnv *, jobject, jlong, jstring, jlong);
/**
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: bindColDataImp
......@@ -217,6 +226,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp
(JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: stmt_add_batch
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeBatchImp
......@@ -231,13 +248,12 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/**
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
* Method: stmt_errstr
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_stmtErrorMsgImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
......
......@@ -805,6 +805,78 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(
JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList,
jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char *curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for (int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t *)&lengthArray[i];
curTags += lengthArray[i];
}
int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind);
int32_t nTags = (int32_t)numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
tfree(tagsBind);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
JNIEnv *env, jobject jobj, jlong stmt, jbyteArray colDataList, jbyteArray lengthList, jbyteArray nullList,
jint dataType, jint dataBytes, jint numOfRows, jint colIndex, jlong con) {
......@@ -872,8 +944,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
......@@ -886,19 +958,18 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
return JNI_SQL_NULL;
}
taos_stmt_add_batch(pStmt);
int32_t code = taos_stmt_execute(pStmt);
int32_t code = taos_stmt_add_batch(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon);
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
......@@ -911,86 +982,58 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
return JNI_SQL_NULL;
}
int32_t code = taos_stmt_close(pStmt);
int32_t code = taos_stmt_execute(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
jniDebug("jobj:%p, conn:%p, batch execute", jobj, tscon);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(
JNIEnv *env, jobject jobj, jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList,
jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
int32_t code = taos_stmt_close(pStmt);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte *)typeArray);
if ((*env)->ExceptionCheck(env)) {
}
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS;
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_stmtErrorMsgImp(JNIEnv *env, jobject jobj, jlong stmt,
jlong con) {
char errMsg[128];
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection already closed", jobj);
sprintf(errMsg, "jobj:%p, connection already closed", jobj);
return (*env)->NewStringUTF(env, errMsg);
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char *curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for (int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t *)&lengthArray[i];
curTags += lengthArray[i];
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
sprintf(errMsg, "jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return (*env)->NewStringUTF(env, errMsg);
}
int32_t code = taos_stmt_set_tbname_tags((void *)stmt, name, tagsBind);
int32_t nTags = (int32_t)numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
tfree(tagsBind);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
return (*env)->NewStringUTF(env, taos_stmt_errstr((TAOS_STMT *)stmt));
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj,
......
......@@ -59,7 +59,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
}
parameters = new Object[parameterCnt];
if (parameterCnt > 1) {
// the table name is also a parameter, so ignore it.
this.colData = new ArrayList<>();
......@@ -530,8 +529,14 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
public void setTableName(String name) throws SQLException {
if (this.nativeStmtHandle == 0) {
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtHandle = connector.prepareStmt(rawSql);
}
if (this.tableName != null) {
this.columnDataExecuteBatch();
this.columnDataAddBatch();
this.columnDataClearBatchInternal();
}
this.tableName = name;
......@@ -963,10 +968,12 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
private void columnDataClearBatchInternal() {
int size = this.colData.size();
this.colData.clear();
this.colData.addAll(Collections.nCopies(size, null));
this.tableName = null; // clear the table name
this.tableName = null;
if (this.tableTags != null)
this.tableTags.clear();
this.tagValueLength = 0;
if (this.colData != null)
this.colData.clear();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册