From c41d23daaa68aed0a00a2eca6d6a6814eb4b49f7 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Wed, 21 Jul 2021 18:55:10 +0800 Subject: [PATCH] copy from develop branch. stmt: 0, nanosec: 0 (#6955) * copy from develop branch. stmt: 0, nanosec: 0 * set thread name disabled on master branch. --- src/kit/taosdemo/taosdemo.c | 1927 ++++++++++++++++++----------------- 1 file changed, 1020 insertions(+), 907 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 9b1c4ba444..de894434ad 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -55,6 +55,11 @@ #define STMT_IFACE_ENABLED 0 #define NANO_SECOND_ENABLED 0 +#define SET_THREADNAME_ENABLED 0 + +#if SET_THREADNAME_ENABLED == 0 +#define setThreadName(name) +#endif #define REQ_EXTRA_BUF_LEN 1024 #define RESP_BUF_LEN 4096 @@ -90,7 +95,7 @@ extern char configDir[]; #define MAX_SUPER_TABLE_COUNT 200 #define MAX_QUERY_SQL_COUNT 100 -#define MAX_QUERY_SQL_LENGTH 1024 +#define MAX_QUERY_SQL_LENGTH BUFFER_SIZE #define MAX_DATABASE_COUNT 256 #define INPUT_BUF_LEN 256 @@ -2367,7 +2372,25 @@ static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { return dataBuf; } -static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { +static char *generateBinaryNCharTagValues(int64_t tableSeq, uint32_t len) +{ + char* buf = (char*)calloc(len, 1); + if (NULL == buf) { + printf("calloc failed! size:%d\n", len); + return NULL; + } + + if (tableSeq % 2) { + tstrncpy(buf, "beijing", len); + } else { + tstrncpy(buf, "shanghai", len); + } + //rand_string(buf, stbInfo->tags[i].dataLen); + + return buf; +} + +static char* generateTagValuesForStb(SSuperTable* stbInfo, int64_t tableSeq) { char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); if (NULL == dataBuf) { printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); @@ -2388,20 +2411,12 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { return NULL; } - int tagBufLen = stbInfo->tags[i].dataLen + 1; - char* buf = (char*)calloc(tagBufLen, 1); + int32_t tagBufLen = stbInfo->tags[i].dataLen + 1; + char *buf = generateBinaryNCharTagValues(tableSeq, tagBufLen); if (NULL == buf) { - printf("calloc failed! size:%d\n", stbInfo->tags[i].dataLen); tmfree(dataBuf); return NULL; } - - if (tableSeq % 2) { - tstrncpy(buf, "beijing", tagBufLen); - } else { - tstrncpy(buf, "shanghai", tagBufLen); - } - //rand_string(buf, stbInfo->tags[i].dataLen); dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "\'%s\',", buf); tmfree(buf); @@ -2410,11 +2425,11 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { if ((g_args.demo_mode) && (i == 0)) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%d,", tableSeq % 10); + "%"PRId64",", tableSeq % 10); } else { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, - "%d,", tableSeq); + "%"PRId64",", tableSeq); } } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bigint", strlen("bigint"))) { @@ -2445,7 +2460,7 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64",", rand_bigint()); } else { - printf("No support data type: %s\n", stbInfo->tags[i].dataType); + errorPrint("No support data type: %s\n", stbInfo->tags[i].dataType); tmfree(dataBuf); return NULL; } @@ -2734,7 +2749,7 @@ static int createSuperTable( } else if (strcasecmp(dataType, "INT") == 0) { if ((g_args.demo_mode) && (colIndex == 1)) { len += snprintf(cols + len, COL_BUFFER_LEN - len, - ",VOLTAGE INT"); + ", VOLTAGE INT"); } else { len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "INT"); } @@ -2756,9 +2771,9 @@ static int createSuperTable( } else if (strcasecmp(dataType, "FLOAT") == 0) { if (g_args.demo_mode) { if (colIndex == 0) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ",CURRENT FLOAT"); + len += snprintf(cols + len, COL_BUFFER_LEN - len, ", CURRENT FLOAT"); } else if (colIndex == 2) { - len += snprintf(cols + len, COL_BUFFER_LEN - len, ",PHASE FLOAT"); + len += snprintf(cols + len, COL_BUFFER_LEN - len, ", PHASE FLOAT"); } } else { len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "FLOAT"); @@ -3025,10 +3040,11 @@ static void* createTable(void *sarg) threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + setThreadName("createTable"); + uint64_t lastPrintTime = taosGetTimestampMs(); - int buff_len; - buff_len = BUFFER_SIZE; + int buff_len = BUFFER_SIZE; pThreadInfo->buffer = calloc(buff_len, 1); if (pThreadInfo->buffer == NULL) { @@ -4275,577 +4291,577 @@ PARSE_OVER: } static bool getMetaFromQueryJsonFile(cJSON* root) { - bool ret = false; - - cJSON* cfgdir = cJSON_GetObjectItem(root, "cfgdir"); - if (cfgdir && cfgdir->type == cJSON_String && cfgdir->valuestring != NULL) { - tstrncpy(g_queryInfo.cfgDir, cfgdir->valuestring, MAX_FILE_NAME_LEN); - } - - cJSON* host = cJSON_GetObjectItem(root, "host"); - if (host && host->type == cJSON_String && host->valuestring != NULL) { - tstrncpy(g_queryInfo.host, host->valuestring, MAX_HOSTNAME_SIZE); - } else if (!host) { - tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE); - } else { - printf("ERROR: failed to read json, host not found\n"); - goto PARSE_OVER; - } - - cJSON* port = cJSON_GetObjectItem(root, "port"); - if (port && port->type == cJSON_Number) { - g_queryInfo.port = port->valueint; - } else if (!port) { - g_queryInfo.port = 6030; - } - - cJSON* user = cJSON_GetObjectItem(root, "user"); - if (user && user->type == cJSON_String && user->valuestring != NULL) { - tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE); - } else if (!user) { - tstrncpy(g_queryInfo.user, "root", MAX_USERNAME_SIZE); ; - } - - cJSON* password = cJSON_GetObjectItem(root, "password"); - if (password && password->type == cJSON_String && password->valuestring != NULL) { - tstrncpy(g_queryInfo.password, password->valuestring, MAX_PASSWORD_SIZE); - } else if (!password) { - tstrncpy(g_queryInfo.password, "taosdata", MAX_PASSWORD_SIZE);; - } - - cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, - if (answerPrompt && answerPrompt->type == cJSON_String - && answerPrompt->valuestring != NULL) { - if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) { - g_args.answer_yes = false; - } else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) { - g_args.answer_yes = true; - } else { - g_args.answer_yes = false; - } - } else if (!answerPrompt) { - g_args.answer_yes = false; - } else { - printf("ERROR: failed to read json, confirm_parameter_prompt not found\n"); - goto PARSE_OVER; - } - - cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times"); - if (gQueryTimes && gQueryTimes->type == cJSON_Number) { - if (gQueryTimes->valueint <= 0) { - errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", - __func__, __LINE__); - goto PARSE_OVER; - } - g_args.query_times = gQueryTimes->valueint; - } else if (!gQueryTimes) { - g_args.query_times = 1; - } else { - errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", - __func__, __LINE__); - goto PARSE_OVER; - } - - cJSON* dbs = cJSON_GetObjectItem(root, "databases"); - if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) { - tstrncpy(g_queryInfo.dbName, dbs->valuestring, TSDB_DB_NAME_LEN); - } else if (!dbs) { - printf("ERROR: failed to read json, databases not found\n"); - goto PARSE_OVER; - } - - cJSON* queryMode = cJSON_GetObjectItem(root, "query_mode"); - if (queryMode && queryMode->type == cJSON_String && queryMode->valuestring != NULL) { - tstrncpy(g_queryInfo.queryMode, queryMode->valuestring, MAX_TB_NAME_SIZE); - } else if (!queryMode) { - tstrncpy(g_queryInfo.queryMode, "taosc", MAX_TB_NAME_SIZE); - } else { - printf("ERROR: failed to read json, query_mode not found\n"); - goto PARSE_OVER; - } + bool ret = false; - // specified_table_query - cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query"); - if (!specifiedQuery) { - g_queryInfo.specifiedQueryInfo.concurrent = 1; - g_queryInfo.specifiedQueryInfo.sqlCount = 0; - } else if (specifiedQuery->type != cJSON_Object) { - printf("ERROR: failed to read json, super_table_query not found\n"); - goto PARSE_OVER; - } else { - cJSON* queryInterval = cJSON_GetObjectItem(specifiedQuery, "query_interval"); - if (queryInterval && queryInterval->type == cJSON_Number) { - g_queryInfo.specifiedQueryInfo.queryInterval = queryInterval->valueint; - } else if (!queryInterval) { - g_queryInfo.specifiedQueryInfo.queryInterval = 0; + cJSON* cfgdir = cJSON_GetObjectItem(root, "cfgdir"); + if (cfgdir && cfgdir->type == cJSON_String && cfgdir->valuestring != NULL) { + tstrncpy(g_queryInfo.cfgDir, cfgdir->valuestring, MAX_FILE_NAME_LEN); } - cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, - "query_times"); - if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { - if (specifiedQueryTimes->valueint <= 0) { - errorPrint( - "%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", - __func__, __LINE__, specifiedQueryTimes->valueint); - goto PARSE_OVER; - - } - g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint; - } else if (!specifiedQueryTimes) { - g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times; + cJSON* host = cJSON_GetObjectItem(root, "host"); + if (host && host->type == cJSON_String && host->valuestring != NULL) { + tstrncpy(g_queryInfo.host, host->valuestring, MAX_HOSTNAME_SIZE); + } else if (!host) { + tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE); } else { - errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", - __func__, __LINE__); - goto PARSE_OVER; - } - - cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent"); - if (concurrent && concurrent->type == cJSON_Number) { - if (concurrent->valueint <= 0) { - errorPrint( - "%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n", - __func__, __LINE__, - g_queryInfo.specifiedQueryInfo.sqlCount, - g_queryInfo.specifiedQueryInfo.concurrent); - goto PARSE_OVER; - } - g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint; - } else if (!concurrent) { - g_queryInfo.specifiedQueryInfo.concurrent = 1; - } - - cJSON* specifiedAsyncMode = cJSON_GetObjectItem(specifiedQuery, "mode"); - if (specifiedAsyncMode && specifiedAsyncMode->type == cJSON_String - && specifiedAsyncMode->valuestring != NULL) { - if (0 == strcmp("sync", specifiedAsyncMode->valuestring)) { - g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE; - } else if (0 == strcmp("async", specifiedAsyncMode->valuestring)) { - g_queryInfo.specifiedQueryInfo.asyncMode = ASYNC_MODE; - } else { - errorPrint("%s() LN%d, failed to read json, async mode input error\n", - __func__, __LINE__); + printf("ERROR: failed to read json, host not found\n"); goto PARSE_OVER; - } - } else { - g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE; } - cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval"); - if (interval && interval->type == cJSON_Number) { - g_queryInfo.specifiedQueryInfo.subscribeInterval = interval->valueint; - } else if (!interval) { - //printf("failed to read json, subscribe interval no found\n"); - //goto PARSE_OVER; - g_queryInfo.specifiedQueryInfo.subscribeInterval = 10000; + cJSON* port = cJSON_GetObjectItem(root, "port"); + if (port && port->type == cJSON_Number) { + g_queryInfo.port = port->valueint; + } else if (!port) { + g_queryInfo.port = 6030; } - cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart"); - if (restart && restart->type == cJSON_String && restart->valuestring != NULL) { - if (0 == strcmp("yes", restart->valuestring)) { - g_queryInfo.specifiedQueryInfo.subscribeRestart = true; - } else if (0 == strcmp("no", restart->valuestring)) { - g_queryInfo.specifiedQueryInfo.subscribeRestart = false; - } else { - printf("ERROR: failed to read json, subscribe restart error\n"); - goto PARSE_OVER; - } - } else { - g_queryInfo.specifiedQueryInfo.subscribeRestart = true; + cJSON* user = cJSON_GetObjectItem(root, "user"); + if (user && user->type == cJSON_String && user->valuestring != NULL) { + tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE); + } else if (!user) { + tstrncpy(g_queryInfo.user, "root", MAX_USERNAME_SIZE); ; } - cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress"); - if (keepProgress - && keepProgress->type == cJSON_String - && keepProgress->valuestring != NULL) { - if (0 == strcmp("yes", keepProgress->valuestring)) { - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 1; - } else if (0 == strcmp("no", keepProgress->valuestring)) { - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0; - } else { - printf("ERROR: failed to read json, subscribe keepProgress error\n"); - goto PARSE_OVER; - } - } else { - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0; + cJSON* password = cJSON_GetObjectItem(root, "password"); + if (password && password->type == cJSON_String && password->valuestring != NULL) { + tstrncpy(g_queryInfo.password, password->valuestring, MAX_PASSWORD_SIZE); + } else if (!password) { + tstrncpy(g_queryInfo.password, "taosdata", MAX_PASSWORD_SIZE);; } - // sqls - cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls"); - if (!specifiedSqls) { - g_queryInfo.specifiedQueryInfo.sqlCount = 0; - } else if (specifiedSqls->type != cJSON_Array) { - errorPrint("%s() LN%d, failed to read json, super sqls not found\n", - __func__, __LINE__); - goto PARSE_OVER; - } else { - int superSqlSize = cJSON_GetArraySize(specifiedSqls); - if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent - > MAX_QUERY_SQL_COUNT) { - errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n", - __func__, __LINE__, - superSqlSize, - g_queryInfo.specifiedQueryInfo.concurrent, - MAX_QUERY_SQL_COUNT); - goto PARSE_OVER; - } - - g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize; - for (int j = 0; j < superSqlSize; ++j) { - cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j); - if (sql == NULL) continue; - - cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); - if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) { - printf("ERROR: failed to read json, sql not found\n"); - goto PARSE_OVER; - } - tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], - sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); - - // default value is -1, which mean infinite loop - g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1; - cJSON* endAfterConsume = - cJSON_GetObjectItem(specifiedQuery, "endAfterConsume"); - if (endAfterConsume - && endAfterConsume->type == cJSON_Number) { - g_queryInfo.specifiedQueryInfo.endAfterConsume[j] - = endAfterConsume->valueint; - } - if (g_queryInfo.specifiedQueryInfo.endAfterConsume[j] < -1) - g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1; - - g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; - cJSON* resubAfterConsume = - cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); - if ((resubAfterConsume) - && (resubAfterConsume->type == cJSON_Number) - && (resubAfterConsume->valueint >= 0)) { - g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] - = resubAfterConsume->valueint; - } - - if (g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] < -1) - g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; - - cJSON *result = cJSON_GetObjectItem(sql, "result"); - if ((NULL != result) && (result->type == cJSON_String) - && (result->valuestring != NULL)) { - tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], - result->valuestring, MAX_FILE_NAME_LEN); - } else if (NULL == result) { - memset(g_queryInfo.specifiedQueryInfo.result[j], - 0, MAX_FILE_NAME_LEN); + cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, + if (answerPrompt && answerPrompt->type == cJSON_String + && answerPrompt->valuestring != NULL) { + if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) { + g_args.answer_yes = false; + } else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) { + g_args.answer_yes = true; } else { - printf("ERROR: failed to read json, super query result file not found\n"); - goto PARSE_OVER; + g_args.answer_yes = false; } - } - } - } - - // super_table_query - cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query"); - if (!superQuery) { - g_queryInfo.superQueryInfo.threadCnt = 1; - g_queryInfo.superQueryInfo.sqlCount = 0; - } else if (superQuery->type != cJSON_Object) { - printf("ERROR: failed to read json, sub_table_query not found\n"); - ret = true; - goto PARSE_OVER; - } else { - cJSON* subrate = cJSON_GetObjectItem(superQuery, "query_interval"); - if (subrate && subrate->type == cJSON_Number) { - g_queryInfo.superQueryInfo.queryInterval = subrate->valueint; - } else if (!subrate) { - g_queryInfo.superQueryInfo.queryInterval = 0; - } - - cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times"); - if (superQueryTimes && superQueryTimes->type == cJSON_Number) { - if (superQueryTimes->valueint <= 0) { - errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", - __func__, __LINE__, superQueryTimes->valueint); - goto PARSE_OVER; - } - g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint; - } else if (!superQueryTimes) { - g_queryInfo.superQueryInfo.queryTimes = g_args.query_times; + } else if (!answerPrompt) { + g_args.answer_yes = false; } else { - errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", - __func__, __LINE__); - goto PARSE_OVER; - } - - cJSON* threads = cJSON_GetObjectItem(superQuery, "threads"); - if (threads && threads->type == cJSON_Number) { - if (threads->valueint <= 0) { - errorPrint("%s() LN%d, failed to read json, threads input mistake\n", - __func__, __LINE__); + printf("ERROR: failed to read json, confirm_parameter_prompt not found\n"); goto PARSE_OVER; - - } - g_queryInfo.superQueryInfo.threadCnt = threads->valueint; - } else if (!threads) { - g_queryInfo.superQueryInfo.threadCnt = 1; } - //cJSON* subTblCnt = cJSON_GetObjectItem(superQuery, "childtable_count"); - //if (subTblCnt && subTblCnt->type == cJSON_Number) { - // g_queryInfo.superQueryInfo.childTblCount = subTblCnt->valueint; - //} else if (!subTblCnt) { - // g_queryInfo.superQueryInfo.childTblCount = 0; - //} - - cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname"); - if (stblname && stblname->type == cJSON_String - && stblname->valuestring != NULL) { - tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, - TSDB_TABLE_NAME_LEN); + cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times"); + if (gQueryTimes && gQueryTimes->type == cJSON_Number) { + if (gQueryTimes->valueint <= 0) { + errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", + __func__, __LINE__); + goto PARSE_OVER; + } + g_args.query_times = gQueryTimes->valueint; + } else if (!gQueryTimes) { + g_args.query_times = 1; } else { - errorPrint("%s() LN%d, failed to read json, super table name input error\n", - __func__, __LINE__); - goto PARSE_OVER; + errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", + __func__, __LINE__); + goto PARSE_OVER; } - cJSON* superAsyncMode = cJSON_GetObjectItem(superQuery, "mode"); - if (superAsyncMode && superAsyncMode->type == cJSON_String - && superAsyncMode->valuestring != NULL) { - if (0 == strcmp("sync", superAsyncMode->valuestring)) { - g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE; - } else if (0 == strcmp("async", superAsyncMode->valuestring)) { - g_queryInfo.superQueryInfo.asyncMode = ASYNC_MODE; - } else { - errorPrint("%s() LN%d, failed to read json, async mode input error\n", - __func__, __LINE__); + cJSON* dbs = cJSON_GetObjectItem(root, "databases"); + if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) { + tstrncpy(g_queryInfo.dbName, dbs->valuestring, TSDB_DB_NAME_LEN); + } else if (!dbs) { + printf("ERROR: failed to read json, databases not found\n"); goto PARSE_OVER; - } - } else { - g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE; } - cJSON* superInterval = cJSON_GetObjectItem(superQuery, "interval"); - if (superInterval && superInterval->type == cJSON_Number) { - if (superInterval->valueint < 0) { - errorPrint("%s() LN%d, failed to read json, interval input mistake\n", - __func__, __LINE__); - goto PARSE_OVER; - } - g_queryInfo.superQueryInfo.subscribeInterval = superInterval->valueint; - } else if (!superInterval) { - //printf("failed to read json, subscribe interval no found\n"); - //goto PARSE_OVER; - g_queryInfo.superQueryInfo.subscribeInterval = 10000; - } - - cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart"); - if (subrestart && subrestart->type == cJSON_String - && subrestart->valuestring != NULL) { - if (0 == strcmp("yes", subrestart->valuestring)) { - g_queryInfo.superQueryInfo.subscribeRestart = true; - } else if (0 == strcmp("no", subrestart->valuestring)) { - g_queryInfo.superQueryInfo.subscribeRestart = false; - } else { - printf("ERROR: failed to read json, subscribe restart error\n"); - goto PARSE_OVER; - } + cJSON* queryMode = cJSON_GetObjectItem(root, "query_mode"); + if (queryMode && queryMode->type == cJSON_String && queryMode->valuestring != NULL) { + tstrncpy(g_queryInfo.queryMode, queryMode->valuestring, MAX_TB_NAME_SIZE); + } else if (!queryMode) { + tstrncpy(g_queryInfo.queryMode, "taosc", MAX_TB_NAME_SIZE); } else { - g_queryInfo.superQueryInfo.subscribeRestart = true; + printf("ERROR: failed to read json, query_mode not found\n"); + goto PARSE_OVER; } - cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); - if (superkeepProgress && - superkeepProgress->type == cJSON_String - && superkeepProgress->valuestring != NULL) { - if (0 == strcmp("yes", superkeepProgress->valuestring)) { - g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; - } else if (0 == strcmp("no", superkeepProgress->valuestring)) { - g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; - } else { - printf("ERROR: failed to read json, subscribe super table keepProgress error\n"); + // specified_table_query + cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query"); + if (!specifiedQuery) { + g_queryInfo.specifiedQueryInfo.concurrent = 1; + g_queryInfo.specifiedQueryInfo.sqlCount = 0; + } else if (specifiedQuery->type != cJSON_Object) { + printf("ERROR: failed to read json, super_table_query not found\n"); goto PARSE_OVER; - } } else { - g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; - } - - // default value is -1, which mean do not resub - g_queryInfo.superQueryInfo.endAfterConsume = -1; - cJSON* superEndAfterConsume = - cJSON_GetObjectItem(superQuery, "endAfterConsume"); - if (superEndAfterConsume - && superEndAfterConsume->type == cJSON_Number) { - g_queryInfo.superQueryInfo.endAfterConsume = - superEndAfterConsume->valueint; - } - if (g_queryInfo.superQueryInfo.endAfterConsume < -1) - g_queryInfo.superQueryInfo.endAfterConsume = -1; + cJSON* queryInterval = cJSON_GetObjectItem(specifiedQuery, "query_interval"); + if (queryInterval && queryInterval->type == cJSON_Number) { + g_queryInfo.specifiedQueryInfo.queryInterval = queryInterval->valueint; + } else if (!queryInterval) { + g_queryInfo.specifiedQueryInfo.queryInterval = 0; + } - // default value is -1, which mean do not resub - g_queryInfo.superQueryInfo.resubAfterConsume = -1; - cJSON* superResubAfterConsume = - cJSON_GetObjectItem(superQuery, "resubAfterConsume"); - if ((superResubAfterConsume) - && (superResubAfterConsume->type == cJSON_Number) - && (superResubAfterConsume->valueint >= 0)) { - g_queryInfo.superQueryInfo.resubAfterConsume = - superResubAfterConsume->valueint; - } - if (g_queryInfo.superQueryInfo.resubAfterConsume < -1) - g_queryInfo.superQueryInfo.resubAfterConsume = -1; + cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, + "query_times"); + if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { + if (specifiedQueryTimes->valueint <= 0) { + errorPrint( + "%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", + __func__, __LINE__, specifiedQueryTimes->valueint); + goto PARSE_OVER; - // supert table sqls - cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls"); - if (!superSqls) { - g_queryInfo.superQueryInfo.sqlCount = 0; - } else if (superSqls->type != cJSON_Array) { - errorPrint("%s() LN%d: failed to read json, super sqls not found\n", - __func__, __LINE__); - goto PARSE_OVER; - } else { - int superSqlSize = cJSON_GetArraySize(superSqls); - if (superSqlSize > MAX_QUERY_SQL_COUNT) { - errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n", - __func__, __LINE__, MAX_QUERY_SQL_COUNT); - goto PARSE_OVER; - } + } + g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint; + } else if (!specifiedQueryTimes) { + g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times; + } else { + errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", + __func__, __LINE__); + goto PARSE_OVER; + } - g_queryInfo.superQueryInfo.sqlCount = superSqlSize; - for (int j = 0; j < superSqlSize; ++j) { - cJSON* sql = cJSON_GetArrayItem(superSqls, j); - if (sql == NULL) continue; + cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent"); + if (concurrent && concurrent->type == cJSON_Number) { + if (concurrent->valueint <= 0) { + errorPrint( + "%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n", + __func__, __LINE__, + g_queryInfo.specifiedQueryInfo.sqlCount, + g_queryInfo.specifiedQueryInfo.concurrent); + goto PARSE_OVER; + } + g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint; + } else if (!concurrent) { + g_queryInfo.specifiedQueryInfo.concurrent = 1; + } - cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); - if (!sqlStr || sqlStr->type != cJSON_String - || sqlStr->valuestring == NULL) { - errorPrint("%s() LN%d, failed to read json, sql not found\n", - __func__, __LINE__); - goto PARSE_OVER; - } - tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, - MAX_QUERY_SQL_LENGTH); - - cJSON *result = cJSON_GetObjectItem(sql, "result"); - if (result != NULL && result->type == cJSON_String - && result->valuestring != NULL){ - tstrncpy(g_queryInfo.superQueryInfo.result[j], - result->valuestring, MAX_FILE_NAME_LEN); - } else if (NULL == result) { - memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); - } else { - errorPrint("%s() LN%d, failed to read json, sub query result file not found\n", - __func__, __LINE__); - goto PARSE_OVER; + cJSON* specifiedAsyncMode = cJSON_GetObjectItem(specifiedQuery, "mode"); + if (specifiedAsyncMode && specifiedAsyncMode->type == cJSON_String + && specifiedAsyncMode->valuestring != NULL) { + if (0 == strcmp("sync", specifiedAsyncMode->valuestring)) { + g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE; + } else if (0 == strcmp("async", specifiedAsyncMode->valuestring)) { + g_queryInfo.specifiedQueryInfo.asyncMode = ASYNC_MODE; + } else { + errorPrint("%s() LN%d, failed to read json, async mode input error\n", + __func__, __LINE__); + goto PARSE_OVER; + } + } else { + g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE; } - } - } - } - ret = true; + cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval"); + if (interval && interval->type == cJSON_Number) { + g_queryInfo.specifiedQueryInfo.subscribeInterval = interval->valueint; + } else if (!interval) { + //printf("failed to read json, subscribe interval no found\n"); + //goto PARSE_OVER; + g_queryInfo.specifiedQueryInfo.subscribeInterval = 10000; + } -PARSE_OVER: - return ret; -} + cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart"); + if (restart && restart->type == cJSON_String && restart->valuestring != NULL) { + if (0 == strcmp("yes", restart->valuestring)) { + g_queryInfo.specifiedQueryInfo.subscribeRestart = true; + } else if (0 == strcmp("no", restart->valuestring)) { + g_queryInfo.specifiedQueryInfo.subscribeRestart = false; + } else { + printf("ERROR: failed to read json, subscribe restart error\n"); + goto PARSE_OVER; + } + } else { + g_queryInfo.specifiedQueryInfo.subscribeRestart = true; + } -static bool getInfoFromJsonFile(char* file) { - debugPrint("%s %d %s\n", __func__, __LINE__, file); + cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress"); + if (keepProgress + && keepProgress->type == cJSON_String + && keepProgress->valuestring != NULL) { + if (0 == strcmp("yes", keepProgress->valuestring)) { + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 1; + } else if (0 == strcmp("no", keepProgress->valuestring)) { + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0; + } else { + printf("ERROR: failed to read json, subscribe keepProgress error\n"); + goto PARSE_OVER; + } + } else { + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0; + } - FILE *fp = fopen(file, "r"); - if (!fp) { - printf("failed to read %s, reason:%s\n", file, strerror(errno)); - return false; - } + // sqls + cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls"); + if (!specifiedSqls) { + g_queryInfo.specifiedQueryInfo.sqlCount = 0; + } else if (specifiedSqls->type != cJSON_Array) { + errorPrint("%s() LN%d, failed to read json, super sqls not found\n", + __func__, __LINE__); + goto PARSE_OVER; + } else { + int superSqlSize = cJSON_GetArraySize(specifiedSqls); + if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent + > MAX_QUERY_SQL_COUNT) { + errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n", + __func__, __LINE__, + superSqlSize, + g_queryInfo.specifiedQueryInfo.concurrent, + MAX_QUERY_SQL_COUNT); + goto PARSE_OVER; + } - bool ret = false; - int maxLen = 6400000; - char *content = calloc(1, maxLen + 1); - int len = fread(content, 1, maxLen, fp); - if (len <= 0) { - free(content); - fclose(fp); - printf("failed to read %s, content is null", file); - return false; - } + g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize; + for (int j = 0; j < superSqlSize; ++j) { + cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j); + if (sql == NULL) continue; - content[len] = 0; - cJSON* root = cJSON_Parse(content); - if (root == NULL) { - printf("ERROR: failed to cjson parse %s, invalid json format\n", file); - goto PARSE_OVER; - } + cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); + if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) { + printf("ERROR: failed to read json, sql not found\n"); + goto PARSE_OVER; + } + tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], + sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + + // default value is -1, which mean infinite loop + g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1; + cJSON* endAfterConsume = + cJSON_GetObjectItem(specifiedQuery, "endAfterConsume"); + if (endAfterConsume + && endAfterConsume->type == cJSON_Number) { + g_queryInfo.specifiedQueryInfo.endAfterConsume[j] + = endAfterConsume->valueint; + } + if (g_queryInfo.specifiedQueryInfo.endAfterConsume[j] < -1) + g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1; + + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; + cJSON* resubAfterConsume = + cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); + if ((resubAfterConsume) + && (resubAfterConsume->type == cJSON_Number) + && (resubAfterConsume->valueint >= 0)) { + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] + = resubAfterConsume->valueint; + } - cJSON* filetype = cJSON_GetObjectItem(root, "filetype"); - if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) { - if (0 == strcasecmp("insert", filetype->valuestring)) { - g_args.test_mode = INSERT_TEST; - } else if (0 == strcasecmp("query", filetype->valuestring)) { - g_args.test_mode = QUERY_TEST; - } else if (0 == strcasecmp("subscribe", filetype->valuestring)) { - g_args.test_mode = SUBSCRIBE_TEST; - } else { - printf("ERROR: failed to read json, filetype not support\n"); - goto PARSE_OVER; + if (g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] < -1) + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; + + cJSON *result = cJSON_GetObjectItem(sql, "result"); + if ((NULL != result) && (result->type == cJSON_String) + && (result->valuestring != NULL)) { + tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], + result->valuestring, MAX_FILE_NAME_LEN); + } else if (NULL == result) { + memset(g_queryInfo.specifiedQueryInfo.result[j], + 0, MAX_FILE_NAME_LEN); + } else { + printf("ERROR: failed to read json, super query result file not found\n"); + goto PARSE_OVER; + } + } + } } - } else if (!filetype) { - g_args.test_mode = INSERT_TEST; - } else { - printf("ERROR: failed to read json, filetype not found\n"); - goto PARSE_OVER; - } - if (INSERT_TEST == g_args.test_mode) { - ret = getMetaFromInsertJsonFile(root); - } else if ((QUERY_TEST == g_args.test_mode) - || (SUBSCRIBE_TEST == g_args.test_mode)) { - ret = getMetaFromQueryJsonFile(root); - } else { - errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n", - __func__, __LINE__); - goto PARSE_OVER; - } + // super_table_query + cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query"); + if (!superQuery) { + g_queryInfo.superQueryInfo.threadCnt = 1; + g_queryInfo.superQueryInfo.sqlCount = 0; + } else if (superQuery->type != cJSON_Object) { + printf("ERROR: failed to read json, sub_table_query not found\n"); + ret = true; + goto PARSE_OVER; + } else { + cJSON* subrate = cJSON_GetObjectItem(superQuery, "query_interval"); + if (subrate && subrate->type == cJSON_Number) { + g_queryInfo.superQueryInfo.queryInterval = subrate->valueint; + } else if (!subrate) { + g_queryInfo.superQueryInfo.queryInterval = 0; + } + + cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times"); + if (superQueryTimes && superQueryTimes->type == cJSON_Number) { + if (superQueryTimes->valueint <= 0) { + errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", + __func__, __LINE__, superQueryTimes->valueint); + goto PARSE_OVER; + } + g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint; + } else if (!superQueryTimes) { + g_queryInfo.superQueryInfo.queryTimes = g_args.query_times; + } else { + errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", + __func__, __LINE__); + goto PARSE_OVER; + } -PARSE_OVER: - free(content); - cJSON_Delete(root); - fclose(fp); - return ret; -} + cJSON* threads = cJSON_GetObjectItem(superQuery, "threads"); + if (threads && threads->type == cJSON_Number) { + if (threads->valueint <= 0) { + errorPrint("%s() LN%d, failed to read json, threads input mistake\n", + __func__, __LINE__); + goto PARSE_OVER; -static int prepareSampleData() { - for (int i = 0; i < g_Dbs.dbCount; i++) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) { - if (readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]) != 0) { - return -1; + } + g_queryInfo.superQueryInfo.threadCnt = threads->valueint; + } else if (!threads) { + g_queryInfo.superQueryInfo.threadCnt = 1; } - } - } - } - return 0; -} + //cJSON* subTblCnt = cJSON_GetObjectItem(superQuery, "childtable_count"); + //if (subTblCnt && subTblCnt->type == cJSON_Number) { + // g_queryInfo.superQueryInfo.childTblCount = subTblCnt->valueint; + //} else if (!subTblCnt) { + // g_queryInfo.superQueryInfo.childTblCount = 0; + //} -static void postFreeResource() { - tmfclose(g_fpOfInsertResult); - for (int i = 0; i < g_Dbs.dbCount; i++) { - for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { - if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { - free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); - g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; - } - if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) { - free(g_Dbs.db[i].superTbls[j].sampleDataBuf); - g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; - } - if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { - free(g_Dbs.db[i].superTbls[j].tagDataBuf); - g_Dbs.db[i].superTbls[j].tagDataBuf = NULL; - } - if (0 != g_Dbs.db[i].superTbls[j].childTblName) { - free(g_Dbs.db[i].superTbls[j].childTblName); - g_Dbs.db[i].superTbls[j].childTblName = NULL; - } + cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname"); + if (stblname && stblname->type == cJSON_String + && stblname->valuestring != NULL) { + tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, + TSDB_TABLE_NAME_LEN); + } else { + errorPrint("%s() LN%d, failed to read json, super table name input error\n", + __func__, __LINE__); + goto PARSE_OVER; + } + + cJSON* superAsyncMode = cJSON_GetObjectItem(superQuery, "mode"); + if (superAsyncMode && superAsyncMode->type == cJSON_String + && superAsyncMode->valuestring != NULL) { + if (0 == strcmp("sync", superAsyncMode->valuestring)) { + g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE; + } else if (0 == strcmp("async", superAsyncMode->valuestring)) { + g_queryInfo.superQueryInfo.asyncMode = ASYNC_MODE; + } else { + errorPrint("%s() LN%d, failed to read json, async mode input error\n", + __func__, __LINE__); + goto PARSE_OVER; + } + } else { + g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE; + } + + cJSON* superInterval = cJSON_GetObjectItem(superQuery, "interval"); + if (superInterval && superInterval->type == cJSON_Number) { + if (superInterval->valueint < 0) { + errorPrint("%s() LN%d, failed to read json, interval input mistake\n", + __func__, __LINE__); + goto PARSE_OVER; + } + g_queryInfo.superQueryInfo.subscribeInterval = superInterval->valueint; + } else if (!superInterval) { + //printf("failed to read json, subscribe interval no found\n"); + //goto PARSE_OVER; + g_queryInfo.superQueryInfo.subscribeInterval = 10000; + } + + cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart"); + if (subrestart && subrestart->type == cJSON_String + && subrestart->valuestring != NULL) { + if (0 == strcmp("yes", subrestart->valuestring)) { + g_queryInfo.superQueryInfo.subscribeRestart = true; + } else if (0 == strcmp("no", subrestart->valuestring)) { + g_queryInfo.superQueryInfo.subscribeRestart = false; + } else { + printf("ERROR: failed to read json, subscribe restart error\n"); + goto PARSE_OVER; + } + } else { + g_queryInfo.superQueryInfo.subscribeRestart = true; + } + + cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); + if (superkeepProgress && + superkeepProgress->type == cJSON_String + && superkeepProgress->valuestring != NULL) { + if (0 == strcmp("yes", superkeepProgress->valuestring)) { + g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; + } else if (0 == strcmp("no", superkeepProgress->valuestring)) { + g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; + } else { + printf("ERROR: failed to read json, subscribe super table keepProgress error\n"); + goto PARSE_OVER; + } + } else { + g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; + } + + // default value is -1, which mean do not resub + g_queryInfo.superQueryInfo.endAfterConsume = -1; + cJSON* superEndAfterConsume = + cJSON_GetObjectItem(superQuery, "endAfterConsume"); + if (superEndAfterConsume + && superEndAfterConsume->type == cJSON_Number) { + g_queryInfo.superQueryInfo.endAfterConsume = + superEndAfterConsume->valueint; + } + if (g_queryInfo.superQueryInfo.endAfterConsume < -1) + g_queryInfo.superQueryInfo.endAfterConsume = -1; + + // default value is -1, which mean do not resub + g_queryInfo.superQueryInfo.resubAfterConsume = -1; + cJSON* superResubAfterConsume = + cJSON_GetObjectItem(superQuery, "resubAfterConsume"); + if ((superResubAfterConsume) + && (superResubAfterConsume->type == cJSON_Number) + && (superResubAfterConsume->valueint >= 0)) { + g_queryInfo.superQueryInfo.resubAfterConsume = + superResubAfterConsume->valueint; + } + if (g_queryInfo.superQueryInfo.resubAfterConsume < -1) + g_queryInfo.superQueryInfo.resubAfterConsume = -1; + + // supert table sqls + cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls"); + if (!superSqls) { + g_queryInfo.superQueryInfo.sqlCount = 0; + } else if (superSqls->type != cJSON_Array) { + errorPrint("%s() LN%d: failed to read json, super sqls not found\n", + __func__, __LINE__); + goto PARSE_OVER; + } else { + int superSqlSize = cJSON_GetArraySize(superSqls); + if (superSqlSize > MAX_QUERY_SQL_COUNT) { + errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n", + __func__, __LINE__, MAX_QUERY_SQL_COUNT); + goto PARSE_OVER; + } + + g_queryInfo.superQueryInfo.sqlCount = superSqlSize; + for (int j = 0; j < superSqlSize; ++j) { + cJSON* sql = cJSON_GetArrayItem(superSqls, j); + if (sql == NULL) continue; + + cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); + if (!sqlStr || sqlStr->type != cJSON_String + || sqlStr->valuestring == NULL) { + errorPrint("%s() LN%d, failed to read json, sql not found\n", + __func__, __LINE__); + goto PARSE_OVER; + } + tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, + MAX_QUERY_SQL_LENGTH); + + cJSON *result = cJSON_GetObjectItem(sql, "result"); + if (result != NULL && result->type == cJSON_String + && result->valuestring != NULL){ + tstrncpy(g_queryInfo.superQueryInfo.result[j], + result->valuestring, MAX_FILE_NAME_LEN); + } else if (NULL == result) { + memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); + } else { + errorPrint("%s() LN%d, failed to read json, sub query result file not found\n", + __func__, __LINE__); + goto PARSE_OVER; + } + } + } + } + + ret = true; + +PARSE_OVER: + return ret; +} + +static bool getInfoFromJsonFile(char* file) { + debugPrint("%s %d %s\n", __func__, __LINE__, file); + + FILE *fp = fopen(file, "r"); + if (!fp) { + printf("failed to read %s, reason:%s\n", file, strerror(errno)); + return false; + } + + bool ret = false; + int maxLen = 6400000; + char *content = calloc(1, maxLen + 1); + int len = fread(content, 1, maxLen, fp); + if (len <= 0) { + free(content); + fclose(fp); + printf("failed to read %s, content is null", file); + return false; + } + + content[len] = 0; + cJSON* root = cJSON_Parse(content); + if (root == NULL) { + printf("ERROR: failed to cjson parse %s, invalid json format\n", file); + goto PARSE_OVER; + } + + cJSON* filetype = cJSON_GetObjectItem(root, "filetype"); + if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) { + if (0 == strcasecmp("insert", filetype->valuestring)) { + g_args.test_mode = INSERT_TEST; + } else if (0 == strcasecmp("query", filetype->valuestring)) { + g_args.test_mode = QUERY_TEST; + } else if (0 == strcasecmp("subscribe", filetype->valuestring)) { + g_args.test_mode = SUBSCRIBE_TEST; + } else { + printf("ERROR: failed to read json, filetype not support\n"); + goto PARSE_OVER; + } + } else if (!filetype) { + g_args.test_mode = INSERT_TEST; + } else { + printf("ERROR: failed to read json, filetype not found\n"); + goto PARSE_OVER; + } + + if (INSERT_TEST == g_args.test_mode) { + ret = getMetaFromInsertJsonFile(root); + } else if ((QUERY_TEST == g_args.test_mode) + || (SUBSCRIBE_TEST == g_args.test_mode)) { + ret = getMetaFromQueryJsonFile(root); + } else { + errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n", + __func__, __LINE__); + goto PARSE_OVER; + } + +PARSE_OVER: + free(content); + cJSON_Delete(root); + fclose(fp); + return ret; +} + +static int prepareSampleData() { + for (int i = 0; i < g_Dbs.dbCount; i++) { + for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) { + if (readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]) != 0) { + return -1; + } + } + } + } + + return 0; +} + +static void postFreeResource() { + tmfclose(g_fpOfInsertResult); + for (int i = 0; i < g_Dbs.dbCount; i++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { + if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { + free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; + } + if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) { + free(g_Dbs.db[i].superTbls[j].sampleDataBuf); + g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; + } + if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { + free(g_Dbs.db[i].superTbls[j].tagDataBuf); + g_Dbs.db[i].superTbls[j].tagDataBuf = NULL; + } + if (0 != g_Dbs.db[i].superTbls[j].childTblName) { + free(g_Dbs.db[i].superTbls[j].childTblName); + g_Dbs.db[i].superTbls[j].childTblName = NULL; + } + } } - } } static int getRowDataFromSample( @@ -5033,30 +5049,30 @@ static int64_t generateData(char *recBuf, char **data_type, } static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { - char* sampleDataBuf = NULL; + char* sampleDataBuf = NULL; - sampleDataBuf = calloc( + sampleDataBuf = calloc( superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); - if (sampleDataBuf == NULL) { - errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n", - __func__, __LINE__, - superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, - strerror(errno)); - return -1; - } + if (sampleDataBuf == NULL) { + errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n", + __func__, __LINE__, + superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, + strerror(errno)); + return -1; + } - superTblInfo->sampleDataBuf = sampleDataBuf; - int ret = readSampleFromCsvFileToMem(superTblInfo); + superTblInfo->sampleDataBuf = sampleDataBuf; + int ret = readSampleFromCsvFileToMem(superTblInfo); - if (0 != ret) { - errorPrint("%s() LN%d, read sample from csv file failed.\n", - __func__, __LINE__); - tmfree(sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; - return -1; - } + if (0 != ret) { + errorPrint("%s() LN%d, read sample from csv file failed.\n", + __func__, __LINE__); + tmfree(sampleDataBuf); + superTblInfo->sampleDataBuf = NULL; + return -1; + } - return 0; + return 0; } static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) @@ -5155,54 +5171,54 @@ static int32_t generateDataTailWithoutStb( uint64_t recordFrom, int64_t startTime, /* int64_t *pSamplePos, */int64_t *dataLen) { - uint64_t len = 0; - char *pstr = buffer; + uint64_t len = 0; + char *pstr = buffer; - verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch); + verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch); - int32_t k = 0; - for (k = 0; k < batch;) { - char data[MAX_DATA_SIZE]; - memset(data, 0, MAX_DATA_SIZE); + int32_t k = 0; + for (k = 0; k < batch;) { + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); - int64_t retLen = 0; + int64_t retLen = 0; - char **data_type = g_args.datatype; - int lenOfBinary = g_args.len_of_binary; + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; - if (g_args.disorderRatio) { - retLen = generateData(data, data_type, - startTime + getTSRandTail( - (int64_t) DEFAULT_TIMESTAMP_STEP, k, - g_args.disorderRatio, - g_args.disorderRange), - lenOfBinary); - } else { - retLen = generateData(data, data_type, - startTime + (int64_t) (DEFAULT_TIMESTAMP_STEP* k), - lenOfBinary); - } + if (g_args.disorderRatio) { + retLen = generateData(data, data_type, + startTime + getTSRandTail( + (int64_t) DEFAULT_TIMESTAMP_STEP, k, + g_args.disorderRatio, + g_args.disorderRange), + lenOfBinary); + } else { + retLen = generateData(data, data_type, + startTime + (int64_t) (DEFAULT_TIMESTAMP_STEP* k), + lenOfBinary); + } - if (len > remainderBufLen) - break; + if (len > remainderBufLen) + break; - pstr += sprintf(pstr, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; + pstr += sprintf(pstr, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; - verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n", - __func__, __LINE__, len, k, buffer); + verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n", + __func__, __LINE__, len, k, buffer); - recordFrom ++; + recordFrom ++; - if (recordFrom >= insertRows) { - break; + if (recordFrom >= insertRows) { + break; + } } - } - *dataLen = len; - return k; + *dataLen = len; + return k; } static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, @@ -5297,82 +5313,82 @@ static int generateSQLHeadWithoutStb(char *tableName, char *dbName, char *buffer, int remainderBufLen) { - int len; + int len; - char headBuf[HEAD_BUFF_LEN]; + char headBuf[HEAD_BUFF_LEN]; - len = snprintf( - headBuf, - HEAD_BUFF_LEN, - "%s.%s values", - dbName, - tableName); + len = snprintf( + headBuf, + HEAD_BUFF_LEN, + "%s.%s values", + dbName, + tableName); - if (len > remainderBufLen) - return -1; + if (len > remainderBufLen) + return -1; - tstrncpy(buffer, headBuf, len + 1); + tstrncpy(buffer, headBuf, len + 1); - return len; + return len; } static int generateStbSQLHead( SSuperTable* superTblInfo, - char *tableName, int32_t tableSeq, + char *tableName, int64_t tableSeq, char *dbName, char *buffer, int remainderBufLen) { - int len; + int len; - char headBuf[HEAD_BUFF_LEN]; + char headBuf[HEAD_BUFF_LEN]; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { - char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + char* tagsValBuf = NULL; + if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq); - } else { + } else { tagsValBuf = getTagValueFromTagSample( superTblInfo, tableSeq % superTblInfo->tagSampleCount); - } - if (NULL == tagsValBuf) { - errorPrint("%s() LN%d, tag buf failed to allocate memory\n", - __func__, __LINE__); - return -1; - } + } + if (NULL == tagsValBuf) { + errorPrint("%s() LN%d, tag buf failed to allocate memory\n", + __func__, __LINE__); + return -1; + } - len = snprintf( - headBuf, - HEAD_BUFF_LEN, - "%s.%s using %s.%s TAGS%s values", - dbName, - tableName, - dbName, - superTblInfo->sTblName, - tagsValBuf); - tmfree(tagsValBuf); + len = snprintf( + headBuf, + HEAD_BUFF_LEN, + "%s.%s using %s.%s TAGS%s values", + dbName, + tableName, + dbName, + superTblInfo->sTblName, + tagsValBuf); + tmfree(tagsValBuf); } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - len = snprintf( - headBuf, - HEAD_BUFF_LEN, - "%s.%s values", - dbName, - tableName); + len = snprintf( + headBuf, + HEAD_BUFF_LEN, + "%s.%s values", + dbName, + tableName); } else { - len = snprintf( - headBuf, - HEAD_BUFF_LEN, - "%s.%s values", - dbName, - tableName); - } + len = snprintf( + headBuf, + HEAD_BUFF_LEN, + "%s.%s values", + dbName, + tableName); + } - if (len > remainderBufLen) - return -1; + if (len > remainderBufLen) + return -1; - tstrncpy(buffer, headBuf, len + 1); + tstrncpy(buffer, headBuf, len + 1); - return len; + return len; } static int32_t generateStbInterlaceData( @@ -5650,8 +5666,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, *ptr += bind->buffer_length; } else { - errorPrint( "No support data type: %s\n", - dataType); + errorPrint( "No support data type: %s\n", dataType); return -1; } @@ -5737,28 +5752,120 @@ static int32_t prepareStmtWithoutStb( return k; } +static int32_t prepareStbStmtBind( + char *bindArray, SSuperTable *stbInfo, bool sourceRand, + int64_t startTime, int32_t recSeq, + bool isColumn) +{ + char *bindBuffer = calloc(1, g_args.len_of_binary); + if (bindBuffer == NULL) { + errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n", + __func__, __LINE__, g_args.len_of_binary); + return -1; + } + + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + char *ptr = data; + + TAOS_BIND *bind; + + if (isColumn) { + for (int i = 0; i < stbInfo->columnCount + 1; i ++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i)); + + if (i == 0) { + int64_t *bind_ts; + + bind_ts = (int64_t *)ptr; + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + if (stbInfo->disorderRatio) { + *bind_ts = startTime + getTSRandTail( + stbInfo->timeStampStep, recSeq, + stbInfo->disorderRatio, + stbInfo->disorderRange); + } else { + *bind_ts = startTime + stbInfo->timeStampStep * recSeq; + } + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + ptr += bind->buffer_length; + } else { + int cursor = 0; + + if (sourceRand) { + if ( -1 == prepareStmtBindArrayByType( + bind, + stbInfo->columns[i-1].dataType, + stbInfo->columns[i-1].dataLen, + &ptr, + NULL)) { + free(bindBuffer); + return -1; + } + } else { + char *restStr = stbInfo->sampleDataBuf + cursor; + int lengthOfRest = strlen(restStr); + + int index = 0; + for (index = 0; index < lengthOfRest; index ++) { + if (restStr[index] == ',') { + break; + } + } + + memset(bindBuffer, 0, g_args.len_of_binary); + strncpy(bindBuffer, restStr, index); + cursor += index + 1; // skip ',' too + + if ( -1 == prepareStmtBindArrayByType( + bind, + stbInfo->columns[i-1].dataType, + stbInfo->columns[i-1].dataLen, + &ptr, + bindBuffer)) { + free(bindBuffer); + return -1; + } + } + } + } + } else { + TAOS_BIND *tag; + + for (int t = 0; t < stbInfo->tagCount; t ++) { + tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t)); + if ( -1 == prepareStmtBindArrayByType( + tag, + stbInfo->tags[t].dataType, + stbInfo->tags[t].dataLen, + &ptr, + NULL)) { + free(bindBuffer); + return -1; + } + } + + } + + return 0; +} + static int32_t prepareStbStmt( SSuperTable *stbInfo, TAOS_STMT *stmt, - char *tableName, uint32_t batch, + char *tableName, + int64_t tableSeq, + uint32_t batch, uint64_t insertRows, uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos) { - int ret = taos_stmt_set_tbname(stmt, tableName); - if (ret != 0) { - errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", - tableName, ret, taos_errstr(NULL)); - return ret; - } - - char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); - if (bindArray == NULL) { - errorPrint("%s() LN%d, Failed to allocate %d bind params\n", - __func__, __LINE__, (stbInfo->columnCount + 1)); - return -1; - } + int ret; bool sourceRand; if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) { @@ -5767,83 +5874,68 @@ static int32_t prepareStbStmt( sourceRand = false; // from sample data file } - char *bindBuffer = malloc(g_args.len_of_binary); - if (bindBuffer == NULL) { - errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n", - __func__, __LINE__, g_args.len_of_binary); - free(bindArray); - return -1; - } + if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) { + char* tagsValBuf = NULL; - uint32_t k; - for (k = 0; k < batch;) { - /* columnCount + 1 (ts) */ - char data[MAX_DATA_SIZE]; - memset(data, 0, MAX_DATA_SIZE); + bool tagRand; + if (0 == stbInfo->tagSource) { + tagRand = true; + tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq); + } else { + tagRand = false; + tagsValBuf = getTagValueFromTagSample( + stbInfo, + tableSeq % stbInfo->tagSampleCount); + } - char *ptr = data; - TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); + if (NULL == tagsValBuf) { + errorPrint("%s() LN%d, tag buf failed to allocate memory\n", + __func__, __LINE__); + return -1; + } - int64_t *bind_ts; + char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount); + if (NULL == tagsArray) { + tmfree(tagsValBuf); + errorPrint("%s() LN%d, tag buf failed to allocate memory\n", + __func__, __LINE__); + return -1; + } - bind_ts = (int64_t *)ptr; - bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - if (stbInfo->disorderRatio) { - *bind_ts = startTime + getTSRandTail( - stbInfo->timeStampStep, k, - stbInfo->disorderRatio, - stbInfo->disorderRange); - } else { - *bind_ts = startTime + stbInfo->timeStampStep * k; + if (-1 == prepareStbStmtBind( + tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) { + free(tagsArray); + return -1; } - bind->buffer_length = sizeof(int64_t); - bind->buffer = bind_ts; - bind->length = &bind->buffer_length; - bind->is_null = NULL; - ptr += bind->buffer_length; + ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray); - int cursor = 0; - for (int i = 0; i < stbInfo->columnCount; i ++) { - bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); - - if (sourceRand) { - if ( -1 == prepareStmtBindArrayByType( - bind, - stbInfo->columns[i].dataType, - stbInfo->columns[i].dataLen, - &ptr, - NULL)) { - free(bindArray); - free(bindBuffer); - return -1; - } - } else { - char *restStr = stbInfo->sampleDataBuf + cursor; - int lengthOfRest = strlen(restStr); + tmfree(tagsValBuf); + tmfree((char *)tagsArray); + } else { + ret = taos_stmt_set_tbname(stmt, tableName); + } - int index = 0; - for (index = 0; index < lengthOfRest; index ++) { - if (restStr[index] == ',') { - break; - } - } + if (ret != 0) { + errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", + tableName, ret, taos_errstr(NULL)); + return ret; + } - memset(bindBuffer, 0, g_args.len_of_binary); - strncpy(bindBuffer, restStr, index); - cursor += index + 1; // skip ',' too - - if ( -1 == prepareStmtBindArrayByType( - bind, - stbInfo->columns[i].dataType, - stbInfo->columns[i].dataLen, - &ptr, - bindBuffer)) { - free(bindArray); - free(bindBuffer); - return -1; - } - } + char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); + if (bindArray == NULL) { + errorPrint("%s() LN%d, Failed to allocate %d bind params\n", + __func__, __LINE__, (stbInfo->columnCount + 1)); + return -1; + } + + uint32_t k; + for (k = 0; k < batch;) { + /* columnCount + 1 (ts) */ + if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand, + startTime, k, true /* is column */)) { + free(bindArray); + return -1; } taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); // if msg > 3MB, break @@ -5861,7 +5953,6 @@ static int32_t prepareStbStmt( } } - free(bindBuffer); free(bindArray); return k; } @@ -5869,7 +5960,9 @@ static int32_t prepareStbStmt( static int32_t prepareStbStmtInterlace( SSuperTable *stbInfo, TAOS_STMT *stmt, - char *tableName, uint32_t batch, + char *tableName, + int64_t tableSeq, + uint32_t batch, uint64_t insertRows, uint64_t recordFrom, int64_t startTime, @@ -5879,6 +5972,7 @@ static int32_t prepareStbStmtInterlace( stbInfo, stmt, tableName, + tableSeq, batch, insertRows, 0, startTime, pSamplePos); @@ -5887,7 +5981,9 @@ static int32_t prepareStbStmtInterlace( static int32_t prepareStbStmtProgressive( SSuperTable *stbInfo, TAOS_STMT *stmt, - char *tableName, uint32_t batch, + char *tableName, + int64_t tableSeq, + uint32_t batch, uint64_t insertRows, uint64_t recordFrom, int64_t startTime, @@ -5897,6 +5993,7 @@ static int32_t prepareStbStmtProgressive( stbInfo, stmt, tableName, + tableSeq, g_args.num_of_RPR, insertRows, recordFrom, startTime, pSamplePos); @@ -6097,6 +6194,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { superTblInfo, pThreadInfo->stmt, tableName, + tableSeq, batchPerTbl, insertRows, i, startTime, @@ -6263,164 +6361,165 @@ free_of_interlace: // sync insertion progressive data static void* syncWriteProgressive(threadInfo *pThreadInfo) { - debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); + debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; - int64_t timeStampStep = - superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; - int64_t insertRows = + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; + int64_t timeStampStep = + superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; + int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; - verbosePrint("%s() LN%d insertRows=%"PRId64"\n", + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); - pThreadInfo->buffer = calloc(maxSqlLen, 1); - if (NULL == pThreadInfo->buffer) { - errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n", - maxSqlLen, - strerror(errno)); - return NULL; - } - - uint64_t lastPrintTime = taosGetTimestampMs(); - uint64_t startTs = taosGetTimestampMs(); - uint64_t endTs; - - pThreadInfo->totalInsertRows = 0; - pThreadInfo->totalAffectedRows = 0; - - pThreadInfo->samplePos = 0; - - for (uint64_t tableSeq = pThreadInfo->start_table_from; - tableSeq <= pThreadInfo->end_table_to; - tableSeq ++) { - int64_t start_time = pThreadInfo->start_time; - - for (uint64_t i = 0; i < insertRows;) { - char tableName[TSDB_TABLE_NAME_LEN]; - getTableName(tableName, pThreadInfo, tableSeq); - verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n", - __func__, __LINE__, - pThreadInfo->threadID, tableSeq, tableName); - if (0 == strlen(tableName)) { - errorPrint("[%d] %s() LN%d, getTableName return null\n", - pThreadInfo->threadID, __func__, __LINE__); - free(pThreadInfo->buffer); + pThreadInfo->buffer = calloc(maxSqlLen, 1); + if (NULL == pThreadInfo->buffer) { + errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n", + maxSqlLen, + strerror(errno)); return NULL; - } + } + + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + uint64_t endTs; + + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; + + pThreadInfo->samplePos = 0; + + for (uint64_t tableSeq = pThreadInfo->start_table_from; + tableSeq <= pThreadInfo->end_table_to; + tableSeq ++) { + int64_t start_time = pThreadInfo->start_time; + + for (uint64_t i = 0; i < insertRows;) { + char tableName[TSDB_TABLE_NAME_LEN]; + getTableName(tableName, pThreadInfo, tableSeq); + verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n", + __func__, __LINE__, + pThreadInfo->threadID, tableSeq, tableName); + if (0 == strlen(tableName)) { + errorPrint("[%d] %s() LN%d, getTableName return null\n", + pThreadInfo->threadID, __func__, __LINE__); + free(pThreadInfo->buffer); + return NULL; + } - int64_t remainderBufLen = maxSqlLen; - char *pstr = pThreadInfo->buffer; + int64_t remainderBufLen = maxSqlLen; + char *pstr = pThreadInfo->buffer; - int len = snprintf(pstr, - strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); - pstr += len; - remainderBufLen -= len; + pstr += len; + remainderBufLen -= len; - int32_t generated; - if (superTblInfo) { - if (superTblInfo->iface == STMT_IFACE) { + int32_t generated; + if (superTblInfo) { + if (superTblInfo->iface == STMT_IFACE) { #if STMT_IFACE_ENABLED == 1 - generated = prepareStbStmtProgressive( - superTblInfo, - pThreadInfo->stmt, - tableName, - g_args.num_of_RPR, - insertRows, i, start_time, - &(pThreadInfo->samplePos)); + generated = prepareStbStmtProgressive( + superTblInfo, + pThreadInfo->stmt, + tableName, + tableSeq, + g_args.num_of_RPR, + insertRows, i, start_time, + &(pThreadInfo->samplePos)); #else - generated = -1; + generated = -1; #endif - } else { - generated = generateStbProgressiveData( - superTblInfo, - tableName, tableSeq, pThreadInfo->db_name, pstr, - insertRows, i, start_time, - &(pThreadInfo->samplePos), - &remainderBufLen); - } - } else { - if (g_args.iface == STMT_IFACE) { + } else { + generated = generateStbProgressiveData( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, pstr, + insertRows, i, start_time, + &(pThreadInfo->samplePos), + &remainderBufLen); + } + } else { + if (g_args.iface == STMT_IFACE) { #if STMT_IFACE_ENABLED == 1 - generated = prepareStmtWithoutStb( - pThreadInfo->stmt, - tableName, - g_args.num_of_RPR, - insertRows, i, - start_time); + generated = prepareStmtWithoutStb( + pThreadInfo->stmt, + tableName, + g_args.num_of_RPR, + insertRows, i, + start_time); #else - generated = -1; + generated = -1; #endif - } else { - generated = generateProgressiveDataWithoutStb( - tableName, - /* tableSeq, */ - pThreadInfo, pstr, insertRows, - i, start_time, - /* &(pThreadInfo->samplePos), */ - &remainderBufLen); - } - } - if (generated > 0) - i += generated; - else - goto free_of_progressive; + } else { + generated = generateProgressiveDataWithoutStb( + tableName, + /* tableSeq, */ + pThreadInfo, pstr, insertRows, + i, start_time, + /* &(pThreadInfo->samplePos), */ + &remainderBufLen); + } + } + if (generated > 0) + i += generated; + else + goto free_of_progressive; - start_time += generated * timeStampStep; - pThreadInfo->totalInsertRows += generated; + start_time += generated * timeStampStep; + pThreadInfo->totalInsertRows += generated; - startTs = taosGetTimestampMs(); + startTs = taosGetTimestampMs(); - int32_t affectedRows = execInsert(pThreadInfo, generated); + int32_t affectedRows = execInsert(pThreadInfo, generated); - endTs = taosGetTimestampMs(); - uint64_t delay = endTs - startTs; - performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", - __func__, __LINE__, delay); - verbosePrint("[%d] %s() LN%d affectedRows=%d\n", - pThreadInfo->threadID, - __func__, __LINE__, affectedRows); + endTs = taosGetTimestampMs(); + uint64_t delay = endTs - startTs; + performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", + __func__, __LINE__, delay); + verbosePrint("[%d] %s() LN%d affectedRows=%d\n", + pThreadInfo->threadID, + __func__, __LINE__, affectedRows); - if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; - if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; - pThreadInfo->cntDelay++; - pThreadInfo->totalDelay += delay; + if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; + if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; - if (affectedRows < 0) { - errorPrint("%s() LN%d, affected rows: %d\n", - __func__, __LINE__, affectedRows); - goto free_of_progressive; - } + if (affectedRows < 0) { + errorPrint("%s() LN%d, affected rows: %d\n", + __func__, __LINE__, affectedRows); + goto free_of_progressive; + } - pThreadInfo->totalAffectedRows += affectedRows; + pThreadInfo->totalAffectedRows += affectedRows; - int64_t currentPrintTime = taosGetTimestampMs(); - if (currentPrintTime - lastPrintTime > 30*1000) { - printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", - pThreadInfo->threadID, - pThreadInfo->totalInsertRows, - pThreadInfo->totalAffectedRows); - lastPrintTime = currentPrintTime; - } + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + lastPrintTime = currentPrintTime; + } - if (i >= insertRows) - break; - } // num_of_DPT + if (i >= insertRows) + break; + } // num_of_DPT - if ((g_args.verbose_print) && - (tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) && - (0 == strncasecmp( - superTblInfo->dataSource, "sample", strlen("sample")))) { - verbosePrint("%s() LN%d samplePos=%"PRId64"\n", - __func__, __LINE__, pThreadInfo->samplePos); - } - } // tableSeq + if ((g_args.verbose_print) && + (tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) && + (0 == strncasecmp( + superTblInfo->dataSource, "sample", strlen("sample")))) { + verbosePrint("%s() LN%d samplePos=%"PRId64"\n", + __func__, __LINE__, pThreadInfo->samplePos); + } + } // tableSeq free_of_progressive: - tmfree(pThreadInfo->buffer); - printStatPerThread(pThreadInfo); - return NULL; + tmfree(pThreadInfo->buffer); + printStatPerThread(pThreadInfo); + return NULL; } static void* syncWrite(void *sarg) { @@ -6428,6 +6527,8 @@ static void* syncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + setThreadName("syncWrite"); + uint32_t interlaceRows; if (superTblInfo) { @@ -6513,6 +6614,8 @@ static void *asyncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + setThreadName("asyncWrite"); + pThreadInfo->st = 0; pThreadInfo->et = 0; pThreadInfo->lastTs = pThreadInfo->start_time; @@ -6754,7 +6857,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, exit(-1); } - char buffer[3000]; + char buffer[BUFFER_SIZE]; char *pstr = buffer; if ((superTblInfo) @@ -6913,6 +7016,7 @@ static void *readTable(void *sarg) { #if 1 threadInfo *pThreadInfo = (threadInfo *)sarg; TAOS *taos = pThreadInfo->taos; + setThreadName("readTable"); char command[BUFFER_SIZE] = "\0"; uint64_t sTime = pThreadInfo->start_time; char *tb_prefix = pThreadInfo->tb_prefix; @@ -6985,6 +7089,7 @@ static void *readMetric(void *sarg) { #if 1 threadInfo *pThreadInfo = (threadInfo *)sarg; TAOS *taos = pThreadInfo->taos; + setThreadName("readMetric"); char command[BUFFER_SIZE] = "\0"; FILE *fp = fopen(pThreadInfo->filePath, "a"); if (NULL == fp) { @@ -7161,6 +7266,8 @@ static int insertTestProcess() { static void *specifiedTableQuery(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; + setThreadName("specTableQuery"); + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, @@ -7260,6 +7367,8 @@ static void *superTableQuery(void *sarg) { char sqlstr[MAX_QUERY_SQL_LENGTH]; threadInfo *pThreadInfo = (threadInfo *)sarg; + setThreadName("superTableQuery"); + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, @@ -7562,6 +7671,8 @@ static void *superSubscribe(void *sarg) { TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; uint64_t tsubSeq; + setThreadName("superSub"); + if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); @@ -7708,6 +7819,8 @@ static void *specifiedSubscribe(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; // TAOS_SUB* tsub = NULL; + setThreadName("specSub"); + if (pThreadInfo->taos == NULL) { pThreadInfo->taos = taos_connect(g_queryInfo.host, g_queryInfo.user, @@ -8140,55 +8253,55 @@ static int isCommentLine(char *line) { static void querySqlFile(TAOS* taos, char* sqlFile) { - FILE *fp = fopen(sqlFile, "r"); - if (fp == NULL) { - printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno)); - return; - } + FILE *fp = fopen(sqlFile, "r"); + if (fp == NULL) { + printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno)); + return; + } - int read_len = 0; - char * cmd = calloc(1, TSDB_MAX_BYTES_PER_ROW); - size_t cmd_len = 0; - char * line = NULL; - size_t line_len = 0; + int read_len = 0; + char * cmd = calloc(1, TSDB_MAX_BYTES_PER_ROW); + size_t cmd_len = 0; + char * line = NULL; + size_t line_len = 0; - double t = taosGetTimestampMs(); + double t = taosGetTimestampMs(); - while((read_len = tgetline(&line, &line_len, fp)) != -1) { - if (read_len >= TSDB_MAX_BYTES_PER_ROW) continue; - line[--read_len] = '\0'; + while((read_len = tgetline(&line, &line_len, fp)) != -1) { + if (read_len >= TSDB_MAX_BYTES_PER_ROW) continue; + line[--read_len] = '\0'; - if (read_len == 0 || isCommentLine(line)) { // line starts with # - continue; - } + if (read_len == 0 || isCommentLine(line)) { // line starts with # + continue; + } - if (line[read_len - 1] == '\\') { - line[read_len - 1] = ' '; - memcpy(cmd + cmd_len, line, read_len); - cmd_len += read_len; - continue; - } + if (line[read_len - 1] == '\\') { + line[read_len - 1] = ' '; + memcpy(cmd + cmd_len, line, read_len); + cmd_len += read_len; + continue; + } - memcpy(cmd + cmd_len, line, read_len); - if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE, false)) { - errorPrint("%s() LN%d, queryDbExec %s failed!\n", - __func__, __LINE__, cmd); - tmfree(cmd); - tmfree(line); - tmfclose(fp); - return; + memcpy(cmd + cmd_len, line, read_len); + if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE, false)) { + errorPrint("%s() LN%d, queryDbExec %s failed!\n", + __func__, __LINE__, cmd); + tmfree(cmd); + tmfree(line); + tmfclose(fp); + return; + } + memset(cmd, 0, TSDB_MAX_BYTES_PER_ROW); + cmd_len = 0; } - memset(cmd, 0, TSDB_MAX_BYTES_PER_ROW); - cmd_len = 0; - } - t = taosGetTimestampMs() - t; - printf("run %s took %.6f second(s)\n\n", sqlFile, t); + t = taosGetTimestampMs() - t; + printf("run %s took %.6f second(s)\n\n", sqlFile, t); - tmfree(cmd); - tmfree(line); - tmfclose(fp); - return; + tmfree(cmd); + tmfree(line); + tmfclose(fp); + return; } static void testMetaFile() { -- GitLab