diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index ff69c9f81ad7a112cacb8a6f87b185c54470345d..fd4023964abfb64311c2e8a2827c438d534e80d0 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -348,20 +348,21 @@ typedef struct SDbs_S { } SDbs; -typedef struct SuperQueryInfo_S { +typedef struct SpecifiedQueryInfo_S { int rate; // 0: unlimit > 0 loop/s int concurrent; int sqlCount; int subscribeMode; // 0: sync, 1: async int subscribeInterval; // ms + int queryTimes; int subscribeRestart; int subscribeKeepProgress; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; -} SuperQueryInfo; +} SpecifiedQueryInfo; -typedef struct SubQueryInfo_S { +typedef struct SuperQueryInfo_S { char sTblName[MAX_TB_NAME_SIZE+1]; int rate; // 0: unlimit > 0 loop/s int threadCnt; @@ -369,6 +370,7 @@ typedef struct SubQueryInfo_S { int subscribeInterval; // ms int subscribeRestart; int subscribeKeepProgress; + int queryTimes; int childTblCount; char childTblPrefix[MAX_TB_NAME_SIZE]; int sqlCount; @@ -377,7 +379,7 @@ typedef struct SubQueryInfo_S { TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; char* childTblName; -} SubQueryInfo; +} SuperQueryInfo; typedef struct SQueryMetaInfo_S { char cfgDir[MAX_FILE_NAME_LEN+1]; @@ -388,8 +390,8 @@ typedef struct SQueryMetaInfo_S { char dbName[MAX_DB_NAME_SIZE+1]; char queryMode[MAX_TB_NAME_SIZE]; // taosc, restful - SuperQueryInfo superQueryInfo; - SubQueryInfo subQueryInfo; + SpecifiedQueryInfo specifiedQueryInfo; + SuperQueryInfo superQueryInfo; } SQueryMetaInfo; typedef struct SThreadInfo_S { @@ -1434,38 +1436,38 @@ static void printfQueryMeta() { printf("\n"); printf("specified table query info: \n"); - printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate); + printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.rate); printf("query times: \033[33m%d\033[0m\n", g_args.query_times); - printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.concurrent); - printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); + printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.concurrent); + printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount); if (SUBSCRIBE_TEST == g_args.test_mode) { - printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode); - printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval); - printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart); - printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress); + printf("mod: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeMode); + printf("interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeInterval); + printf("restart: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeRestart); + printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); } - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]); + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.specifiedQueryInfo.sql[i]); } printf("\n"); printf("super table query info: \n"); - printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.rate); - printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.threadCnt); - printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.childTblCount); - printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.subQueryInfo.sTblName); + printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate); + printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.threadCnt); + printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.childTblCount); + printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.superQueryInfo.sTblName); if (SUBSCRIBE_TEST == g_args.test_mode) { - printf("mod: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeMode); - printf("interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeInterval); - printf("restart: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeRestart); - printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeKeepProgress); + printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode); + printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval); + printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart); + printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress); } - printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.sqlCount); - for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { - printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.subQueryInfo.sql[i]); + printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]); } printf("\n"); @@ -3761,85 +3763,95 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } // super_table_query - cJSON *superQuery = cJSON_GetObjectItem(root, "specified_table_query"); - if (!superQuery) { - g_queryInfo.superQueryInfo.concurrent = 0; - g_queryInfo.superQueryInfo.sqlCount = 0; - } else if (superQuery->type != cJSON_Object) { + cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query"); + if (!specifiedQuery) { + g_queryInfo.specifiedQueryInfo.concurrent = 0; + g_queryInfo.specifiedQueryInfo.sqlCount = 0; + } else if (specifiedQuery->type != cJSON_Object) { printf("ERROR: failed to read json, super_table_query not found\n"); goto PARSE_OVER; } else { - cJSON* rate = cJSON_GetObjectItem(superQuery, "query_interval"); + cJSON* rate = cJSON_GetObjectItem(specifiedQuery, "query_interval"); if (rate && rate->type == cJSON_Number) { - g_queryInfo.superQueryInfo.rate = rate->valueint; + g_queryInfo.specifiedQueryInfo.rate = rate->valueint; } else if (!rate) { - g_queryInfo.superQueryInfo.rate = 0; + g_queryInfo.specifiedQueryInfo.rate = 0; + } + + cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, "query_times"); + if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { + g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint; + } else if (!specifiedQueryTimes) { + g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times; + } else { + errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__); + goto PARSE_OVER; } - cJSON* concurrent = cJSON_GetObjectItem(superQuery, "concurrent"); + cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent"); if (concurrent && concurrent->type == cJSON_Number) { - g_queryInfo.superQueryInfo.concurrent = concurrent->valueint; + g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint; } else if (!concurrent) { - g_queryInfo.superQueryInfo.concurrent = 1; + g_queryInfo.specifiedQueryInfo.concurrent = 1; } - cJSON* mode = cJSON_GetObjectItem(superQuery, "mode"); + cJSON* mode = cJSON_GetObjectItem(specifiedQuery, "mode"); if (mode && mode->type == cJSON_String && mode->valuestring != NULL) { if (0 == strcmp("sync", mode->valuestring)) { - g_queryInfo.superQueryInfo.subscribeMode = 0; + g_queryInfo.specifiedQueryInfo.subscribeMode = 0; } else if (0 == strcmp("async", mode->valuestring)) { - g_queryInfo.superQueryInfo.subscribeMode = 1; + g_queryInfo.specifiedQueryInfo.subscribeMode = 1; } else { printf("ERROR: failed to read json, subscribe mod error\n"); goto PARSE_OVER; } } else { - g_queryInfo.superQueryInfo.subscribeMode = 0; + g_queryInfo.specifiedQueryInfo.subscribeMode = 0; } - cJSON* interval = cJSON_GetObjectItem(superQuery, "interval"); + cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval"); if (interval && interval->type == cJSON_Number) { - g_queryInfo.superQueryInfo.subscribeInterval = interval->valueint; + g_queryInfo.specifiedQueryInfo.subscribeInterval = interval->valueint; } else if (!interval) { //printf("failed to read json, subscribe interval no found\n"); //goto PARSE_OVER; - g_queryInfo.superQueryInfo.subscribeInterval = 10000; + g_queryInfo.specifiedQueryInfo.subscribeInterval = 10000; } - cJSON* restart = cJSON_GetObjectItem(superQuery, "restart"); + cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart"); if (restart && restart->type == cJSON_String && restart->valuestring != NULL) { if (0 == strcmp("yes", restart->valuestring)) { - g_queryInfo.superQueryInfo.subscribeRestart = 1; + g_queryInfo.specifiedQueryInfo.subscribeRestart = 1; } else if (0 == strcmp("no", restart->valuestring)) { - g_queryInfo.superQueryInfo.subscribeRestart = 0; + g_queryInfo.specifiedQueryInfo.subscribeRestart = 0; } else { printf("ERROR: failed to read json, subscribe restart error\n"); goto PARSE_OVER; } } else { - g_queryInfo.superQueryInfo.subscribeRestart = 1; + g_queryInfo.specifiedQueryInfo.subscribeRestart = 1; } - cJSON* keepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); + cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress"); if (keepProgress && keepProgress->type == cJSON_String && keepProgress->valuestring != NULL) { if (0 == strcmp("yes", keepProgress->valuestring)) { - g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 1; } else if (0 == strcmp("no", keepProgress->valuestring)) { - g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0; } else { printf("ERROR: failed to read json, subscribe keepProgress error\n"); goto PARSE_OVER; } } else { - g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0; } // sqls - cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls"); + cJSON* superSqls = cJSON_GetObjectItem(specifiedQuery, "sqls"); if (!superSqls) { - g_queryInfo.superQueryInfo.sqlCount = 0; + g_queryInfo.specifiedQueryInfo.sqlCount = 0; } else if (superSqls->type != cJSON_Array) { printf("ERROR: failed to read json, super sqls not found\n"); goto PARSE_OVER; @@ -3850,7 +3862,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { goto PARSE_OVER; } - g_queryInfo.superQueryInfo.sqlCount = superSqlSize; + g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize; for (int j = 0; j < superSqlSize; ++j) { cJSON* sql = cJSON_GetArrayItem(superSqls, j); if (sql == NULL) continue; @@ -3860,13 +3872,13 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { printf("ERROR: failed to read json, sql not found\n"); goto PARSE_OVER; } - tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); cJSON *result = cJSON_GetObjectItem(sql, "result"); if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) { - tstrncpy(g_queryInfo.superQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); + tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); } else if (NULL == result) { - memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); + memset(g_queryInfo.specifiedQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); } else { printf("ERROR: failed to read json, super query result file not found\n"); goto PARSE_OVER; @@ -3876,101 +3888,111 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } // sub_table_query - cJSON *subQuery = cJSON_GetObjectItem(root, "super_table_query"); - if (!subQuery) { - g_queryInfo.subQueryInfo.threadCnt = 0; - g_queryInfo.subQueryInfo.sqlCount = 0; - } else if (subQuery->type != cJSON_Object) { + cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query"); + if (!superQuery) { + g_queryInfo.superQueryInfo.threadCnt = 0; + g_queryInfo.superQueryInfo.sqlCount = 0; + } else if (superQuery->type != cJSON_Object) { printf("ERROR: failed to read json, sub_table_query not found\n"); ret = true; goto PARSE_OVER; } else { - cJSON* subrate = cJSON_GetObjectItem(subQuery, "query_interval"); + cJSON* subrate = cJSON_GetObjectItem(superQuery, "query_interval"); if (subrate && subrate->type == cJSON_Number) { - g_queryInfo.subQueryInfo.rate = subrate->valueint; + g_queryInfo.superQueryInfo.rate = subrate->valueint; } else if (!subrate) { - g_queryInfo.subQueryInfo.rate = 0; + g_queryInfo.superQueryInfo.rate = 0; + } + + cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times"); + if (superQueryTimes && superQueryTimes->type == cJSON_Number) { + g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint; + } else if (!superQueryTimes) { + g_queryInfo.superQueryInfo.queryTimes = g_args.query_times; + } else { + errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__); + goto PARSE_OVER; } - cJSON* threads = cJSON_GetObjectItem(subQuery, "threads"); + cJSON* threads = cJSON_GetObjectItem(superQuery, "threads"); if (threads && threads->type == cJSON_Number) { - g_queryInfo.subQueryInfo.threadCnt = threads->valueint; + g_queryInfo.superQueryInfo.threadCnt = threads->valueint; } else if (!threads) { - g_queryInfo.subQueryInfo.threadCnt = 1; + g_queryInfo.superQueryInfo.threadCnt = 1; } - //cJSON* subTblCnt = cJSON_GetObjectItem(subQuery, "childtable_count"); + //cJSON* subTblCnt = cJSON_GetObjectItem(superQuery, "childtable_count"); //if (subTblCnt && subTblCnt->type == cJSON_Number) { - // g_queryInfo.subQueryInfo.childTblCount = subTblCnt->valueint; + // g_queryInfo.superQueryInfo.childTblCount = subTblCnt->valueint; //} else if (!subTblCnt) { - // g_queryInfo.subQueryInfo.childTblCount = 0; + // g_queryInfo.superQueryInfo.childTblCount = 0; //} - cJSON* stblname = cJSON_GetObjectItem(subQuery, "stblname"); + cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname"); if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) { - tstrncpy(g_queryInfo.subQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE); + tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE); } else { printf("ERROR: failed to read json, super table name not found\n"); goto PARSE_OVER; } - cJSON* submode = cJSON_GetObjectItem(subQuery, "mode"); + cJSON* submode = cJSON_GetObjectItem(superQuery, "mode"); if (submode && submode->type == cJSON_String && submode->valuestring != NULL) { if (0 == strcmp("sync", submode->valuestring)) { - g_queryInfo.subQueryInfo.subscribeMode = 0; + g_queryInfo.superQueryInfo.subscribeMode = 0; } else if (0 == strcmp("async", submode->valuestring)) { - g_queryInfo.subQueryInfo.subscribeMode = 1; + g_queryInfo.superQueryInfo.subscribeMode = 1; } else { printf("ERROR: failed to read json, subscribe mod error\n"); goto PARSE_OVER; } } else { - g_queryInfo.subQueryInfo.subscribeMode = 0; + g_queryInfo.superQueryInfo.subscribeMode = 0; } - cJSON* subinterval = cJSON_GetObjectItem(subQuery, "interval"); + cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval"); if (subinterval && subinterval->type == cJSON_Number) { - g_queryInfo.subQueryInfo.subscribeInterval = subinterval->valueint; + g_queryInfo.superQueryInfo.subscribeInterval = subinterval->valueint; } else if (!subinterval) { //printf("failed to read json, subscribe interval no found\n"); //goto PARSE_OVER; - g_queryInfo.subQueryInfo.subscribeInterval = 10000; + g_queryInfo.superQueryInfo.subscribeInterval = 10000; } - cJSON* subrestart = cJSON_GetObjectItem(subQuery, "restart"); + cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart"); if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) { if (0 == strcmp("yes", subrestart->valuestring)) { - g_queryInfo.subQueryInfo.subscribeRestart = 1; + g_queryInfo.superQueryInfo.subscribeRestart = 1; } else if (0 == strcmp("no", subrestart->valuestring)) { - g_queryInfo.subQueryInfo.subscribeRestart = 0; + g_queryInfo.superQueryInfo.subscribeRestart = 0; } else { printf("ERROR: failed to read json, subscribe restart error\n"); goto PARSE_OVER; } } else { - g_queryInfo.subQueryInfo.subscribeRestart = 1; + g_queryInfo.superQueryInfo.subscribeRestart = 1; } - cJSON* subkeepProgress = cJSON_GetObjectItem(subQuery, "keepProgress"); + cJSON* subkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); if (subkeepProgress && subkeepProgress->type == cJSON_String && subkeepProgress->valuestring != NULL) { if (0 == strcmp("yes", subkeepProgress->valuestring)) { - g_queryInfo.subQueryInfo.subscribeKeepProgress = 1; + g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; } else if (0 == strcmp("no", subkeepProgress->valuestring)) { - g_queryInfo.subQueryInfo.subscribeKeepProgress = 0; + g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; } else { printf("ERROR: failed to read json, subscribe keepProgress error\n"); goto PARSE_OVER; } } else { - g_queryInfo.subQueryInfo.subscribeKeepProgress = 0; + g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; } // sqls - cJSON* subsqls = cJSON_GetObjectItem(subQuery, "sqls"); + cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls"); if (!subsqls) { - g_queryInfo.subQueryInfo.sqlCount = 0; + g_queryInfo.superQueryInfo.sqlCount = 0; } else if (subsqls->type != cJSON_Array) { printf("ERROR: failed to read json, super sqls not found\n"); goto PARSE_OVER; @@ -3981,7 +4003,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { goto PARSE_OVER; } - g_queryInfo.subQueryInfo.sqlCount = superSqlSize; + g_queryInfo.superQueryInfo.sqlCount = superSqlSize; for (int j = 0; j < superSqlSize; ++j) { cJSON* sql = cJSON_GetArrayItem(subsqls, j); if (sql == NULL) continue; @@ -3991,13 +4013,13 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { printf("ERROR: failed to read json, sql not found\n"); goto PARSE_OVER; } - tstrncpy(g_queryInfo.subQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); cJSON *result = cJSON_GetObjectItem(sql, "result"); if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){ - tstrncpy(g_queryInfo.subQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); + tstrncpy(g_queryInfo.superQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); } else if (NULL == result) { - memset(g_queryInfo.subQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); + memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); } else { printf("ERROR: failed to read json, sub query result file not found\n"); goto PARSE_OVER; @@ -5494,32 +5516,32 @@ static void *superQueryProcess(void *sarg) { int64_t st = 0; int64_t et = 0; - int queryTimes = g_args.query_times; + int queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes; while(queryTimes --) { - if (g_queryInfo.superQueryInfo.rate && (et - st) < - (int64_t)g_queryInfo.superQueryInfo.rate*1000) { - taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms + if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < + (int64_t)g_queryInfo.specifiedQueryInfo.rate*1000) { + taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); } st = taosGetTimestampUs(); - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { int64_t t1 = taosGetTimestampUs(); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[i][0] != 0) { + if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[i], winfo->threadID); + g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); } - selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile); + selectAndGetResult(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], tmpFile); int64_t t2 = taosGetTimestampUs(); printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); } else { int64_t t1 = taosGetTimestampUs(); int retCode = postProceSql(g_queryInfo.host, - g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); + g_queryInfo.port, g_queryInfo.specifiedQueryInfo.sql[i]); int64_t t2 = taosGetTimestampUs(); printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); @@ -5542,7 +5564,7 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { char subTblName[MAX_TB_NAME_SIZE*3]; sprintf(subTblName, "%s.%s", g_queryInfo.dbName, - g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); + g_queryInfo.superQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); //printf("inSql: %s\n", inSql); @@ -5580,25 +5602,26 @@ static void *subQueryProcess(void *sarg) { } int64_t st = 0; - int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; - int queryTimes = g_args.query_times; + int64_t et = (int64_t)g_queryInfo.superQueryInfo.rate*1000; + + int queryTimes = g_queryInfo.superQueryInfo.queryTimes; while(queryTimes --) { - if (g_queryInfo.subQueryInfo.rate - && (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) { - taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms + if (g_queryInfo.superQueryInfo.rate + && (et - st) < (int64_t)g_queryInfo.superQueryInfo.rate*1000) { + taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); } st = taosGetTimestampUs(); for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) { - for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) { + for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { memset(sqlstr,0,sizeof(sqlstr)); - replaceSubTblName(g_queryInfo.subQueryInfo.sql[j], sqlstr, i); + replaceSubTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.subQueryInfo.result[j][0] != 0) { + if (g_queryInfo.superQueryInfo.result[j][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.subQueryInfo.result[j], + g_queryInfo.superQueryInfo.result[j], winfo->threadID); } selectAndGetResult(winfo->taos, sqlstr, tmpFile); @@ -5633,12 +5656,12 @@ static int queryTestProcess() { exit(-1); } - if (0 != g_queryInfo.subQueryInfo.sqlCount) { + if (0 != g_queryInfo.superQueryInfo.sqlCount) { getAllChildNameOfSuperTable(taos, g_queryInfo.dbName, - g_queryInfo.subQueryInfo.sTblName, - &g_queryInfo.subQueryInfo.childTblName, - &g_queryInfo.subQueryInfo.childTblCount); + g_queryInfo.superQueryInfo.sTblName, + &g_queryInfo.superQueryInfo.childTblName, + &g_queryInfo.superQueryInfo.childTblCount); } if (!g_args.answer_yes) { @@ -5651,22 +5674,22 @@ static int queryTestProcess() { pthread_t *pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from specify table - if (g_queryInfo.superQueryInfo.sqlCount > 0 - && g_queryInfo.superQueryInfo.concurrent > 0) { + if (g_queryInfo.specifiedQueryInfo.sqlCount > 0 + && g_queryInfo.specifiedQueryInfo.concurrent > 0) { - pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); + pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); if (NULL == pids) { taos_close(taos); ERROR_EXIT("memory allocation failed\n"); } - infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); + infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); if (NULL == infos) { taos_close(taos); free(pids); ERROR_EXIT("memory allocation failed for create threads\n"); } - for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -5690,7 +5713,7 @@ static int queryTestProcess() { pthread_create(pids + i, NULL, superQueryProcess, t_info); } } else { - g_queryInfo.superQueryInfo.concurrent = 0; + g_queryInfo.specifiedQueryInfo.concurrent = 0; } taos_close(taos); @@ -5698,9 +5721,9 @@ static int queryTestProcess() { pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; //==== create sub threads for query from all sub table of the super table - if ((g_queryInfo.subQueryInfo.sqlCount > 0) - && (g_queryInfo.subQueryInfo.threadCnt > 0)) { - pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); + if ((g_queryInfo.superQueryInfo.sqlCount > 0) + && (g_queryInfo.superQueryInfo.threadCnt > 0)) { + pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t)); if (NULL == pidsOfSub) { free(infos); free(pids); @@ -5708,7 +5731,7 @@ static int queryTestProcess() { ERROR_EXIT("memory allocation failed for create threads\n"); } - infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); + infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo)); if (NULL == infosOfSub) { free(pidsOfSub); free(infos); @@ -5716,8 +5739,8 @@ static int queryTestProcess() { ERROR_EXIT("memory allocation failed for create threads\n"); } - int ntables = g_queryInfo.subQueryInfo.childTblCount; - int threads = g_queryInfo.subQueryInfo.threadCnt; + int ntables = g_queryInfo.superQueryInfo.childTblCount; + int threads = g_queryInfo.superQueryInfo.threadCnt; int a = ntables / threads; if (a < 1) { @@ -5743,19 +5766,19 @@ static int queryTestProcess() { pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info); } - g_queryInfo.subQueryInfo.threadCnt = threads; + g_queryInfo.superQueryInfo.threadCnt = threads; } else { - g_queryInfo.subQueryInfo.threadCnt = 0; + g_queryInfo.superQueryInfo.threadCnt = 0; } - for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { pthread_join(pids[i], NULL); } tmfree((char*)pids); tmfree((char*)infos); - for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) { + for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { pthread_join(pidsOfSub[i], NULL); } @@ -5780,14 +5803,14 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) { TAOS_SUB* tsub = NULL; - if (g_queryInfo.superQueryInfo.subscribeMode) { + if (g_queryInfo.specifiedQueryInfo.subscribeMode) { tsub = taos_subscribe(taos, - g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeRestart, topic, sql, subscribe_callback, (void*)resultFileName, - g_queryInfo.superQueryInfo.subscribeInterval); + g_queryInfo.specifiedQueryInfo.subscribeInterval); } else { tsub = taos_subscribe(taos, - g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeRestart, topic, sql, NULL, NULL, 0); } @@ -5830,25 +5853,25 @@ static void *subSubscribeProcess(void *sarg) { //int64_t st = 0; //int64_t et = 0; do { - //if (g_queryInfo.superQueryInfo.rate && (et - st) < g_queryInfo.superQueryInfo.rate*1000) { - // taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms + //if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < g_queryInfo.specifiedQueryInfo.rate*1000) { + // taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); //} //st = taosGetTimestampMs(); char topic[32] = {0}; - for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { sprintf(topic, "taosdemo-subscribe-%d", i); memset(subSqlstr,0,sizeof(subSqlstr)); - replaceSubTblName(g_queryInfo.subQueryInfo.sql[i], subSqlstr, i); + replaceSubTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.subQueryInfo.result[i][0] != 0) { + if (g_queryInfo.superQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.subQueryInfo.result[i], winfo->threadID); + g_queryInfo.superQueryInfo.result[i], winfo->threadID); } - g_queryInfo.subQueryInfo.tsub[i] = subscribeImpl( + g_queryInfo.superQueryInfo.tsub[i] = subscribeImpl( winfo->taos, subSqlstr, topic, tmpFile); - if (NULL == g_queryInfo.subQueryInfo.tsub[i]) { + if (NULL == g_queryInfo.superQueryInfo.tsub[i]) { taos_close(winfo->taos); return NULL; } @@ -5860,17 +5883,17 @@ static void *subSubscribeProcess(void *sarg) { // start loop to consume result TAOS_RES* res = NULL; while (1) { - for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { - if (1 == g_queryInfo.subQueryInfo.subscribeMode) { + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + if (1 == g_queryInfo.superQueryInfo.subscribeMode) { continue; } - res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]); + res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]); if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.subQueryInfo.result[i][0] != 0) { + if (g_queryInfo.superQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.subQueryInfo.result[i], + g_queryInfo.superQueryInfo.result[i], winfo->threadID); } getResult(res, tmpFile); @@ -5879,9 +5902,9 @@ static void *subSubscribeProcess(void *sarg) { } taos_free_result(res); - for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { - taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], - g_queryInfo.subQueryInfo.subscribeKeepProgress); + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], + g_queryInfo.superQueryInfo.subscribeKeepProgress); } taos_close(winfo->taos); @@ -5918,25 +5941,25 @@ static void *superSubscribeProcess(void *sarg) { //int64_t st = 0; //int64_t et = 0; do { - //if (g_queryInfo.superQueryInfo.rate && (et - st) < g_queryInfo.superQueryInfo.rate*1000) { - // taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms + //if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < g_queryInfo.specifiedQueryInfo.rate*1000) { + // taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); //} //st = taosGetTimestampMs(); char topic[32] = {0}; - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { sprintf(topic, "taosdemo-subscribe-%d", i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.subQueryInfo.result[i][0] != 0) { + if (g_queryInfo.superQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[i], winfo->threadID); + g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); } - g_queryInfo.superQueryInfo.tsub[i] = + g_queryInfo.specifiedQueryInfo.tsub[i] = subscribeImpl(winfo->taos, - g_queryInfo.superQueryInfo.sql[i], + g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile); - if (NULL == g_queryInfo.superQueryInfo.tsub[i]) { + if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) { taos_close(winfo->taos); return NULL; } @@ -5948,17 +5971,17 @@ static void *superSubscribeProcess(void *sarg) { // start loop to consume result TAOS_RES* res = NULL; while (1) { - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - if (1 == g_queryInfo.superQueryInfo.subscribeMode) { + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + if (1 == g_queryInfo.specifiedQueryInfo.subscribeMode) { continue; } - res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]); + res = taos_consume(g_queryInfo.specifiedQueryInfo.tsub[i]); if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[i][0] != 0) { + if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[i], winfo->threadID); + g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); } getResult(res, tmpFile); } @@ -5966,9 +5989,9 @@ static void *superSubscribeProcess(void *sarg) { } taos_free_result(res); - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], - g_queryInfo.superQueryInfo.subscribeKeepProgress); + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[i], + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); } taos_close(winfo->taos); @@ -5997,12 +6020,12 @@ static int subscribeTestProcess() { exit(-1); } - if (0 != g_queryInfo.subQueryInfo.sqlCount) { + if (0 != g_queryInfo.superQueryInfo.sqlCount) { getAllChildNameOfSuperTable(taos, g_queryInfo.dbName, - g_queryInfo.subQueryInfo.sTblName, - &g_queryInfo.subQueryInfo.childTblName, - &g_queryInfo.subQueryInfo.childTblCount); + g_queryInfo.superQueryInfo.sTblName, + &g_queryInfo.superQueryInfo.childTblName, + &g_queryInfo.superQueryInfo.childTblCount); } taos_close(taos); // TODO: workaround to use separate taos connection; @@ -6010,22 +6033,22 @@ static int subscribeTestProcess() { pthread_t *pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from super table - if ((g_queryInfo.superQueryInfo.sqlCount <= 0) || - (g_queryInfo.superQueryInfo.concurrent <= 0)) { + if ((g_queryInfo.specifiedQueryInfo.sqlCount <= 0) || + (g_queryInfo.specifiedQueryInfo.concurrent <= 0)) { errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n", - __func__, __LINE__, g_queryInfo.superQueryInfo.sqlCount, - g_queryInfo.superQueryInfo.concurrent); + __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount, + g_queryInfo.specifiedQueryInfo.concurrent); exit(-1); } - pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); - infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); + pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); + infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); exit(-1); } - for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; t_info->taos = NULL; // TODO: workaround to use separate taos connection; @@ -6035,11 +6058,11 @@ static int subscribeTestProcess() { //==== create sub threads for query from sub table pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; - if ((g_queryInfo.subQueryInfo.sqlCount > 0) - && (g_queryInfo.subQueryInfo.threadCnt > 0)) { - pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * + if ((g_queryInfo.superQueryInfo.sqlCount > 0) + && (g_queryInfo.superQueryInfo.threadCnt > 0)) { + pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t)); - infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * + infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo)); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { errorPrint("%s() LN%d, malloc failed for create threads\n", @@ -6048,8 +6071,8 @@ static int subscribeTestProcess() { exit(-1); } - int ntables = g_queryInfo.subQueryInfo.childTblCount; - int threads = g_queryInfo.subQueryInfo.threadCnt; + int ntables = g_queryInfo.superQueryInfo.childTblCount; + int threads = g_queryInfo.superQueryInfo.threadCnt; int a = ntables / threads; if (a < 1) { @@ -6075,14 +6098,14 @@ static int subscribeTestProcess() { pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info); } - g_queryInfo.subQueryInfo.threadCnt = threads; + g_queryInfo.superQueryInfo.threadCnt = threads; - for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) { + for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { pthread_join(pidsOfSub[i], NULL); } } - for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { pthread_join(pids[i], NULL); }