提交 87ab7482 编写于 作者: H Haojun Liao

[TD-2070]<enhance>: optimize the load data performance.

上级 91dcb46d
...@@ -129,6 +129,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData ...@@ -129,6 +129,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp
(JNIEnv *, jobject, jlong, jlong, jobject); (JNIEnv *, jobject, jlong, jlong, jobject);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: fetchBlockImp
* Signature: (JJLcom/taosdata/jdbc/TSDBResultSetBlockData;)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp
(JNIEnv *, jobject, jlong, jlong, jobject);
/* /*
* Class: com_taosdata_jdbc_TSDBJNIConnector * Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: closeConnectionImp * Method: closeConnectionImp
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#include "taos.h" #include "taos.h"
#include "tlog.h" #include "tlog.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h"
#include "com_taosdata_jdbc_TSDBJNIConnector.h" #include "com_taosdata_jdbc_TSDBJNIConnector.h"
...@@ -57,6 +56,10 @@ jmethodID g_rowdataSetStringFp; ...@@ -57,6 +56,10 @@ jmethodID g_rowdataSetStringFp;
jmethodID g_rowdataSetTimestampFp; jmethodID g_rowdataSetTimestampFp;
jmethodID g_rowdataSetByteArrayFp; jmethodID g_rowdataSetByteArrayFp;
jmethodID g_blockdataSetByteArrayFp;
jmethodID g_blockdataSetNumOfRowsFp;
jmethodID g_blockdataSetNumOfColsFp;
#define JNI_SUCCESS 0 #define JNI_SUCCESS 0
#define JNI_TDENGINE_ERROR -1 #define JNI_TDENGINE_ERROR -1
#define JNI_CONNECTION_NULL -2 #define JNI_CONNECTION_NULL -2
...@@ -66,7 +69,7 @@ jmethodID g_rowdataSetByteArrayFp; ...@@ -66,7 +69,7 @@ jmethodID g_rowdataSetByteArrayFp;
#define JNI_FETCH_END -6 #define JNI_FETCH_END -6
#define JNI_OUT_OF_MEMORY -7 #define JNI_OUT_OF_MEMORY -7
void jniGetGlobalMethod(JNIEnv *env) { static void jniGetGlobalMethod(JNIEnv *env) {
// make sure init function executed once // make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init, 0, 1)) { switch (atomic_val_compare_exchange_32(&__init, 0, 1)) {
case 0: case 0:
...@@ -114,10 +117,31 @@ void jniGetGlobalMethod(JNIEnv *env) { ...@@ -114,10 +117,31 @@ void jniGetGlobalMethod(JNIEnv *env) {
g_rowdataSetByteArrayFp = (*env)->GetMethodID(env, g_rowdataClass, "setByteArray", "(I[B)V"); g_rowdataSetByteArrayFp = (*env)->GetMethodID(env, g_rowdataClass, "setByteArray", "(I[B)V");
(*env)->DeleteLocalRef(env, rowdataClass); (*env)->DeleteLocalRef(env, rowdataClass);
jclass blockdataClass = (*env)->FindClass(env, "com/taosdata/jdbc/TSDBResultSetBlockData");
jclass g_blockdataClass = (*env)->NewGlobalRef(env, blockdataClass);
g_blockdataSetByteArrayFp = (*env)->GetMethodID(env, g_blockdataClass, "setByteArray", "(II[B)V");
g_blockdataSetNumOfRowsFp = (*env)->GetMethodID(env, g_blockdataClass, "setNumOfRows", "(I)V");
g_blockdataSetNumOfColsFp = (*env)->GetMethodID(env, g_blockdataClass, "setNumOfCols", "(I)V");
(*env)->DeleteLocalRef(env, blockdataClass);
atomic_store_32(&__init, 2); atomic_store_32(&__init, 2);
jniDebug("native method register finished"); jniDebug("native method register finished");
} }
static int32_t check_for_params(jobject jobj, jlong conn, jlong res) {
if ((TAOS*) conn == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
if ((TAOS_RES *) res == NULL) {
jniError("jobj:%p, conn:%p, res is null", jobj, (TAOS*) conn);
return JNI_RESULT_SET_NULL;
}
return JNI_SUCCESS;
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp(JNIEnv *env, jobject jobj, jint jMode, JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp(JNIEnv *env, jobject jobj, jint jMode,
jstring jPath, jboolean jAutoDump) { jstring jPath, jboolean jAutoDump) {
if (jPath != NULL) { if (jPath != NULL) {
...@@ -192,39 +216,37 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions(JNIEnv ...@@ -192,39 +216,37 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions(JNIEnv
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEnv *env, jobject jobj, jstring jhost, JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEnv *env, jobject jobj, jstring jhost,
jint jport, jstring jdbName, jstring juser, jint jport, jstring jdbName, jstring juser,
jstring jpass) { jstring jpass) {
jlong ret = 0; jlong ret = 0;
const char *host = NULL; const char *host = NULL;
const char *dbname = NULL;
const char *user = NULL; const char *user = NULL;
const char *pass = NULL; const char *pass = NULL;
const char *dbname = NULL;
if (jhost != NULL) { if (jhost != NULL) {
host = (*env)->GetStringUTFChars(env, jhost, NULL); host = (*env)->GetStringUTFChars(env, jhost, NULL);
} }
if (jdbName != NULL) { if (jdbName != NULL) {
dbname = (*env)->GetStringUTFChars(env, jdbName, NULL); dbname = (*env)->GetStringUTFChars(env, jdbName, NULL);
} }
if (juser != NULL) { if (juser != NULL) {
user = (*env)->GetStringUTFChars(env, juser, NULL); user = (*env)->GetStringUTFChars(env, juser, NULL);
} }
if (jpass != NULL) { if (jpass != NULL) {
pass = (*env)->GetStringUTFChars(env, jpass, NULL); pass = (*env)->GetStringUTFChars(env, jpass, NULL);
} }
if (user == NULL) { if (user == NULL) {
jniDebug("jobj:%p, user is null, use default user %s", jobj, TSDB_DEFAULT_USER); jniDebug("jobj:%p, user not specified, use default user %s", jobj, TSDB_DEFAULT_USER);
} }
if (pass == NULL) { if (pass == NULL) {
jniDebug("jobj:%p, pass is null, use default password", jobj); jniDebug("jobj:%p, pass not specified, use default password", jobj);
} }
/* ret = (jlong) taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
* set numOfThreadsPerCore = 0
* means only one thread for client side scheduler
*/
tsNumOfThreadsPerCore = 0.0;
ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
if (ret == 0) { if (ret == 0) {
jniError("jobj:%p, conn:%p, connect to database failed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret, jniError("jobj:%p, conn:%p, connect to database failed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret,
(char *)host, (char *)user, (char *)dbname, (int32_t)jport); (char *)host, (char *)user, (char *)dbname, (int32_t)jport);
...@@ -233,10 +255,21 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn ...@@ -233,10 +255,21 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
(char *)host, (char *)user, (char *)dbname, (int32_t)jport); (char *)host, (char *)user, (char *)dbname, (int32_t)jport);
} }
if (host != NULL) (*env)->ReleaseStringUTFChars(env, jhost, host); if (host != NULL) {
if (dbname != NULL) (*env)->ReleaseStringUTFChars(env, jdbName, dbname); (*env)->ReleaseStringUTFChars(env, jhost, host);
if (user != NULL) (*env)->ReleaseStringUTFChars(env, juser, user); }
if (pass != NULL) (*env)->ReleaseStringUTFChars(env, jpass, pass);
if (dbname != NULL) {
(*env)->ReleaseStringUTFChars(env, jdbName, dbname);
}
if (user != NULL) {
(*env)->ReleaseStringUTFChars(env, juser, user);
}
if (pass != NULL) {
(*env)->ReleaseStringUTFChars(env, jpass, pass);
}
return ret; return ret;
} }
...@@ -245,64 +278,53 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp( ...@@ -245,64 +278,53 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(
jbyteArray jsql, jlong con) { jbyteArray jsql, jlong con) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { if (tscon == NULL) {
jniError("jobj:%p, connection is already closed", jobj); jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL; return JNI_CONNECTION_NULL;
} }
if (jsql == NULL) { if (jsql == NULL) {
jniError("jobj:%p, conn:%p, sql is null", jobj, tscon); jniError("jobj:%p, conn:%p, empty sql string", jobj, tscon);
return JNI_SQL_NULL; return JNI_SQL_NULL;
} }
jsize len = (*env)->GetArrayLength(env, jsql); jsize len = (*env)->GetArrayLength(env, jsql);
char *dst = (char *)calloc(1, sizeof(char) * (len + 1)); char *str = (char *) calloc(1, sizeof(char) * (len + 1));
if (dst == NULL) { if (str == NULL) {
jniError("jobj:%p, conn:%p, can not alloc memory", jobj, tscon); jniError("jobj:%p, conn:%p, alloc memory failed", jobj, tscon);
return JNI_OUT_OF_MEMORY; return JNI_OUT_OF_MEMORY;
} }
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst); (*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)str);
if ((*env)->ExceptionCheck(env)) { if ((*env)->ExceptionCheck(env)) {
// todo handle error // todo handle error
} }
jniDebug("jobj:%p, conn:%p, sql:%s", jobj, tscon, dst); SSqlObj *pSql = taos_query(tscon, str);
SSqlObj *pSql = taos_query(tscon, dst);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(pSql)); jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(pSql));
} else { } else {
int32_t affectRows = 0;
if (pSql->cmd.command == TSDB_SQL_INSERT) { if (pSql->cmd.command == TSDB_SQL_INSERT) {
affectRows = taos_affected_rows(pSql); int32_t affectRows = taos_affected_rows(pSql);
jniDebug("jobj:%p, conn:%p, code:%s, affect rows:%d", jobj, tscon, tstrerror(code), affectRows); jniDebug("jobj:%p, conn:%p, code:%s, affect rows:%d", jobj, tscon, tstrerror(code), affectRows);
} else { } else {
jniDebug("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); jniDebug("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
} }
} }
free(dst); free(str);
return (jlong)pSql; return (jlong) pSql;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) { JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) {
TAOS *tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, tres);
if (tscon == NULL) { if (code != JNI_SUCCESS) {
jniError("jobj:%p, connection is closed", jobj); return code;
return (jint)TSDB_CODE_TSC_INVALID_CONNECTION;
} }
if ((void *)tres == NULL) { return (jint)taos_errno((TAOS_RES*) tres);
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
}
TAOS_RES *pSql = (TAOS_RES *)tres;
return (jint)taos_errno(pSql);
} }
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(JNIEnv *env, jobject jobj, jlong tres) { JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(JNIEnv *env, jobject jobj, jlong tres) {
...@@ -313,23 +335,16 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J ...@@ -313,23 +335,16 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con, JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) { jlong tres) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { int32_t code = check_for_params(jobj, con, tres);
jniError("jobj:%p, connection is closed", jobj); if (code != JNI_SUCCESS) {
return JNI_CONNECTION_NULL; return code;
}
if ((void *)tres == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
} }
SSqlObj *pSql = (TAOS_RES *)tres; SSqlObj *pSql = (TAOS_RES *)tres;
STscObj *pObj = pSql->pTscObj;
if (tscIsUpdateQuery(pSql)) { if (tscIsUpdateQuery(pSql)) {
jniDebug("jobj:%p, conn:%p, update query, no resultset, %p", jobj, pObj, (void *)tres); jniDebug("jobj:%p, conn:%p, update query, no resultset, %p", jobj, tscon, (void *)tres);
} else { } else {
jniDebug("jobj:%p, conn:%p, get resultset, %p", jobj, pObj, (void *)tres); jniDebug("jobj:%p, conn:%p, get resultset, %p", jobj, tscon, (void *)tres);
} }
return tres; return tres;
...@@ -337,15 +352,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp( ...@@ -337,15 +352,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(JNIEnv *env, jobject jobj, jlong con, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) { jlong tres) {
TAOS *tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, tres);
if (tscon == NULL) { if (code != JNI_SUCCESS) {
jniError("jobj:%p, connection is closed", jobj); return code;
return JNI_CONNECTION_NULL;
}
if ((void *)tres == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
} }
SSqlObj *pSql = (TAOS_RES *)tres; SSqlObj *pSql = (TAOS_RES *)tres;
...@@ -355,37 +364,27 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp( ...@@ -355,37 +364,27 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(JNIEnv *env, jobject jobj, jlong con, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) { jlong res) {
TAOS *tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, res);
if (tscon == NULL) { if (code != JNI_SUCCESS) {
jniError("jobj:%p, connection is closed", jobj); return code;
return JNI_CONNECTION_NULL;
}
if ((void *)res == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
} }
taos_free_result((void *)res); taos_free_result((void *)res);
jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, tscon, (void *)res); jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, (TAOS*) con, (void *)res);
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) { jlong res) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { int32_t code = check_for_params(jobj, con, res);
jniError("jobj:%p, connection is closed", jobj); if (code != JNI_SUCCESS) {
return JNI_CONNECTION_NULL; return code;
}
if ((void *)res == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
} }
jint ret = taos_affected_rows((SSqlObj *)res); jint ret = taos_affected_rows((SSqlObj *)res);
jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (void *)con, (void *)res, (int32_t)ret); jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (TAOS *)con, (TAOS_RES *)res, (int32_t)ret);
return ret; return ret;
} }
...@@ -394,27 +393,20 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData ...@@ -394,27 +393,20 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData
jlong con, jlong res, jlong con, jlong res,
jobject arrayListObj) { jobject arrayListObj) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { int32_t code = check_for_params(jobj, con, res);
jniError("jobj:%p, connection is closed", jobj); if (code != JNI_SUCCESS) {
return JNI_CONNECTION_NULL; return code;
}
TAOS_RES *result = (TAOS_RES *)res;
if (result == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
} }
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_RES* tres = (TAOS_RES*) res;
int num_fields = taos_num_fields(result); TAOS_FIELD *fields = taos_fetch_fields(tres);
// jobject arrayListObj = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp, "");
int32_t num_fields = taos_num_fields(tres);
if (num_fields == 0) { if (num_fields == 0) {
jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void *)res, num_fields); jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, tres, num_fields);
return JNI_NUM_OF_FIELDS_0; return JNI_NUM_OF_FIELDS_0;
} else { } else {
jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void *)res, num_fields); jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, tres, num_fields);
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
jobject metadataObj = (*env)->NewObject(env, g_metadataClass, g_metadataConstructFp); jobject metadataObj = (*env)->NewObject(env, g_metadataClass, g_metadataConstructFp);
(*env)->SetIntField(env, metadataObj, g_metadataColtypeField, fields[i].type); (*env)->SetIntField(env, metadataObj, g_metadataColtypeField, fields[i].type);
...@@ -457,21 +449,21 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn ...@@ -457,21 +449,21 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
} }
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
int num_fields = taos_num_fields(result);
if (num_fields == 0) { int32_t numOfFields = taos_num_fields(result);
jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void*)res, num_fields); if (numOfFields == 0) {
jniError("jobj:%p, conn:%p, resultset:%p, fields size %d", jobj, tscon, (void*)res, numOfFields);
return JNI_NUM_OF_FIELDS_0; return JNI_NUM_OF_FIELDS_0;
} }
TAOS_ROW row = taos_fetch_row(result); TAOS_ROW row = taos_fetch_row(result);
if (row == NULL) { if (row == NULL) {
int tserrno = taos_errno(result); int code = taos_errno(result);
if (tserrno == 0) { if (code == TSDB_CODE_SUCCESS) {
jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, num_fields); jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, numOfFields);
return JNI_FETCH_END; return JNI_FETCH_END;
} else { } else {
jniDebug("jobj:%p, conn:%p, interruptted query", jobj, tscon); jniDebug("jobj:%p, conn:%p, interrupted query", jobj, tscon);
return JNI_RESULT_SET_NULL; return JNI_RESULT_SET_NULL;
} }
} }
...@@ -480,7 +472,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn ...@@ -480,7 +472,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
char tmp[TSDB_MAX_BYTES_PER_ROW] = {0}; char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
for (int i = 0; i < num_fields; i++) { for (int i = 0; i < numOfFields; i++) {
if (row[i] == NULL) { if (row[i] == NULL) {
continue; continue;
} }
...@@ -534,6 +526,45 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn ...@@ -534,6 +526,45 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNIEnv *env, jobject jobj, jlong con,
jlong res, jobject rowobj) {
TAOS * tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) {
return code;
}
TAOS_RES * tres = (TAOS_RES *)res;
TAOS_FIELD *fields = taos_fetch_fields(tres);
int32_t numOfFields = taos_num_fields(tres);
assert(numOfFields > 0);
TAOS_ROW row = NULL;
int32_t numOfRows = taos_fetch_block(tres, &row);
if (numOfRows == 0) {
code = taos_errno(tres);
if (code == JNI_SUCCESS) {
jniDebug("jobj:%p, conn:%p, resultset:%p, numOfFields:%d, no data to retrieve", jobj, tscon, (void *)res,
numOfFields);
return JNI_FETCH_END;
} else {
jniDebug("jobj:%p, conn:%p, query interrupted", jobj, tscon);
return JNI_RESULT_SET_NULL;
}
}
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows);
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields);
for (int i = 0; i < numOfFields; i++) {
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, i, fields[i].bytes * numOfRows,
jniFromNCharToByteArray(env, (char *)row[i], fields[i].bytes * numOfRows));
}
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionImp(JNIEnv *env, jobject jobj, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionImp(JNIEnv *env, jobject jobj,
jlong con) { jlong con) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
...@@ -589,7 +620,6 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEn ...@@ -589,7 +620,6 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEn
jniGetGlobalMethod(env); jniGetGlobalMethod(env);
TAOS_SUB *tsub = (TAOS_SUB *)sub; TAOS_SUB *tsub = (TAOS_SUB *)sub;
TAOS_RES *res = taos_consume(tsub); TAOS_RES *res = taos_consume(tsub);
if (res == NULL) { if (res == NULL) {
...@@ -621,16 +651,16 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTab ...@@ -621,16 +651,16 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTab
jsize len = (*env)->GetArrayLength(env, jsql); jsize len = (*env)->GetArrayLength(env, jsql);
char *dst = (char *)calloc(1, sizeof(char) * (len + 1)); char *str = (char *)calloc(1, sizeof(char) * (len + 1));
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst); (*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)str);
if ((*env)->ExceptionCheck(env)) { if ((*env)->ExceptionCheck(env)) {
// todo handle error // todo handle error
} }
int code = taos_validate_sql(tscon, dst); int code = taos_validate_sql(tscon, str);
jniDebug("jobj:%p, conn:%p, code is %d", jobj, tscon, code); jniDebug("jobj:%p, conn:%p, code is %d", jobj, tscon, code);
free(dst); free(str);
return code; return code;
} }
......
...@@ -2196,16 +2196,6 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2196,16 +2196,6 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo);
} }
// if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
//
// }
//
// if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY) ||
// ((!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || pCmd->command == TSDB_SQL_RETRIEVE) &&
// !(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)))) {
// tscSetResRawPtr(pRes, pQueryInfo);
// }
if (pSql->pSubscription != NULL) { if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......
...@@ -243,6 +243,11 @@ public class TSDBJNIConnector { ...@@ -243,6 +243,11 @@ public class TSDBJNIConnector {
private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData); private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData);
public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) {
return this.fetchBlockImp(this.taos, resultSet, blockData);
}
private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData);
/** /**
* Execute close operation from C to release connection pointer by JNI * Execute close operation from C to release connection pointer by JNI
* *
......
...@@ -47,10 +47,14 @@ public class TSDBResultSet implements ResultSet { ...@@ -47,10 +47,14 @@ public class TSDBResultSet implements ResultSet {
private List<ColumnMetaData> columnMetaDataList = new ArrayList<ColumnMetaData>(); private List<ColumnMetaData> columnMetaDataList = new ArrayList<ColumnMetaData>();
private TSDBResultSetRowData rowData; private TSDBResultSetRowData rowData;
private TSDBResultSetBlockData blockData;
private boolean blockwiseFetch = false;
private boolean lastWasNull = false; private boolean lastWasNull = false;
private final int COLUMN_INDEX_START_VALUE = 1; private final int COLUMN_INDEX_START_VALUE = 1;
private int rowIndex = 0;
public TSDBJNIConnector getJniConnector() { public TSDBJNIConnector getJniConnector() {
return jniConnector; return jniConnector;
} }
...@@ -67,6 +71,10 @@ public class TSDBResultSet implements ResultSet { ...@@ -67,6 +71,10 @@ public class TSDBResultSet implements ResultSet {
this.resultSetPointer = resultSetPointer; this.resultSetPointer = resultSetPointer;
} }
public void setBlockWiseFetch(boolean fetchBlock) {
this.blockwiseFetch = fetchBlock;
}
public List<ColumnMetaData> getColumnMetaDataList() { public List<ColumnMetaData> getColumnMetaDataList() {
return columnMetaDataList; return columnMetaDataList;
} }
...@@ -107,6 +115,7 @@ public class TSDBResultSet implements ResultSet { ...@@ -107,6 +115,7 @@ public class TSDBResultSet implements ResultSet {
} }
this.rowData = new TSDBResultSetRowData(this.columnMetaDataList.size()); this.rowData = new TSDBResultSetRowData(this.columnMetaDataList.size());
this.blockData = new TSDBResultSetBlockData(this.columnMetaDataList, this.columnMetaDataList.size());
} }
public <T> T unwrap(Class<T> iface) throws SQLException { public <T> T unwrap(Class<T> iface) throws SQLException {
...@@ -118,21 +127,42 @@ public class TSDBResultSet implements ResultSet { ...@@ -118,21 +127,42 @@ public class TSDBResultSet implements ResultSet {
} }
public boolean next() throws SQLException { public boolean next() throws SQLException {
if (rowData != null) { if (this.blockwiseFetch) {
this.rowData.clear(); if (this.blockData.forward()) {
} return true;
}
int code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData);
this.blockData.resetCursor();
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
} else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
} else if (code == TSDBConstants.JNI_FETCH_END) {
return false;
}
int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
} else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
} else if (code == TSDBConstants.JNI_FETCH_END) {
return false;
} else {
return true; return true;
} else {
if (rowData != null) {
this.rowData.clear();
}
int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
} else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
} else if (code == TSDBConstants.JNI_FETCH_END) {
return false;
} else {
return true;
}
} }
} }
...@@ -155,21 +185,30 @@ public class TSDBResultSet implements ResultSet { ...@@ -155,21 +185,30 @@ public class TSDBResultSet implements ResultSet {
String res = null; String res = null;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()); if (!lastWasNull) {
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return this.blockData.getString(colIndex);
} }
return res;
} }
public boolean getBoolean(int columnIndex) throws SQLException { public boolean getBoolean(int columnIndex) throws SQLException {
boolean res = false; boolean res = false;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType()); if (!lastWasNull) {
res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
} else {
return this.blockData.getBoolean(colIndex);
} }
return res; return res;
} }
...@@ -177,66 +216,91 @@ public class TSDBResultSet implements ResultSet { ...@@ -177,66 +216,91 @@ public class TSDBResultSet implements ResultSet {
byte res = 0; byte res = 0;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); if (!lastWasNull) {
res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return (byte) this.blockData.getInt(colIndex);
} }
return res;
} }
public short getShort(int columnIndex) throws SQLException { public short getShort(int columnIndex) throws SQLException {
short res = 0; short res = 0;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); if (!lastWasNull) {
res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return (short) this.blockData.getInt(colIndex);
} }
return res;
} }
public int getInt(int columnIndex) throws SQLException { public int getInt(int columnIndex) throws SQLException {
int res = 0; int res = 0;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); if (!lastWasNull) {
res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return this.blockData.getInt(colIndex);
} }
return res;
} }
public long getLong(int columnIndex) throws SQLException { public long getLong(int columnIndex) throws SQLException {
long res = 0l; long res = 0l;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()); if (!lastWasNull) {
res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return this.blockData.getLong(colIndex);
} }
return res;
} }
public float getFloat(int columnIndex) throws SQLException { public float getFloat(int columnIndex) throws SQLException {
float res = 0; float res = 0;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType()); if (!lastWasNull) {
res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return (float) this.blockData.getDouble(colIndex);
} }
return res;
} }
public double getDouble(int columnIndex) throws SQLException { public double getDouble(int columnIndex) throws SQLException {
double res = 0; double res = 0;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType()); if (!lastWasNull) {
res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return this.blockData.getDouble(colIndex);
} }
return res;
} }
/* /*
...@@ -249,25 +313,11 @@ public class TSDBResultSet implements ResultSet { ...@@ -249,25 +313,11 @@ public class TSDBResultSet implements ResultSet {
*/ */
@Deprecated @Deprecated
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException { public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
BigDecimal res = null; return new BigDecimal(getLong(columnIndex));
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
}
return res;
} }
public byte[] getBytes(int columnIndex) throws SQLException { public byte[] getBytes(int columnIndex) throws SQLException {
byte[] res = null; return getString(columnIndex).getBytes();
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
}
return res;
} }
public Date getDate(int columnIndex) throws SQLException { public Date getDate(int columnIndex) throws SQLException {
...@@ -284,11 +334,15 @@ public class TSDBResultSet implements ResultSet { ...@@ -284,11 +334,15 @@ public class TSDBResultSet implements ResultSet {
Timestamp res = null; Timestamp res = null;
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
if (!lastWasNull) { this.lastWasNull = this.rowData.wasNull(colIndex);
res = this.rowData.getTimestamp(colIndex); if (!lastWasNull) {
res = this.rowData.getTimestamp(colIndex);
}
return res;
} else {
return this.blockData.getTimestamp(columnIndex);
} }
return res;
} }
public InputStream getAsciiStream(int columnIndex) throws SQLException { public InputStream getAsciiStream(int columnIndex) throws SQLException {
...@@ -400,8 +454,12 @@ public class TSDBResultSet implements ResultSet { ...@@ -400,8 +454,12 @@ public class TSDBResultSet implements ResultSet {
public Object getObject(int columnIndex) throws SQLException { public Object getObject(int columnIndex) throws SQLException {
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
return this.rowData.get(colIndex); this.lastWasNull = this.rowData.wasNull(colIndex);
return this.rowData.get(colIndex);
} else {
return this.blockData.get(colIndex);
}
} }
public Object getObject(String columnLabel) throws SQLException { public Object getObject(String columnLabel) throws SQLException {
...@@ -433,8 +491,12 @@ public class TSDBResultSet implements ResultSet { ...@@ -433,8 +491,12 @@ public class TSDBResultSet implements ResultSet {
public BigDecimal getBigDecimal(int columnIndex) throws SQLException { public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
int colIndex = getTrueColumnIndex(columnIndex); int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex); if (!this.blockwiseFetch) {
return new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType())); this.lastWasNull = this.rowData.wasNull(colIndex);
return new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
} else {
return new BigDecimal(this.blockData.getLong(colIndex));
}
} }
public BigDecimal getBigDecimal(String columnLabel) throws SQLException { public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
......
...@@ -218,5 +218,4 @@ public class TSDBResultSetRowData { ...@@ -218,5 +218,4 @@ public class TSDBResultSetRowData {
public void setData(ArrayList<Object> data) { public void setData(ArrayList<Object> data) {
this.data = (ArrayList<Object>) data.clone(); this.data = (ArrayList<Object>) data.clone();
} }
} }
...@@ -81,7 +81,9 @@ public class TSDBStatement implements Statement { ...@@ -81,7 +81,9 @@ public class TSDBStatement implements Statement {
} }
if (!this.connecter.isUpdateQuery(pSql)) { if (!this.connecter.isUpdateQuery(pSql)) {
return new TSDBResultSet(this.connecter, resultSetPointer); TSDBResultSet res = new TSDBResultSet(this.connecter, resultSetPointer);
res.setBlockWiseFetch(true);
return res;
} else { } else {
this.connecter.freeResultSet(pSql); this.connecter.freeResultSet(pSql);
return null; return null;
...@@ -136,7 +138,7 @@ public class TSDBStatement implements Statement { ...@@ -136,7 +138,7 @@ public class TSDBStatement implements Statement {
} }
public void setMaxRows(int max) throws SQLException { public void setMaxRows(int max) throws SQLException {
// always set maxRows to zero, meaning unlimitted rows in a resultSet // always set maxRows to zero, meaning unlimited rows in a resultSet
} }
public void setEscapeProcessing(boolean enable) throws SQLException { public void setEscapeProcessing(boolean enable) throws SQLException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册