提交 5800fdf9 编写于 作者: H Haojun Liao

[td-314]

上级 18528495
...@@ -13,19 +13,35 @@ ...@@ -13,19 +13,35 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "com_taosdata_jdbc_TSDBJNIConnector.h"
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
#include "tscSubquery.h" #include "tlog.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
#include "tlog.h"
#include "ttime.h" #include "ttime.h"
#define jniError(...) { if (jniDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR JNI ", jniDebugFlag, __VA_ARGS__); }} #include "com_taosdata_jdbc_TSDBJNIConnector.h"
#define jniWarn(...) { if (jniDebugFlag & DEBUG_WARN) { taosPrintLog("WARN JNI ", jniDebugFlag, __VA_ARGS__); }}
#define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); }} #define jniError(...) \
#define jniPrint(...) { taosPrintLog("JNI ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); } { \
if (jniDebugFlag & DEBUG_ERROR) { \
taosPrintLog("ERROR JNI ", jniDebugFlag, __VA_ARGS__); \
} \
}
#define jniWarn(...) \
{ \
if (jniDebugFlag & DEBUG_WARN) { \
taosPrintLog("WARN JNI ", jniDebugFlag, __VA_ARGS__); \
} \
}
#define jniTrace(...) \
{ \
if (jniDebugFlag & DEBUG_TRACE) { \
taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \
} \
}
#define jniPrint(...) \
{ taosPrintLog("JNI ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }
int __init = 0; int __init = 0;
...@@ -117,7 +133,8 @@ void jniGetGlobalMethod(JNIEnv *env) { ...@@ -117,7 +133,8 @@ void jniGetGlobalMethod(JNIEnv *env) {
jniTrace("native method register finished"); jniTrace("native method register finished");
} }
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp(JNIEnv *env, jobject jobj, jint jMode, jstring jPath, jboolean jAutoDump) { JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp(JNIEnv *env, jobject jobj, jint jMode,
jstring jPath, jboolean jAutoDump) {
if (jPath != NULL) { if (jPath != NULL) {
const char *path = (*env)->GetStringUTFChars(env, jPath, NULL); const char *path = (*env)->GetStringUTFChars(env, jPath, NULL);
taosSetAllocMode(jMode, path, !!jAutoDump); taosSetAllocMode(jMode, path, !!jAutoDump);
...@@ -219,9 +236,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn ...@@ -219,9 +236,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
} }
/* /*
* set numOfThreadsPerCore = 0 * set numOfThreadsPerCore = 0
* means only one thread for client side scheduler * means only one thread for client side scheduler
*/ */
tsNumOfThreadsPerCore = 0.0; tsNumOfThreadsPerCore = 0.0;
ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, jport); ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, jport);
...@@ -242,7 +259,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn ...@@ -242,7 +259,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
} }
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(JNIEnv *env, jobject jobj, JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(JNIEnv *env, jobject jobj,
jbyteArray jsql, jlong con) { jbyteArray jsql, jlong con) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { if (tscon == NULL) {
jniError("jobj:%p, connection is already closed", jobj); jniError("jobj:%p, connection is already closed", jobj);
...@@ -264,14 +281,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp( ...@@ -264,14 +281,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst); (*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst);
if ((*env)->ExceptionCheck(env)) { if ((*env)->ExceptionCheck(env)) {
//todo handle error // todo handle error
} }
jniTrace("jobj:%p, conn:%p, sql:%s", jobj, tscon, dst); jniTrace("jobj:%p, conn:%p, sql:%s", jobj, tscon, dst);
SSqlObj *pSql = taos_query(tscon, dst); SSqlObj *pSql = taos_query(tscon, dst);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(pSql)); jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(pSql));
} else { } else {
...@@ -283,9 +300,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp( ...@@ -283,9 +300,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(
jniTrace("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); jniTrace("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
} }
} }
free(dst); free(dst);
return (jlong) pSql; return (jlong)pSql;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con) { JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con) {
...@@ -299,31 +316,32 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNI ...@@ -299,31 +316,32 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNI
} }
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(JNIEnv *env, jobject jobj, jlong tres) { JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(JNIEnv *env, jobject jobj, jlong tres) {
TAOS_RES *pSql = (TAOS_RES *) tres; TAOS_RES *pSql = (TAOS_RES *)tres;
return (*env)->NewStringUTF(env, (const char *)taos_errstr(pSql)); return (*env)->NewStringUTF(env, (const char *)taos_errstr(pSql));
} }
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) { JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj); jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL; return JNI_CONNECTION_NULL;
} }
if ((void *) tres == NULL) { if ((void *)tres == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon); jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL; return JNI_RESULT_SET_NULL;
} }
SSqlObj *pSql = (TAOS_RES *) tres; SSqlObj *pSql = (TAOS_RES *)tres;
STscObj* pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
if (tscIsUpdateQuery(pSql)) { if (tscIsUpdateQuery(pSql)) {
taos_free_result(pSql); // free result here taos_free_result(pSql); // free result here
jniTrace("jobj:%p, conn:%p, no resultset, %p", jobj, pObj, (void*) tres); jniTrace("jobj:%p, conn:%p, no resultset, %p", jobj, pObj, (void *)tres);
return 0; return 0;
} else { } else {
jniTrace("jobj:%p, conn:%p, get resultset, %p", jobj, pObj, (void *) tres); jniTrace("jobj:%p, conn:%p, get resultset, %p", jobj, pObj, (void *)tres);
return tres; return tres;
} }
} }
...@@ -346,20 +364,20 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp( ...@@ -346,20 +364,20 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(
return JNI_SUCCESS; return JNI_SUCCESS;
} }
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con,
jlong con, jlong res) { jlong res) {
TAOS *tscon = (TAOS *)con; TAOS *tscon = (TAOS *)con;
if (tscon == NULL) { if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj); jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL; return JNI_CONNECTION_NULL;
} }
if ((void *)res == NULL) { if ((void *)res == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon); jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL; return JNI_RESULT_SET_NULL;
} }
jint ret = taos_affected_rows((SSqlObj*) res); jint ret = taos_affected_rows((SSqlObj *)res);
jniTrace("jobj:%p, conn:%p, sql:%p, affect rows:%d", jobj, tscon, (void *)con, res, ret); jniTrace("jobj:%p, conn:%p, sql:%p, affect rows:%d", jobj, tscon, (void *)con, res, ret);
return ret; return ret;
...@@ -412,7 +430,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData ...@@ -412,7 +430,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData
* @return * @return
*/ */
jstring jniFromNCharToByteArray(JNIEnv *env, char *nchar, int32_t maxBytes) { jstring jniFromNCharToByteArray(JNIEnv *env, char *nchar, int32_t maxBytes) {
int len = (int)strlen(nchar); int len = (int)strlen(nchar);
if (len > maxBytes) { // no terminated symbol exists '\0' if (len > maxBytes) { // no terminated symbol exists '\0'
len = maxBytes; len = maxBytes;
} }
...@@ -483,24 +501,22 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn ...@@ -483,24 +501,22 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
float fv = 0; float fv = 0;
fv = GET_FLOAT_VAL(row[i]); fv = GET_FLOAT_VAL(row[i]);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetFloatFp, i, (jfloat)fv); (*env)->CallVoidMethod(env, rowobj, g_rowdataSetFloatFp, i, (jfloat)fv);
} } break;
break;
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0; double dv = 0;
dv = GET_DOUBLE_VAL(row[i]); dv = GET_DOUBLE_VAL(row[i]);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetDoubleFp, i, (jdouble)dv); (*env)->CallVoidMethod(env, rowobj, g_rowdataSetDoubleFp, i, (jdouble)dv);
} } break;
break;
case TSDB_DATA_TYPE_BINARY: { case TSDB_DATA_TYPE_BINARY: {
strncpy(tmp, row[i], (size_t) fields[i].bytes); // handle the case that terminated does not exist strncpy(tmp, row[i], (size_t)fields[i].bytes); // handle the case that terminated does not exist
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp)); (*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp));
memset(tmp, 0, (size_t) fields[i].bytes); memset(tmp, 0, (size_t)fields[i].bytes);
break; break;
} }
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i, (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i,
jniFromNCharToByteArray(env, (char*)row[i], fields[i].bytes)); jniFromNCharToByteArray(env, (char *)row[i], fields[i].bytes));
break; break;
} }
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
...@@ -528,7 +544,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm ...@@ -528,7 +544,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm
} }
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con, JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con,
jboolean restart, jstring jtopic, jstring jsql, jint jinterval) { jboolean restart, jstring jtopic,
jstring jsql, jint jinterval) {
jlong sub = 0; jlong sub = 0;
TAOS *taos = (TAOS *)con; TAOS *taos = (TAOS *)con;
char *topic = NULL; char *topic = NULL;
...@@ -559,7 +576,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI ...@@ -559,7 +576,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI
return sub; return sub;
} }
static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, int num_fields) { static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
jobject rowobj = (*env)->NewObject(env, g_rowdataClass, g_rowdataConstructor, num_fields); jobject rowobj = (*env)->NewObject(env, g_rowdataClass, g_rowdataConstructor, num_fields);
jniTrace("created a rowdata object, rowobj:%p", rowobj); jniTrace("created a rowdata object, rowobj:%p", rowobj);
...@@ -588,25 +605,23 @@ static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, in ...@@ -588,25 +605,23 @@ static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, in
float fv = 0; float fv = 0;
fv = GET_FLOAT_VAL(row[i]); fv = GET_FLOAT_VAL(row[i]);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetFloatFp, i, (jfloat)fv); (*env)->CallVoidMethod(env, rowobj, g_rowdataSetFloatFp, i, (jfloat)fv);
} } break;
break; case TSDB_DATA_TYPE_DOUBLE: {
case TSDB_DATA_TYPE_DOUBLE:{
double dv = 0; double dv = 0;
dv = GET_DOUBLE_VAL(row[i]); dv = GET_DOUBLE_VAL(row[i]);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetDoubleFp, i, (jdouble)dv); (*env)->CallVoidMethod(env, rowobj, g_rowdataSetDoubleFp, i, (jdouble)dv);
} } break;
break;
case TSDB_DATA_TYPE_BINARY: { case TSDB_DATA_TYPE_BINARY: {
char tmp[TSDB_MAX_BYTES_PER_ROW] = {0}; char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
strncpy(tmp, row[i], (size_t) fields[i].bytes); // handle the case that terminated does not exist strncpy(tmp, row[i], (size_t)fields[i].bytes); // handle the case that terminated does not exist
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp)); (*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp));
memset(tmp, 0, (size_t) fields[i].bytes); memset(tmp, 0, (size_t)fields[i].bytes);
break; break;
} }
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i, (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i,
jniFromNCharToByteArray(env, (char*)row[i], fields[i].bytes)); jniFromNCharToByteArray(env, (char *)row[i], fields[i].bytes));
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i])); (*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i]));
...@@ -618,7 +633,8 @@ static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, in ...@@ -618,7 +633,8 @@ static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, in
return rowobj; return rowobj;
} }
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub, jint timeout) { JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub,
jint timeout) {
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub); jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub);
jniGetGlobalMethod(env); jniGetGlobalMethod(env);
...@@ -626,10 +642,10 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI ...@@ -626,10 +642,10 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI
jobject rows = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp); jobject rows = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp);
int64_t start = taosGetTimestampMs(); int64_t start = taosGetTimestampMs();
int count = 0; int count = 0;
while (true) { while (true) {
TAOS_RES * res = taos_consume(tsub); TAOS_RES *res = taos_consume(tsub);
if (res == NULL) { if (res == NULL) {
jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub); jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
return NULL; return NULL;
...@@ -662,7 +678,8 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI ...@@ -662,7 +678,8 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI
return rows; return rows;
} }
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub, jboolean keepProgress) { JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub,
jboolean keepProgress) {
TAOS_SUB *tsub = (TAOS_SUB *)sub; TAOS_SUB *tsub = (TAOS_SUB *)sub;
taos_unsubscribe(tsub, keepProgress); taos_unsubscribe(tsub, keepProgress);
} }
...@@ -685,7 +702,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTab ...@@ -685,7 +702,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTab
char *dst = (char *)calloc(1, sizeof(char) * (len + 1)); char *dst = (char *)calloc(1, sizeof(char) * (len + 1));
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst); (*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst);
if ((*env)->ExceptionCheck(env)) { if ((*env)->ExceptionCheck(env)) {
//todo handle error // todo handle error
} }
int code = taos_validate_sql(tscon, dst); int code = taos_validate_sql(tscon, dst);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册