From 592f210983d592e8d414df19abf8b5b0649462e3 Mon Sep 17 00:00:00 2001 From: localvar Date: Sat, 4 Jan 2020 18:33:10 +0800 Subject: [PATCH] update java connector --- .../jni/com_taosdata_jdbc_TSDBJNIConnector.h | 6 +- src/client/src/TSDBJNIConnector.c | 135 +++++++++--------- src/client/src/tscSub.c | 20 ++- src/inc/taos.h | 2 +- tests/examples/c/subscribe.c | 4 +- 5 files changed, 87 insertions(+), 80 deletions(-) diff --git a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h index 958252b4de..8dbe63d75a 100644 --- a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h +++ b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h @@ -135,7 +135,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm * Signature: (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;JI)J */ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp - (JNIEnv *, jobject, jstring, jstring, jstring, jstring, jstring, jlong, jint); + (JNIEnv *, jobject, jlong, jboolean, jstring, jstring, jint); /* * Class: com_taosdata_jdbc_TSDBJNIConnector @@ -143,7 +143,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp * Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData; */ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp - (JNIEnv *, jobject, jlong); + (JNIEnv *, jobject, jlong, jint); /* * Class: com_taosdata_jdbc_TSDBJNIConnector @@ -151,7 +151,7 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp * Signature: (J)V */ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp - (JNIEnv *, jobject, jlong); + (JNIEnv *, jobject, jlong, jboolean); /* * Class: com_taosdata_jdbc_TSDBJNIConnector diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 9cec4f4b0f..228403c79d 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -20,6 +20,7 @@ #include "tscJoinProcess.h" #include "tsclient.h" #include "tscUtil.h" +#include "ttime.h" int __init = 0; @@ -514,92 +515,42 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm } } -JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jstring jhost, - jstring juser, jstring jpass, jstring jdb, - jstring jtable, jlong jtime, - jint jperiod) { - TAOS_SUB *tsub; - jlong sub = 0; - char * host = NULL; - char * user = NULL; - char * pass = NULL; - char * db = NULL; - char * table = NULL; - int64_t time = 0; - int period = 0; +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con, + jboolean restart, jstring jtopic, jstring jsql, jint jinterval) { + jlong sub = 0; + TAOS *taos = (TAOS *)con; + char *topic = NULL; + char *sql = NULL; jniGetGlobalMethod(env); jniTrace("jobj:%p, in TSDBJNIConnector_subscribeImp", jobj); - if (jhost != NULL) { - host = (char *)(*env)->GetStringUTFChars(env, jhost, NULL); - } - if (juser != NULL) { - user = (char *)(*env)->GetStringUTFChars(env, juser, NULL); - } - if (jpass != NULL) { - pass = (char *)(*env)->GetStringUTFChars(env, jpass, NULL); - } - if (jdb != NULL) { - db = (char *)(*env)->GetStringUTFChars(env, jdb, NULL); - } - if (jtable != NULL) { - table = (char *)(*env)->GetStringUTFChars(env, jtable, NULL); + if (jtopic != NULL) { + topic = (char *)(*env)->GetStringUTFChars(env, jtopic, NULL); } - time = (int64_t)jtime; - period = (int)jperiod; - - if (user == NULL) { - jniTrace("jobj:%p, user is null, use tsDefaultUser", jobj); - user = tsDefaultUser; - } - if (pass == NULL) { - jniTrace("jobj:%p, pass is null, use tsDefaultPass", jobj); - pass = tsDefaultPass; + if (jsql != NULL) { + sql = (char *)(*env)->GetStringUTFChars(env, jsql, NULL); } - jniTrace("jobj:%p, host:%s, user:%s, pass:%s, db:%s, table:%s, time:%d, period:%d", jobj, host, user, pass, db, table, - time, period); - tsub = taos_subscribe(host, user, pass, db, table, time, period); + TAOS_SUB *tsub = taos_subscribe(taos, (int)restart, topic, sql, NULL, NULL, jinterval); sub = (jlong)tsub; if (sub == 0) { - jniTrace("jobj:%p, failed to subscribe to db:%s, table:%s", jobj, db, table); + jniTrace("jobj:%p, failed to subscribe: topic:%s", jobj, jtopic); } else { - jniTrace("jobj:%p, successfully subscribe to db:%s, table:%s, sub:%ld, tsub:%p", jobj, db, table, sub, tsub); + jniTrace("jobj:%p, successfully subscribe: topic: %s", jobj, jtopic); } - if (host != NULL) (*env)->ReleaseStringUTFChars(env, jhost, host); - if (user != NULL && user != tsDefaultUser) (*env)->ReleaseStringUTFChars(env, juser, user); - if (pass != NULL && pass != tsDefaultPass) (*env)->ReleaseStringUTFChars(env, jpass, pass); - if (db != NULL) (*env)->ReleaseStringUTFChars(env, jdb, db); - if (table != NULL) (*env)->ReleaseStringUTFChars(env, jtable, table); + if (topic != NULL) (*env)->ReleaseStringUTFChars(env, jtopic, topic); + if (sql != NULL) (*env)->ReleaseStringUTFChars(env, jsql, sql); return sub; } -JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) { - jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub); - - TAOS_SUB * tsub = (TAOS_SUB *)sub; - TAOS_ROW row = taos_consume(tsub); - TAOS_FIELD *fields = taos_fetch_subfields(tsub); - int num_fields = taos_subfields_count(tsub); - - jniGetGlobalMethod(env); - - jniTrace("jobj:%p, check fields:%p, num_fields=%d", jobj, fields, num_fields); - +static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, int num_fields) { jobject rowobj = (*env)->NewObject(env, g_rowdataClass, g_rowdataConstructor, num_fields); jniTrace("created a rowdata object, rowobj:%p", rowobj); - if (row == NULL) { - jniTrace("jobj:%p, tsub:%p, fields size is %d, fetch row to the end", jobj, tsub, num_fields); - return NULL; - } - - char tmp[TSDB_MAX_BYTES_PER_ROW] = {0}; - for (int i = 0; i < num_fields; i++) { if (row[i] == NULL) { continue; @@ -634,6 +585,7 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI } break; case TSDB_DATA_TYPE_BINARY: { + char tmp[TSDB_MAX_BYTES_PER_ROW] = {0}; strncpy(tmp, row[i], (size_t) fields[i].bytes); // handle the case that terminated does not exist (*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp)); @@ -642,7 +594,7 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI } case TSDB_DATA_TYPE_NCHAR: (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i, - jniFromNCharToByteArray(env, (char*)row[i], fields[i].bytes)); + jniFromNCharToByteArray(env, (char*)row[i], fields[i].bytes)); break; case TSDB_DATA_TYPE_TIMESTAMP: (*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i])); @@ -651,13 +603,56 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI break; } } - jniTrace("jobj:%p, rowdata retrieved, rowobj:%p", jobj, rowobj); return rowobj; } -JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub) { +JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub, jint timeout) { + jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub); + jniGetGlobalMethod(env); + + TAOS_SUB *tsub = (TAOS_SUB *)sub; + jobject rows = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp); + + int64_t start = taosGetTimestampMs(); + int count = 0; + + while (true) { + TAOS_RES * res = taos_consume(tsub); + if (res == NULL) { + jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub); + return NULL; + } + + TAOS_FIELD *fields = taos_fetch_fields(res); + int num_fields = taos_num_fields(res); + while (true) { + TAOS_ROW row = taos_fetch_row(res); + if (row == NULL) { + break; + } + jobject rowobj = convert_one_row(env, row, fields, num_fields); + (*env)->CallBooleanMethod(env, rows, g_arrayListAddFp, rowobj); + count++; + } + + if (count > 0) { + break; + } + if (timeout == -1) { + continue; + } + if (((int)(taosGetTimestampMs() - start)) >= timeout) { + jniTrace("jobj:%p, sub:%ld, timeout", jobj, sub); + break; + } + } + + return rows; +} + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub, jboolean keepProgress) { TAOS_SUB *tsub = (TAOS_SUB *)sub; - taos_unsubscribe(tsub); + taos_unsubscribe(tsub, keepProgress); } JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTableSqlImp(JNIEnv *env, jobject jobj, diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index b2c332deff..8293378abf 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -35,6 +35,7 @@ typedef struct SSubscriptionProgress { typedef struct SSub { char topic[32]; int64_t lastSyncTime; + int64_t lastConsumeTime; void * signature; TAOS * taos; void * pTimer; @@ -128,6 +129,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* pSub->pSql = pSql; pSub->signature = pSub; strncpy(pSub->topic, topic, sizeof(pSub->topic)); + pSub->topic[sizeof(pSub->topic) - 1] = 0; return pSub; failed: @@ -294,7 +296,7 @@ void tscSaveSubscriptionProgress(void* sub) { fclose(fp); } -TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { +TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { STscObj* pObj = (STscObj*)taos; if (pObj == NULL || pObj->signature != pObj) { globalCode = TSDB_CODE_DISCONNECTED; @@ -319,11 +321,11 @@ TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char return NULL; } + pSub->interval = interval; if (fp != NULL) { pSub->fp = fp; - pSub->interval = interval; pSub->param = param; - taosTmrReset(tscProcessSubscriptionTimer, 0, pSub, tscTmr, &pSub->pTimer); + taosTmrReset(tscProcessSubscriptionTimer, interval, pSub, tscTmr, &pSub->pTimer); } return pSub; @@ -338,7 +340,14 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSqlObj* pSql = pSub->pSql; SSqlRes *pRes = &pSql->res; - if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 1000) { + if (pSub->pTimer == NULL) { + int duration = (int)(taosGetTimestampMs() - pSub->lastConsumeTime); + if (duration < pSub->interval) { + taosMsleep(pSub->interval - (int32_t)duration); + } + } + + if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { char* sqlstr = pSql->sqlstr; pSql->sqlstr = NULL; taos_free_result_imp(pSql, 0); @@ -356,11 +365,14 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSql->cmd.type = type; } + tscDoQuery(pSql); if (pRes->code != TSDB_CODE_SUCCESS) { tscRemoveFromSqlList(pSql); return NULL; } + + pSub->lastConsumeTime = taosGetTimestampMs(); return pSql; } diff --git a/src/inc/taos.h b/src/inc/taos.h index 3574ec5b30..d9db79fbcb 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -117,7 +117,7 @@ DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RE DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); -DLL_EXPORT TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); +DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index bcec3e61ec..80efa71dc2 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -75,9 +75,9 @@ int main(int argc, char *argv[]) { } if (async) { - tsub = taos_subscribe(topic, restart, taos, sql, subscribe_callback, NULL, 1000); + tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000); } else { - tsub = taos_subscribe(topic, restart, taos, sql, NULL, NULL, 0); + tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0); } if (tsub == NULL) { -- GitLab