From a4422e5e59903c8104ef55ebf8cad4b8ebbe8399 Mon Sep 17 00:00:00 2001 From: Zhiyu Yang <69311263+zyyang-taosdata@users.noreply.github.com> Date: Wed, 27 Oct 2021 11:00:09 +0800 Subject: [PATCH] [TD-10401]: support datax tdenginewriter migrate opentsdb data (#8411) * [TD-10401]: tdengine plugin based on datax * datax JNI plugin * remove getGlobalMethod in dataxPlugin method * change * change * change * taos_schemaless changed * add getErrMsgByCode for insertOpentsdbJson * remove useless print * change implements * fix Darwin compile error * [TD-10771]: fix invalid syntax error for python2 Co-authored-by: Huo Linhe --- packaging/release.sh | 2 +- ...libaba_datax_plugin_writer_JniConnection.h | 81 ++++++ src/client/jni/jniCommon.h | 87 +++++++ src/client/src/TSDBJNIConnector.c | 86 +------ src/client/src/dataxJniConnection.c | 232 ++++++++++++++++++ src/client/src/tscParseOpenTSDB.c | 2 - src/inc/tsdb.h | 3 - src/kit/taosdemo/taosdemo.c | 43 ++-- src/tsdb/src/tsdbMain.c | 13 +- src/vnode/src/vnodeWrite.c | 4 +- 10 files changed, 438 insertions(+), 115 deletions(-) create mode 100644 src/client/jni/com_alibaba_datax_plugin_writer_JniConnection.h create mode 100644 src/client/jni/jniCommon.h create mode 100644 src/client/src/dataxJniConnection.c diff --git a/packaging/release.sh b/packaging/release.sh index 705103a87a..a827c9ea46 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -213,7 +213,7 @@ else exit 1 fi -make -j8 +make cd ${curr_dir} diff --git a/src/client/jni/com_alibaba_datax_plugin_writer_JniConnection.h b/src/client/jni/com_alibaba_datax_plugin_writer_JniConnection.h new file mode 100644 index 0000000000..61f0e6eb9c --- /dev/null +++ b/src/client/jni/com_alibaba_datax_plugin_writer_JniConnection.h @@ -0,0 +1,81 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class com_alibaba_datax_plugin_writer_JniConnection */ + +#ifndef _Included_com_alibaba_datax_plugin_writer_JniConnection +#define _Included_com_alibaba_datax_plugin_writer_JniConnection +#ifdef __cplusplus +extern "C" { +#endif +#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER +#define com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER 0LL +#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL +#define com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL 0L +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: initImp + * Signature: (Ljava/lang/String;)V + */ +JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_initImp + (JNIEnv *, jclass, jstring); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: setOptions + * Signature: (ILjava/lang/String;)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_setOptions + (JNIEnv *, jclass, jint, jstring); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: connectImp + * Signature: (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)J + */ +JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_connectImp + (JNIEnv *, jobject, jstring, jint, jstring, jstring, jstring); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: getErrCodeImp + * Signature: (JJ)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrCodeImp + (JNIEnv *, jobject, jlong, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: getErrMsgImp + * Signature: (J)Ljava/lang/String; + */ +JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgImp + (JNIEnv *, jobject, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: freeResultSetImp + * Signature: (JJ)V + */ +JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_freeResultSetImp + (JNIEnv *, jobject, jlong, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: closeConnectionImp + * Signature: (J)I + */ +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_closeConnectionImp + (JNIEnv *, jobject, jlong); + +/* + * Class: com_alibaba_datax_plugin_writer_JniConnection + * Method: insertOpentsdbJson + * Signature: (Ljava/lang/String;J)J + */ +JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_insertOpentsdbJson + (JNIEnv *, jobject, jstring, jlong); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/src/client/jni/jniCommon.h b/src/client/jni/jniCommon.h new file mode 100644 index 0000000000..78724eed31 --- /dev/null +++ b/src/client/jni/jniCommon.h @@ -0,0 +1,87 @@ +#include + +#ifndef TDENGINE_JNICOMMON_H +#define TDENGINE_JNICOMMON_H + +#define jniFatal(...) \ + { \ + if (jniDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("JNI FATAL ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniError(...) \ + { \ + if (jniDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("JNI ERROR ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniWarn(...) \ + { \ + if (jniDebugFlag & DEBUG_WARN) { \ + taosPrintLog("JNI WARN ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniInfo(...) \ + { \ + if (jniDebugFlag & DEBUG_INFO) { \ + taosPrintLog("JNI ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniDebug(...) \ + { \ + if (jniDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \ + } \ + } +#define jniTrace(...) \ + { \ + if (jniDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \ + } \ + } + +extern jclass g_arrayListClass; +extern jmethodID g_arrayListConstructFp; +extern jmethodID g_arrayListAddFp; + +extern jclass g_metadataClass; +extern jmethodID g_metadataConstructFp; +extern jfieldID g_metadataColtypeField; +extern jfieldID g_metadataColnameField; +extern jfieldID g_metadataColsizeField; +extern jfieldID g_metadataColindexField; + +extern jclass g_rowdataClass; +extern jmethodID g_rowdataConstructor; +extern jmethodID g_rowdataClearFp; +extern jmethodID g_rowdataSetBooleanFp; +extern jmethodID g_rowdataSetByteFp; +extern jmethodID g_rowdataSetShortFp; +extern jmethodID g_rowdataSetIntFp; +extern jmethodID g_rowdataSetLongFp; +extern jmethodID g_rowdataSetFloatFp; +extern jmethodID g_rowdataSetDoubleFp; +extern jmethodID g_rowdataSetStringFp; +extern jmethodID g_rowdataSetTimestampFp; +extern jmethodID g_rowdataSetByteArrayFp; + +extern jmethodID g_blockdataSetByteArrayFp; +extern jmethodID g_blockdataSetNumOfRowsFp; +extern jmethodID g_blockdataSetNumOfColsFp; + +#define JNI_SUCCESS 0 +#define JNI_TDENGINE_ERROR -1 +#define JNI_CONNECTION_NULL -2 +#define JNI_RESULT_SET_NULL -3 +#define JNI_NUM_OF_FIELDS_0 -4 +#define JNI_SQL_NULL -5 +#define JNI_FETCH_END -6 +#define JNI_OUT_OF_MEMORY -7 + +extern JavaVM *g_vm; + +void jniGetGlobalMethod(JNIEnv *env); + +int32_t check_for_params(jobject jobj, jlong conn, jlong res); + +#endif // TDENGINE_JNICOMMON_H diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 50fe51e7da..ef7387f976 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -17,46 +17,9 @@ #include "taos.h" #include "tlog.h" #include "tscUtil.h" -#include "tscParseLine.h" #include "com_taosdata_jdbc_TSDBJNIConnector.h" - -#define jniFatal(...) \ - { \ - if (jniDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("JNI FATAL ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ - } \ - } -#define jniError(...) \ - { \ - if (jniDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("JNI ERROR ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ - } \ - } -#define jniWarn(...) \ - { \ - if (jniDebugFlag & DEBUG_WARN) { \ - taosPrintLog("JNI WARN ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ - } \ - } -#define jniInfo(...) \ - { \ - if (jniDebugFlag & DEBUG_INFO) { \ - taosPrintLog("JNI ", tscEmbedded ? 255 : jniDebugFlag, __VA_ARGS__); \ - } \ - } -#define jniDebug(...) \ - { \ - if (jniDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \ - } \ - } -#define jniTrace(...) \ - { \ - if (jniDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("JNI ", jniDebugFlag, __VA_ARGS__); \ - } \ - } +#include "jniCommon.h" int __init = 0; @@ -91,16 +54,7 @@ jmethodID g_blockdataSetByteArrayFp; jmethodID g_blockdataSetNumOfRowsFp; jmethodID g_blockdataSetNumOfColsFp; -#define JNI_SUCCESS 0 -#define JNI_TDENGINE_ERROR -1 -#define JNI_CONNECTION_NULL -2 -#define JNI_RESULT_SET_NULL -3 -#define JNI_NUM_OF_FIELDS_0 -4 -#define JNI_SQL_NULL -5 -#define JNI_FETCH_END -6 -#define JNI_OUT_OF_MEMORY -7 - -static void jniGetGlobalMethod(JNIEnv *env) { +void jniGetGlobalMethod(JNIEnv *env) { // make sure init function executed once switch (atomic_val_compare_exchange_32(&__init, 0, 1)) { case 0: @@ -159,7 +113,7 @@ static void jniGetGlobalMethod(JNIEnv *env) { jniDebug("native method register finished"); } -static int32_t check_for_params(jobject jobj, jlong conn, jlong res) { +int32_t check_for_params(jobject jobj, jlong conn, jlong res) { if ((TAOS *)conn == NULL) { jniError("jobj:%p, connection is closed", jobj); return JNI_CONNECTION_NULL; @@ -219,26 +173,8 @@ JNIEXPORT jobject createTSDBException(JNIEnv *env, int code, char *msg) { return exception_obj; } -/* - * Class: com_taosdata_jdbc_TSDBJNIConnector - * Method: setConfigImp - * Signature: (Ljava/lang/String;)Lcom/taosdata/jdbc/TSDBException; - */ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setConfigImp(JNIEnv *env, jclass jobj, jstring config) { - /* - if (config == NULL) { - jniDebug("config value is null"); - return -1; - } - - const char *cfg = (*env)->GetStringUTFChars(env, config, NULL); - if (!cfg) { - return -1; - } - return 0; - */ - if (config == NULL) { char *msg = "config value is null"; jniDebug("config value is null"); @@ -254,7 +190,7 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setConfigImp(J setConfRet result = taos_set_config(cfg); int code = result.retCode; - char * msg = result.retMsg; + char *msg = result.retMsg; return createTSDBException(env, code, msg); } @@ -424,7 +360,7 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) { - TAOS * tscon = (TAOS *)con; + TAOS *tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, tres); if (code != JNI_SUCCESS) { return code; @@ -467,7 +403,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp( JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con, jlong res) { - TAOS * tscon = (TAOS *)con; + TAOS *tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, res); if (code != JNI_SUCCESS) { return code; @@ -483,13 +419,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsIm JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaDataImp(JNIEnv *env, jobject jobj, jlong con, jlong res, jobject arrayListObj) { - TAOS * tscon = (TAOS *)con; + TAOS *tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, res); if (code != JNI_SUCCESS) { return code; } - TAOS_RES * tres = (TAOS_RES *)res; + TAOS_RES *tres = (TAOS_RES *)res; TAOS_FIELD *fields = taos_fetch_fields(tres); int32_t num_fields = taos_num_fields(tres); @@ -626,13 +562,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNIEnv *env, jobject jobj, jlong con, jlong res, jobject rowobj) { - TAOS * tscon = (TAOS *)con; + TAOS *tscon = (TAOS *)con; int32_t code = check_for_params(jobj, con, res); if (code != JNI_SUCCESS) { return code; } - TAOS_RES * tres = (TAOS_RES *)res; + TAOS_RES *tres = (TAOS_RES *)res; TAOS_FIELD *fields = taos_fetch_fields(tres); int32_t numOfFields = taos_num_fields(tres); @@ -1021,7 +957,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI } const char *name = (*env)->GetStringUTFChars(env, tableName, NULL); - char * curTags = tagsData; + char *curTags = tagsData; TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND)); for (int32_t i = 0; i < numOfTags; ++i) { diff --git a/src/client/src/dataxJniConnection.c b/src/client/src/dataxJniConnection.c new file mode 100644 index 0000000000..beaa61dac1 --- /dev/null +++ b/src/client/src/dataxJniConnection.c @@ -0,0 +1,232 @@ +#include "os.h" +#include "taos.h" +#include "tlog.h" +#include "tscUtil.h" + +#include "com_alibaba_datax_plugin_writer_JniConnection.h" +#include "jniCommon.h" + +jclass g_arrayListClass; +jmethodID g_arrayListConstructFp; +jmethodID g_arrayListAddFp; + +jclass g_metadataClass; +jmethodID g_metadataConstructFp; +jfieldID g_metadataColtypeField; +jfieldID g_metadataColnameField; +jfieldID g_metadataColsizeField; +jfieldID g_metadataColindexField; + +jclass g_rowdataClass; +jmethodID g_rowdataConstructor; +jmethodID g_rowdataClearFp; +jmethodID g_rowdataSetBooleanFp; +jmethodID g_rowdataSetByteFp; +jmethodID g_rowdataSetShortFp; +jmethodID g_rowdataSetIntFp; +jmethodID g_rowdataSetLongFp; +jmethodID g_rowdataSetFloatFp; +jmethodID g_rowdataSetDoubleFp; +jmethodID g_rowdataSetStringFp; +jmethodID g_rowdataSetTimestampFp; +jmethodID g_rowdataSetByteArrayFp; + +jmethodID g_blockdataSetByteArrayFp; +jmethodID g_blockdataSetNumOfRowsFp; +jmethodID g_blockdataSetNumOfColsFp; + +JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_initImp(JNIEnv *env, jobject jobj, + jstring jconfigDir) { + if (jconfigDir != NULL) { + const char *confDir = (*env)->GetStringUTFChars(env, jconfigDir, NULL); + if (confDir && strlen(confDir) != 0) { + tstrncpy(configDir, confDir, TSDB_FILENAME_LEN); + } + (*env)->ReleaseStringUTFChars(env, jconfigDir, confDir); + } + + jniDebug("jni initialized successfully, config directory: %s", configDir); +} + +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_setOptions(JNIEnv *env, jobject jobj, + jint optionIndex, + jstring optionValue) { + if (optionValue == NULL) { + jniDebug("option index:%d value is null", (int32_t)optionIndex); + return 0; + } + + int res = 0; + + if (optionIndex == TSDB_OPTION_LOCALE) { + const char *locale = (*env)->GetStringUTFChars(env, optionValue, NULL); + if (locale && strlen(locale) != 0) { + res = taos_options(TSDB_OPTION_LOCALE, locale); + jniDebug("set locale to %s, result:%d", locale, res); + } else { + jniDebug("input locale is empty"); + } + (*env)->ReleaseStringUTFChars(env, optionValue, locale); + } else if (optionIndex == TSDB_OPTION_CHARSET) { + const char *charset = (*env)->GetStringUTFChars(env, optionValue, NULL); + if (charset && strlen(charset) != 0) { + res = taos_options(TSDB_OPTION_CHARSET, charset); + jniDebug("set character encoding to %s, result:%d", charset, res); + } else { + jniDebug("input character encoding is empty"); + } + (*env)->ReleaseStringUTFChars(env, optionValue, charset); + } else if (optionIndex == TSDB_OPTION_TIMEZONE) { + const char *tz1 = (*env)->GetStringUTFChars(env, optionValue, NULL); + if (tz1 && strlen(tz1) != 0) { + res = taos_options(TSDB_OPTION_TIMEZONE, tz1); + jniDebug("set timezone to %s, result:%d", tz1, res); + } else { + jniDebug("input timezone is empty"); + } + (*env)->ReleaseStringUTFChars(env, optionValue, tz1); + } else { + jniError("option index:%d is not found", (int32_t)optionIndex); + } + + return res; +} + +JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_connectImp(JNIEnv *env, jobject jobj, + jstring jhost, jint jport, + jstring jdbName, jstring juser, + jstring jpass) { + jlong ret = 0; + const char *host = NULL; + const char *user = NULL; + const char *pass = NULL; + const char *dbname = NULL; + + if (jhost != NULL) { + host = (*env)->GetStringUTFChars(env, jhost, NULL); + } + + if (jdbName != NULL) { + dbname = (*env)->GetStringUTFChars(env, jdbName, NULL); + } + + if (juser != NULL) { + user = (*env)->GetStringUTFChars(env, juser, NULL); + } + + if (jpass != NULL) { + pass = (*env)->GetStringUTFChars(env, jpass, NULL); + } + + if (user == NULL) { + jniDebug("jobj:%p, user not specified, use default user %s", jobj, TSDB_DEFAULT_USER); + } + + if (pass == NULL) { + jniDebug("jobj:%p, pass not specified, use default password", jobj); + } + + ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport); + if (ret == 0) { + jniError("jobj:%p, conn:%p, connect to database failed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret, + (char *)host, (char *)user, (char *)dbname, (int32_t)jport); + } else { + jniDebug("jobj:%p, conn:%p, connect to database succeed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret, + (char *)host, (char *)user, (char *)dbname, (int32_t)jport); + } + + if (host != NULL) { + (*env)->ReleaseStringUTFChars(env, jhost, host); + } + + if (dbname != NULL) { + (*env)->ReleaseStringUTFChars(env, jdbName, dbname); + } + + if (user != NULL) { + (*env)->ReleaseStringUTFChars(env, juser, user); + } + + if (pass != NULL) { + (*env)->ReleaseStringUTFChars(env, jpass, pass); + } + + return ret; +} + +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrCodeImp(JNIEnv *env, jobject jobj, + jlong con, jlong tres) { + int32_t code = check_for_params(jobj, con, tres); + if (code != JNI_SUCCESS) { + return code; + } + + return (jint)taos_errno((TAOS_RES *)tres); +} + +JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgImp(JNIEnv *env, jobject jobj, + jlong tres) { + TAOS_RES *pSql = (TAOS_RES *)tres; + return (*env)->NewStringUTF(env, (const char *)taos_errstr(pSql)); +} + +JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_freeResultSetImp(JNIEnv *env, jobject jobj, + jlong con, jlong res) { + if ((TAOS *)con == NULL) { + jniError("jobj:%p, connection is closed", jobj); + } + if ((TAOS_RES *)res == NULL) { + jniError("jobj:%p, conn:%p, res is null", jobj, (TAOS *)con); + } + taos_free_result((TAOS_RES *)res); + jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, (TAOS *)con, (void *)res); +} + +JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_closeConnectionImp(JNIEnv *env, jobject jobj, + jlong con) { + TAOS *tscon = (TAOS *)con; + if (tscon == NULL) { + jniError("jobj:%p, connection is already closed", jobj); + return JNI_CONNECTION_NULL; + } else { + jniDebug("jobj:%p, conn:%p, close connection success", jobj, tscon); + taos_close(tscon); + return JNI_SUCCESS; + } +} + +JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_insertOpentsdbJson(JNIEnv *env, jobject jobj, + jstring json, jlong con) { + // check connection + TAOS *conn = (TAOS *)con; + if (conn == NULL) { + jniError("jobj:%p, connection already closed", jobj); + return JNI_CONNECTION_NULL; + } + // java.lang.String -> char * + char *payload = NULL; + if (json != NULL) { + payload = (char *)(*env)->GetStringUTFChars(env, json, NULL); + } + // check payload + if (payload == NULL) { + jniDebug("jobj:%p, invalid argument: opentsdb insert json is NULL", jobj); + return JNI_SQL_NULL; + } + // schemaless insert + char *payload_arr[1]; + payload_arr[0] = payload; + TAOS_RES *result; + result = taos_schemaless_insert(conn, payload_arr, 0, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED); + + int code = taos_errno(result); + if (code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, conn, tstrerror(code), taos_errstr(result)); + } else { + int32_t affectRows = taos_affected_rows(result); + jniDebug("jobj:%p, conn:%p, code:%s, affect rows:%d", jobj, conn, tstrerror(code), affectRows); + } + + (*env)->ReleaseStringUTFChars(env, json, payload); + return (jlong)result; +} \ No newline at end of file diff --git a/src/client/src/tscParseOpenTSDB.c b/src/client/src/tscParseOpenTSDB.c index decef48878..e06f6df5f7 100644 --- a/src/client/src/tscParseOpenTSDB.c +++ b/src/client/src/tscParseOpenTSDB.c @@ -888,7 +888,6 @@ static int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, if (tags == NULL || tags->type != cJSON_Object) { return TSDB_CODE_TSC_INVALID_JSON; } - //only pick up the first ID value as child table name cJSON *id = cJSON_GetObjectItem(tags, "ID"); if (id != NULL) { @@ -912,7 +911,6 @@ static int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, return TSDB_CODE_TSC_DUP_TAG_NAMES; } } - int32_t tagNum = cJSON_GetArraySize(tags); //at least one tag pair required if (tagNum <= 0) { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index ad7eaef8cb..9d82245c21 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -421,9 +421,6 @@ bool tsdbNoProblem(STsdbRepo* pRepo); // unit of walSize: MB int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize); -// not commit if other instances in committing state or waiting to commit -bool tsdbIsNeedCommit(STsdbRepo *pRepo); - #ifdef __cplusplus } #endif diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 761593f676..e9806a24f2 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -77,6 +77,7 @@ extern char configDir[]; #define MAX_DATA_SIZE (16*TSDB_MAX_COLUMNS)+20 // max record len: 16*MAX_COLUMNS, timestamp string and ,('') need extra space #define OPT_ABORT 1 /* –abort */ #define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255. +#define MAX_PATH_LEN 4096 #define DEFAULT_START_TIME 1500000000000 @@ -511,7 +512,7 @@ typedef struct SThreadInfo_S { int threadID; char db_name[TSDB_DB_NAME_LEN]; uint32_t time_precision; - char filePath[TSDB_FILENAME_LEN]; + char filePath[MAX_PATH_LEN]; FILE *fp; char tb_prefix[TSDB_TABLE_NAME_LEN]; uint64_t start_table_from; @@ -3574,7 +3575,7 @@ static int postProceSql(char *host, uint16_t port, "%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n", __func__, __LINE__, received, resp_len, response_buf); break; - } + } } } while(received < resp_len); @@ -4380,7 +4381,7 @@ static int createSuperTable( superTbl->lenOfTagOfOneRow = lenOfTagOfOneRow; - + snprintf(command, BUFFER_SIZE, superTbl->escapeChar ? "CREATE TABLE IF NOT EXISTS %s.`%s` (ts TIMESTAMP%s) TAGS %s": @@ -4515,7 +4516,7 @@ int createDatabasesAndStables(char *command) { if (g_Dbs.db[i].superTbls[j].iface == SML_IFACE) { goto skip; } - + sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].stbName); ret = queryDbExec(taos, command, NO_INSERT_TYPE, true); @@ -4575,7 +4576,7 @@ static void* createTable(void *sarg) i <= pThreadInfo->end_table_to; i++) { if (0 == g_Dbs.use_metric) { snprintf(pThreadInfo->buffer, buff_len, - g_args.escapeChar ? + g_args.escapeChar ? "CREATE TABLE IF NOT EXISTS %s.`%s%"PRIu64"` %s;" : "CREATE TABLE IF NOT EXISTS %s.%s%"PRIu64" %s;", pThreadInfo->db_name, @@ -6604,7 +6605,7 @@ static int getRowDataFromSample( stbInfo->sampleDataBuf + stbInfo->lenOfOneRow * (*sampleUsePos)); } - + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); (*sampleUsePos)++; @@ -7139,7 +7140,7 @@ static void getTableName(char *pTblName, if (stbInfo) { if (AUTO_CREATE_SUBTBL != stbInfo->autoCreateTable) { if (stbInfo->childTblLimit > 0) { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, + snprintf(pTblName, TSDB_TABLE_NAME_LEN, stbInfo->escapeChar ? "`%s`" : "%s", stbInfo->childTblName + (tableSeq - stbInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); @@ -7152,12 +7153,12 @@ static void getTableName(char *pTblName, stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); } } else { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, + snprintf(pTblName, TSDB_TABLE_NAME_LEN, stbInfo->escapeChar ? "`%s%"PRIu64"`" : "%s%"PRIu64"", stbInfo->childTblPrefix, tableSeq); } } else { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, + snprintf(pTblName, TSDB_TABLE_NAME_LEN, g_args.escapeChar ? "`%s%"PRIu64"`" : "%s%"PRIu64"", g_args.tb_prefix, tableSeq); } @@ -9713,7 +9714,7 @@ static void generateSmlHead(char* smlHead, SSuperTable* stbInfo, threadInfo* pTh } } -static void generateSmlTail(char* line, char* smlHead, SSuperTable* stbInfo, +static void generateSmlTail(char* line, char* smlHead, SSuperTable* stbInfo, threadInfo* pThreadInfo, int64_t timestamp) { int dataLen = 0; dataLen = snprintf(line, BUFFER_SIZE, "%s ", smlHead); @@ -9860,7 +9861,7 @@ static void* syncWriteInterlaceSml(threadInfo *pThreadInfo, uint32_t interlaceRo } else { batchPerTblTimes = 1; } - + char *smlHead[pThreadInfo->ntables]; for (int t = 0; t < pThreadInfo->ntables; t++) { smlHead[t] = (char *)calloc(HEAD_BUFF_LEN, 1); @@ -9869,7 +9870,7 @@ static void* syncWriteInterlaceSml(threadInfo *pThreadInfo, uint32_t interlaceRo exit(EXIT_FAILURE); } generateSmlHead(smlHead[t], stbInfo, pThreadInfo, t); - + } pThreadInfo->totalInsertRows = 0; @@ -9895,11 +9896,11 @@ static void* syncWriteInterlaceSml(threadInfo *pThreadInfo, uint32_t interlaceRo pThreadInfo->lines = calloc(g_args.reqPerReq, sizeof(char *)); if (NULL == pThreadInfo->lines) { errorPrint2("Failed to alloc %"PRIu64" bytes, reason:%s\n", - g_args.reqPerReq * sizeof(char *), + g_args.reqPerReq * (uint64_t)sizeof(char *), strerror(errno)); return NULL; } - + while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampMs(); @@ -10470,7 +10471,7 @@ static void* syncWriteProgressiveSml(threadInfo *pThreadInfo) { exit(EXIT_FAILURE); } generateSmlHead(smlHead[t], stbInfo, pThreadInfo, t); - + } int currentPercent = 0; int percentComplete = 0; @@ -10481,14 +10482,14 @@ static void* syncWriteProgressiveSml(threadInfo *pThreadInfo) { pThreadInfo->lines = calloc(g_args.reqPerReq, sizeof(char *)); if (NULL == pThreadInfo->lines) { errorPrint2("Failed to alloc %"PRIu64" bytes, reason:%s\n", - g_args.reqPerReq * sizeof(char *), + g_args.reqPerReq * (uint64_t)sizeof(char *), strerror(errno)); return NULL; } - + for (uint64_t i = 0; i < pThreadInfo->ntables; i++) { int64_t timestamp = pThreadInfo->start_time; - + for (uint64_t j = 0; j < insertRows;) { for (int k = 0; k < g_args.reqPerReq; k++) { pThreadInfo->lines[k] = calloc(BUFFER_SIZE, 1); @@ -10956,7 +10957,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, int64_t ntables = 0; uint64_t tableFrom; - + if (stbInfo) { if (stbInfo->iface != SML_IFACE) { int64_t limit; @@ -11198,7 +11199,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); } */ - + if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) { #ifdef WINDOWS WSADATA wsaData; @@ -11223,7 +11224,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, } pThreadInfo->sockfd = sockfd; } - + tsem_init(&(pThreadInfo->lock_sem), 0, 0); if (ASYNC_MODE == g_Dbs.asyncMode) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 23835d659d..2dea0de055 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -185,19 +185,10 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { return 0; } -bool tsdbIsNeedCommit(STsdbRepo *pRepo) { - int nVal = 0; - if (sem_getvalue(&pRepo->readyToCommit, &nVal) != 0) { - tsdbError("vgId:%d failed to sem_getvalue of readyToCommit", REPO_ID(pRepo)); - return false; - } - return nVal > 0; -} - int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB STsdbCfg *pCfg = &(pRepo->config); if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { - if (tsdbIsNeedCommit(pRepo) && (tsdbAsyncCommit(pRepo) < 0)) return -1; + if (tsdbAsyncCommit(pRepo) < 0) return -1; } return 0; } @@ -211,7 +202,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { if ((pRepo->mem->extraBuffList != NULL) || ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { // trigger commit - if (tsdbIsNeedCommit(pRepo) && (tsdbAsyncCommit(pRepo) < 0)) return -1; + if (tsdbAsyncCommit(pRepo) < 0) return -1; } return 0; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 7f7d37a255..a0c772f540 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -169,7 +169,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR } static int32_t vnodeCheckWal(SVnodeObj *pVnode) { - if (tsdbIsNeedCommit(pVnode->tsdb)) { + if (pVnode->isCommiting == 0) { return tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20); } return 0; @@ -189,7 +189,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ASSERT(code != 0); } - if (((++pVnode->tblMsgVer) & 16383) == 0) { // lazy check + if (((++pVnode->tblMsgVer) & 32767) == 0) { // lazy check vnodeCheckWal(pVnode); } -- GitLab