未验证 提交 92179b81 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 3575 support stb query times (#5605)

* [TD-3575] <fix>: support query times in specified and super query.

refactor code.

* [TD-3575] <fix>: support query times in specified and super query.

refactor code 2.

* [TD-3575] <fix>: support query times in specified and super query.

refactor code 3.

* [TD-3575] <fix>: support query times in specified and super query.

add query times parse in sub sections.

* [TD-3575] <fix>: support query times in specified and super query.

replace query times with specified query or super query.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 f73f8a56
...@@ -348,20 +348,21 @@ typedef struct SDbs_S { ...@@ -348,20 +348,21 @@ typedef struct SDbs_S {
} SDbs; } SDbs;
typedef struct SuperQueryInfo_S { typedef struct SpecifiedQueryInfo_S {
int rate; // 0: unlimit > 0 loop/s int rate; // 0: unlimit > 0 loop/s
int concurrent; int concurrent;
int sqlCount; int sqlCount;
int subscribeMode; // 0: sync, 1: async int subscribeMode; // 0: sync, 1: async
int subscribeInterval; // ms int subscribeInterval; // ms
int queryTimes;
int subscribeRestart; int subscribeRestart;
int subscribeKeepProgress; int subscribeKeepProgress;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
} SuperQueryInfo; } SpecifiedQueryInfo;
typedef struct SubQueryInfo_S { typedef struct SuperQueryInfo_S {
char sTblName[MAX_TB_NAME_SIZE+1]; char sTblName[MAX_TB_NAME_SIZE+1];
int rate; // 0: unlimit > 0 loop/s int rate; // 0: unlimit > 0 loop/s
int threadCnt; int threadCnt;
...@@ -369,6 +370,7 @@ typedef struct SubQueryInfo_S { ...@@ -369,6 +370,7 @@ typedef struct SubQueryInfo_S {
int subscribeInterval; // ms int subscribeInterval; // ms
int subscribeRestart; int subscribeRestart;
int subscribeKeepProgress; int subscribeKeepProgress;
int queryTimes;
int childTblCount; int childTblCount;
char childTblPrefix[MAX_TB_NAME_SIZE]; char childTblPrefix[MAX_TB_NAME_SIZE];
int sqlCount; int sqlCount;
...@@ -377,7 +379,7 @@ typedef struct SubQueryInfo_S { ...@@ -377,7 +379,7 @@ typedef struct SubQueryInfo_S {
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName; char* childTblName;
} SubQueryInfo; } SuperQueryInfo;
typedef struct SQueryMetaInfo_S { typedef struct SQueryMetaInfo_S {
char cfgDir[MAX_FILE_NAME_LEN+1]; char cfgDir[MAX_FILE_NAME_LEN+1];
...@@ -388,8 +390,8 @@ typedef struct SQueryMetaInfo_S { ...@@ -388,8 +390,8 @@ typedef struct SQueryMetaInfo_S {
char dbName[MAX_DB_NAME_SIZE+1]; char dbName[MAX_DB_NAME_SIZE+1];
char queryMode[MAX_TB_NAME_SIZE]; // taosc, restful char queryMode[MAX_TB_NAME_SIZE]; // taosc, restful
SpecifiedQueryInfo specifiedQueryInfo;
SuperQueryInfo superQueryInfo; SuperQueryInfo superQueryInfo;
SubQueryInfo subQueryInfo;
} SQueryMetaInfo; } SQueryMetaInfo;
typedef struct SThreadInfo_S { typedef struct SThreadInfo_S {
...@@ -1434,38 +1436,38 @@ static void printfQueryMeta() { ...@@ -1434,38 +1436,38 @@ static void printfQueryMeta() {
printf("\n"); printf("\n");
printf("specified table query info: \n"); printf("specified table query info: \n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate); printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.rate);
printf("query times: \033[33m%d\033[0m\n", g_args.query_times); printf("query times: \033[33m%d\033[0m\n", g_args.query_times);
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.concurrent); printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.concurrent);
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount);
if (SUBSCRIBE_TEST == g_args.test_mode) { if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode); printf("mod: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeMode);
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval); printf("interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart); printf("restart: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress); printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]); printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.specifiedQueryInfo.sql[i]);
} }
printf("\n"); printf("\n");
printf("super table query info: \n"); printf("super table query info: \n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.rate); printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate);
printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.threadCnt); printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.threadCnt);
printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.childTblCount); printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.childTblCount);
printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.subQueryInfo.sTblName); printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.superQueryInfo.sTblName);
if (SUBSCRIBE_TEST == g_args.test_mode) { if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeMode); printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode);
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeInterval); printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeRestart); printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeKeepProgress); printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress);
} }
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.sqlCount); printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount);
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.subQueryInfo.sql[i]); printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]);
} }
printf("\n"); printf("\n");
...@@ -3761,85 +3763,95 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3761,85 +3763,95 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} }
// super_table_query // super_table_query
cJSON *superQuery = cJSON_GetObjectItem(root, "specified_table_query"); cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query");
if (!superQuery) { if (!specifiedQuery) {
g_queryInfo.superQueryInfo.concurrent = 0; g_queryInfo.specifiedQueryInfo.concurrent = 0;
g_queryInfo.superQueryInfo.sqlCount = 0; g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (superQuery->type != cJSON_Object) { } else if (specifiedQuery->type != cJSON_Object) {
printf("ERROR: failed to read json, super_table_query not found\n"); printf("ERROR: failed to read json, super_table_query not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
cJSON* rate = cJSON_GetObjectItem(superQuery, "query_interval"); cJSON* rate = cJSON_GetObjectItem(specifiedQuery, "query_interval");
if (rate && rate->type == cJSON_Number) { if (rate && rate->type == cJSON_Number) {
g_queryInfo.superQueryInfo.rate = rate->valueint; g_queryInfo.specifiedQueryInfo.rate = rate->valueint;
} else if (!rate) { } else if (!rate) {
g_queryInfo.superQueryInfo.rate = 0; g_queryInfo.specifiedQueryInfo.rate = 0;
} }
cJSON* concurrent = cJSON_GetObjectItem(superQuery, "concurrent"); cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, "query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint;
} else if (!specifiedQueryTimes) {
g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) { if (concurrent && concurrent->type == cJSON_Number) {
g_queryInfo.superQueryInfo.concurrent = concurrent->valueint; g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
} else if (!concurrent) { } else if (!concurrent) {
g_queryInfo.superQueryInfo.concurrent = 1; g_queryInfo.specifiedQueryInfo.concurrent = 1;
} }
cJSON* mode = cJSON_GetObjectItem(superQuery, "mode"); cJSON* mode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (mode && mode->type == cJSON_String && mode->valuestring != NULL) { if (mode && mode->type == cJSON_String && mode->valuestring != NULL) {
if (0 == strcmp("sync", mode->valuestring)) { if (0 == strcmp("sync", mode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 0; g_queryInfo.specifiedQueryInfo.subscribeMode = 0;
} else if (0 == strcmp("async", mode->valuestring)) { } else if (0 == strcmp("async", mode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 1; g_queryInfo.specifiedQueryInfo.subscribeMode = 1;
} else { } else {
printf("ERROR: failed to read json, subscribe mod error\n"); printf("ERROR: failed to read json, subscribe mod error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.superQueryInfo.subscribeMode = 0; g_queryInfo.specifiedQueryInfo.subscribeMode = 0;
} }
cJSON* interval = cJSON_GetObjectItem(superQuery, "interval"); cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
if (interval && interval->type == cJSON_Number) { if (interval && interval->type == cJSON_Number) {
g_queryInfo.superQueryInfo.subscribeInterval = interval->valueint; g_queryInfo.specifiedQueryInfo.subscribeInterval = interval->valueint;
} else if (!interval) { } else if (!interval) {
//printf("failed to read json, subscribe interval no found\n"); //printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER; //goto PARSE_OVER;
g_queryInfo.superQueryInfo.subscribeInterval = 10000; g_queryInfo.specifiedQueryInfo.subscribeInterval = 10000;
} }
cJSON* restart = cJSON_GetObjectItem(superQuery, "restart"); cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart");
if (restart && restart->type == cJSON_String && restart->valuestring != NULL) { if (restart && restart->type == cJSON_String && restart->valuestring != NULL) {
if (0 == strcmp("yes", restart->valuestring)) { if (0 == strcmp("yes", restart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 1; g_queryInfo.specifiedQueryInfo.subscribeRestart = 1;
} else if (0 == strcmp("no", restart->valuestring)) { } else if (0 == strcmp("no", restart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 0; g_queryInfo.specifiedQueryInfo.subscribeRestart = 0;
} else { } else {
printf("ERROR: failed to read json, subscribe restart error\n"); printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.superQueryInfo.subscribeRestart = 1; g_queryInfo.specifiedQueryInfo.subscribeRestart = 1;
} }
cJSON* keepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress");
if (keepProgress if (keepProgress
&& keepProgress->type == cJSON_String && keepProgress->type == cJSON_String
&& keepProgress->valuestring != NULL) { && keepProgress->valuestring != NULL) {
if (0 == strcmp("yes", keepProgress->valuestring)) { if (0 == strcmp("yes", keepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", keepProgress->valuestring)) { } else if (0 == strcmp("no", keepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
} else { } else {
printf("ERROR: failed to read json, subscribe keepProgress error\n"); printf("ERROR: failed to read json, subscribe keepProgress error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
} }
// sqls // sqls
cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls"); cJSON* superSqls = cJSON_GetObjectItem(specifiedQuery, "sqls");
if (!superSqls) { if (!superSqls) {
g_queryInfo.superQueryInfo.sqlCount = 0; g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) { } else if (superSqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n"); printf("ERROR: failed to read json, super sqls not found\n");
goto PARSE_OVER; goto PARSE_OVER;
...@@ -3850,7 +3862,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3850,7 +3862,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
g_queryInfo.superQueryInfo.sqlCount = superSqlSize; g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) { for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(superSqls, j); cJSON* sql = cJSON_GetArrayItem(superSqls, j);
if (sql == NULL) continue; if (sql == NULL) continue;
...@@ -3860,13 +3872,13 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3860,13 +3872,13 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
printf("ERROR: failed to read json, sql not found\n"); printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) { if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) { } else if (NULL == result) {
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); memset(g_queryInfo.specifiedQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else { } else {
printf("ERROR: failed to read json, super query result file not found\n"); printf("ERROR: failed to read json, super query result file not found\n");
goto PARSE_OVER; goto PARSE_OVER;
...@@ -3876,101 +3888,111 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3876,101 +3888,111 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} }
// sub_table_query // sub_table_query
cJSON *subQuery = cJSON_GetObjectItem(root, "super_table_query"); cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query");
if (!subQuery) { if (!superQuery) {
g_queryInfo.subQueryInfo.threadCnt = 0; g_queryInfo.superQueryInfo.threadCnt = 0;
g_queryInfo.subQueryInfo.sqlCount = 0; g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (subQuery->type != cJSON_Object) { } else if (superQuery->type != cJSON_Object) {
printf("ERROR: failed to read json, sub_table_query not found\n"); printf("ERROR: failed to read json, sub_table_query not found\n");
ret = true; ret = true;
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
cJSON* subrate = cJSON_GetObjectItem(subQuery, "query_interval"); cJSON* subrate = cJSON_GetObjectItem(superQuery, "query_interval");
if (subrate && subrate->type == cJSON_Number) { if (subrate && subrate->type == cJSON_Number) {
g_queryInfo.subQueryInfo.rate = subrate->valueint; g_queryInfo.superQueryInfo.rate = subrate->valueint;
} else if (!subrate) { } else if (!subrate) {
g_queryInfo.subQueryInfo.rate = 0; g_queryInfo.superQueryInfo.rate = 0;
} }
cJSON* threads = cJSON_GetObjectItem(subQuery, "threads"); cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times");
if (superQueryTimes && superQueryTimes->type == cJSON_Number) {
g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint;
} else if (!superQueryTimes) {
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
cJSON* threads = cJSON_GetObjectItem(superQuery, "threads");
if (threads && threads->type == cJSON_Number) { if (threads && threads->type == cJSON_Number) {
g_queryInfo.subQueryInfo.threadCnt = threads->valueint; g_queryInfo.superQueryInfo.threadCnt = threads->valueint;
} else if (!threads) { } else if (!threads) {
g_queryInfo.subQueryInfo.threadCnt = 1; g_queryInfo.superQueryInfo.threadCnt = 1;
} }
//cJSON* subTblCnt = cJSON_GetObjectItem(subQuery, "childtable_count"); //cJSON* subTblCnt = cJSON_GetObjectItem(superQuery, "childtable_count");
//if (subTblCnt && subTblCnt->type == cJSON_Number) { //if (subTblCnt && subTblCnt->type == cJSON_Number) {
// g_queryInfo.subQueryInfo.childTblCount = subTblCnt->valueint; // g_queryInfo.superQueryInfo.childTblCount = subTblCnt->valueint;
//} else if (!subTblCnt) { //} else if (!subTblCnt) {
// g_queryInfo.subQueryInfo.childTblCount = 0; // g_queryInfo.superQueryInfo.childTblCount = 0;
//} //}
cJSON* stblname = cJSON_GetObjectItem(subQuery, "stblname"); cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) { if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.subQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE); tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE);
} else { } else {
printf("ERROR: failed to read json, super table name not found\n"); printf("ERROR: failed to read json, super table name not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* submode = cJSON_GetObjectItem(subQuery, "mode"); cJSON* submode = cJSON_GetObjectItem(superQuery, "mode");
if (submode && submode->type == cJSON_String && submode->valuestring != NULL) { if (submode && submode->type == cJSON_String && submode->valuestring != NULL) {
if (0 == strcmp("sync", submode->valuestring)) { if (0 == strcmp("sync", submode->valuestring)) {
g_queryInfo.subQueryInfo.subscribeMode = 0; g_queryInfo.superQueryInfo.subscribeMode = 0;
} else if (0 == strcmp("async", submode->valuestring)) { } else if (0 == strcmp("async", submode->valuestring)) {
g_queryInfo.subQueryInfo.subscribeMode = 1; g_queryInfo.superQueryInfo.subscribeMode = 1;
} else { } else {
printf("ERROR: failed to read json, subscribe mod error\n"); printf("ERROR: failed to read json, subscribe mod error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.subQueryInfo.subscribeMode = 0; g_queryInfo.superQueryInfo.subscribeMode = 0;
} }
cJSON* subinterval = cJSON_GetObjectItem(subQuery, "interval"); cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval");
if (subinterval && subinterval->type == cJSON_Number) { if (subinterval && subinterval->type == cJSON_Number) {
g_queryInfo.subQueryInfo.subscribeInterval = subinterval->valueint; g_queryInfo.superQueryInfo.subscribeInterval = subinterval->valueint;
} else if (!subinterval) { } else if (!subinterval) {
//printf("failed to read json, subscribe interval no found\n"); //printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER; //goto PARSE_OVER;
g_queryInfo.subQueryInfo.subscribeInterval = 10000; g_queryInfo.superQueryInfo.subscribeInterval = 10000;
} }
cJSON* subrestart = cJSON_GetObjectItem(subQuery, "restart"); cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart");
if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) { if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) { if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.subQueryInfo.subscribeRestart = 1; g_queryInfo.superQueryInfo.subscribeRestart = 1;
} else if (0 == strcmp("no", subrestart->valuestring)) { } else if (0 == strcmp("no", subrestart->valuestring)) {
g_queryInfo.subQueryInfo.subscribeRestart = 0; g_queryInfo.superQueryInfo.subscribeRestart = 0;
} else { } else {
printf("ERROR: failed to read json, subscribe restart error\n"); printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.subQueryInfo.subscribeRestart = 1; g_queryInfo.superQueryInfo.subscribeRestart = 1;
} }
cJSON* subkeepProgress = cJSON_GetObjectItem(subQuery, "keepProgress"); cJSON* subkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
if (subkeepProgress && if (subkeepProgress &&
subkeepProgress->type == cJSON_String subkeepProgress->type == cJSON_String
&& subkeepProgress->valuestring != NULL) { && subkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", subkeepProgress->valuestring)) { if (0 == strcmp("yes", subkeepProgress->valuestring)) {
g_queryInfo.subQueryInfo.subscribeKeepProgress = 1; g_queryInfo.superQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", subkeepProgress->valuestring)) { } else if (0 == strcmp("no", subkeepProgress->valuestring)) {
g_queryInfo.subQueryInfo.subscribeKeepProgress = 0; g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} else { } else {
printf("ERROR: failed to read json, subscribe keepProgress error\n"); printf("ERROR: failed to read json, subscribe keepProgress error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.subQueryInfo.subscribeKeepProgress = 0; g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} }
// sqls // sqls
cJSON* subsqls = cJSON_GetObjectItem(subQuery, "sqls"); cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!subsqls) { if (!subsqls) {
g_queryInfo.subQueryInfo.sqlCount = 0; g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (subsqls->type != cJSON_Array) { } else if (subsqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n"); printf("ERROR: failed to read json, super sqls not found\n");
goto PARSE_OVER; goto PARSE_OVER;
...@@ -3981,7 +4003,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3981,7 +4003,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
g_queryInfo.subQueryInfo.sqlCount = superSqlSize; g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) { for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(subsqls, j); cJSON* sql = cJSON_GetArrayItem(subsqls, j);
if (sql == NULL) continue; if (sql == NULL) continue;
...@@ -3991,13 +4013,13 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3991,13 +4013,13 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
printf("ERROR: failed to read json, sql not found\n"); printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
tstrncpy(g_queryInfo.subQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){ if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){
tstrncpy(g_queryInfo.subQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); tstrncpy(g_queryInfo.superQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) { } else if (NULL == result) {
memset(g_queryInfo.subQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else { } else {
printf("ERROR: failed to read json, sub query result file not found\n"); printf("ERROR: failed to read json, sub query result file not found\n");
goto PARSE_OVER; goto PARSE_OVER;
...@@ -5494,32 +5516,32 @@ static void *superQueryProcess(void *sarg) { ...@@ -5494,32 +5516,32 @@ static void *superQueryProcess(void *sarg) {
int64_t st = 0; int64_t st = 0;
int64_t et = 0; int64_t et = 0;
int queryTimes = g_args.query_times; int queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
while(queryTimes --) { while(queryTimes --) {
if (g_queryInfo.superQueryInfo.rate && (et - st) < if (g_queryInfo.specifiedQueryInfo.rate && (et - st) <
(int64_t)g_queryInfo.superQueryInfo.rate*1000) { (int64_t)g_queryInfo.specifiedQueryInfo.rate*1000) {
taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
} }
st = taosGetTimestampUs(); st = taosGetTimestampUs();
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[i], winfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
} }
selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile); selectAndGetResult(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], tmpFile);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0); taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
} else { } else {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
int retCode = postProceSql(g_queryInfo.host, int retCode = postProceSql(g_queryInfo.host,
g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); g_queryInfo.port, g_queryInfo.specifiedQueryInfo.sql[i]);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0); taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
...@@ -5542,7 +5564,7 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { ...@@ -5542,7 +5564,7 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
char subTblName[MAX_TB_NAME_SIZE*3]; char subTblName[MAX_TB_NAME_SIZE*3];
sprintf(subTblName, "%s.%s", sprintf(subTblName, "%s.%s",
g_queryInfo.dbName, g_queryInfo.dbName,
g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); g_queryInfo.superQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN);
//printf("inSql: %s\n", inSql); //printf("inSql: %s\n", inSql);
...@@ -5580,25 +5602,26 @@ static void *subQueryProcess(void *sarg) { ...@@ -5580,25 +5602,26 @@ static void *subQueryProcess(void *sarg) {
} }
int64_t st = 0; int64_t st = 0;
int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; int64_t et = (int64_t)g_queryInfo.superQueryInfo.rate*1000;
int queryTimes = g_args.query_times;
int queryTimes = g_queryInfo.superQueryInfo.queryTimes;
while(queryTimes --) { while(queryTimes --) {
if (g_queryInfo.subQueryInfo.rate if (g_queryInfo.superQueryInfo.rate
&& (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) { && (et - st) < (int64_t)g_queryInfo.superQueryInfo.rate*1000) {
taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
} }
st = taosGetTimestampUs(); st = taosGetTimestampUs();
for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) { for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr)); memset(sqlstr,0,sizeof(sqlstr));
replaceSubTblName(g_queryInfo.subQueryInfo.sql[j], sqlstr, i); replaceSubTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.subQueryInfo.result[j][0] != 0) { if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.subQueryInfo.result[j], g_queryInfo.superQueryInfo.result[j],
winfo->threadID); winfo->threadID);
} }
selectAndGetResult(winfo->taos, sqlstr, tmpFile); selectAndGetResult(winfo->taos, sqlstr, tmpFile);
...@@ -5633,12 +5656,12 @@ static int queryTestProcess() { ...@@ -5633,12 +5656,12 @@ static int queryTestProcess() {
exit(-1); exit(-1);
} }
if (0 != g_queryInfo.subQueryInfo.sqlCount) { if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos, getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName, g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName, g_queryInfo.superQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount); &g_queryInfo.superQueryInfo.childTblCount);
} }
if (!g_args.answer_yes) { if (!g_args.answer_yes) {
...@@ -5651,22 +5674,22 @@ static int queryTestProcess() { ...@@ -5651,22 +5674,22 @@ static int queryTestProcess() {
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from specify table //==== create sub threads for query from specify table
if (g_queryInfo.superQueryInfo.sqlCount > 0 if (g_queryInfo.specifiedQueryInfo.sqlCount > 0
&& g_queryInfo.superQueryInfo.concurrent > 0) { && g_queryInfo.specifiedQueryInfo.concurrent > 0) {
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
if (NULL == pids) { if (NULL == pids) {
taos_close(taos); taos_close(taos);
ERROR_EXIT("memory allocation failed\n"); ERROR_EXIT("memory allocation failed\n");
} }
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
if (NULL == infos) { if (NULL == infos) {
taos_close(taos); taos_close(taos);
free(pids); free(pids);
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
...@@ -5690,7 +5713,7 @@ static int queryTestProcess() { ...@@ -5690,7 +5713,7 @@ static int queryTestProcess() {
pthread_create(pids + i, NULL, superQueryProcess, t_info); pthread_create(pids + i, NULL, superQueryProcess, t_info);
} }
} else { } else {
g_queryInfo.superQueryInfo.concurrent = 0; g_queryInfo.specifiedQueryInfo.concurrent = 0;
} }
taos_close(taos); taos_close(taos);
...@@ -5698,9 +5721,9 @@ static int queryTestProcess() { ...@@ -5698,9 +5721,9 @@ static int queryTestProcess() {
pthread_t *pidsOfSub = NULL; pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL; threadInfo *infosOfSub = NULL;
//==== create sub threads for query from all sub table of the super table //==== create sub threads for query from all sub table of the super table
if ((g_queryInfo.subQueryInfo.sqlCount > 0) if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) { && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t));
if (NULL == pidsOfSub) { if (NULL == pidsOfSub) {
free(infos); free(infos);
free(pids); free(pids);
...@@ -5708,7 +5731,7 @@ static int queryTestProcess() { ...@@ -5708,7 +5731,7 @@ static int queryTestProcess() {
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo));
if (NULL == infosOfSub) { if (NULL == infosOfSub) {
free(pidsOfSub); free(pidsOfSub);
free(infos); free(infos);
...@@ -5716,8 +5739,8 @@ static int queryTestProcess() { ...@@ -5716,8 +5739,8 @@ static int queryTestProcess() {
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
int ntables = g_queryInfo.subQueryInfo.childTblCount; int ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.subQueryInfo.threadCnt; int threads = g_queryInfo.superQueryInfo.threadCnt;
int a = ntables / threads; int a = ntables / threads;
if (a < 1) { if (a < 1) {
...@@ -5743,19 +5766,19 @@ static int queryTestProcess() { ...@@ -5743,19 +5766,19 @@ static int queryTestProcess() {
pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info); pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info);
} }
g_queryInfo.subQueryInfo.threadCnt = threads; g_queryInfo.superQueryInfo.threadCnt = threads;
} else { } else {
g_queryInfo.subQueryInfo.threadCnt = 0; g_queryInfo.superQueryInfo.threadCnt = 0;
} }
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
tmfree((char*)pids); tmfree((char*)pids);
tmfree((char*)infos); tmfree((char*)infos);
for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL); pthread_join(pidsOfSub[i], NULL);
} }
...@@ -5780,14 +5803,14 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c ...@@ -5780,14 +5803,14 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) { static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) {
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (g_queryInfo.superQueryInfo.subscribeMode) { if (g_queryInfo.specifiedQueryInfo.subscribeMode) {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeRestart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, subscribe_callback, (void*)resultFileName,
g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else { } else {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeRestart,
topic, sql, NULL, NULL, 0); topic, sql, NULL, NULL, 0);
} }
...@@ -5830,25 +5853,25 @@ static void *subSubscribeProcess(void *sarg) { ...@@ -5830,25 +5853,25 @@ static void *subSubscribeProcess(void *sarg) {
//int64_t st = 0; //int64_t st = 0;
//int64_t et = 0; //int64_t et = 0;
do { do {
//if (g_queryInfo.superQueryInfo.rate && (et - st) < g_queryInfo.superQueryInfo.rate*1000) { //if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < g_queryInfo.specifiedQueryInfo.rate*1000) {
// taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms // taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
//} //}
//st = taosGetTimestampMs(); //st = taosGetTimestampMs();
char topic[32] = {0}; char topic[32] = {0};
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
sprintf(topic, "taosdemo-subscribe-%d", i); sprintf(topic, "taosdemo-subscribe-%d", i);
memset(subSqlstr,0,sizeof(subSqlstr)); memset(subSqlstr,0,sizeof(subSqlstr));
replaceSubTblName(g_queryInfo.subQueryInfo.sql[i], subSqlstr, i); replaceSubTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.subQueryInfo.result[i][0] != 0) { if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.subQueryInfo.result[i], winfo->threadID); g_queryInfo.superQueryInfo.result[i], winfo->threadID);
} }
g_queryInfo.subQueryInfo.tsub[i] = subscribeImpl( g_queryInfo.superQueryInfo.tsub[i] = subscribeImpl(
winfo->taos, subSqlstr, topic, tmpFile); winfo->taos, subSqlstr, topic, tmpFile);
if (NULL == g_queryInfo.subQueryInfo.tsub[i]) { if (NULL == g_queryInfo.superQueryInfo.tsub[i]) {
taos_close(winfo->taos); taos_close(winfo->taos);
return NULL; return NULL;
} }
...@@ -5860,17 +5883,17 @@ static void *subSubscribeProcess(void *sarg) { ...@@ -5860,17 +5883,17 @@ static void *subSubscribeProcess(void *sarg) {
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while (1) { while (1) {
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.subQueryInfo.subscribeMode) { if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
continue; continue;
} }
res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]); res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.subQueryInfo.result[i][0] != 0) { if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.subQueryInfo.result[i], g_queryInfo.superQueryInfo.result[i],
winfo->threadID); winfo->threadID);
} }
getResult(res, tmpFile); getResult(res, tmpFile);
...@@ -5879,9 +5902,9 @@ static void *subSubscribeProcess(void *sarg) { ...@@ -5879,9 +5902,9 @@ static void *subSubscribeProcess(void *sarg) {
} }
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i], taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i],
g_queryInfo.subQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
} }
taos_close(winfo->taos); taos_close(winfo->taos);
...@@ -5918,25 +5941,25 @@ static void *superSubscribeProcess(void *sarg) { ...@@ -5918,25 +5941,25 @@ static void *superSubscribeProcess(void *sarg) {
//int64_t st = 0; //int64_t st = 0;
//int64_t et = 0; //int64_t et = 0;
do { do {
//if (g_queryInfo.superQueryInfo.rate && (et - st) < g_queryInfo.superQueryInfo.rate*1000) { //if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < g_queryInfo.specifiedQueryInfo.rate*1000) {
// taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms // taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
//} //}
//st = taosGetTimestampMs(); //st = taosGetTimestampMs();
char topic[32] = {0}; char topic[32] = {0};
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
sprintf(topic, "taosdemo-subscribe-%d", i); sprintf(topic, "taosdemo-subscribe-%d", i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.subQueryInfo.result[i][0] != 0) { if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[i], winfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
} }
g_queryInfo.superQueryInfo.tsub[i] = g_queryInfo.specifiedQueryInfo.tsub[i] =
subscribeImpl(winfo->taos, subscribeImpl(winfo->taos,
g_queryInfo.superQueryInfo.sql[i], g_queryInfo.specifiedQueryInfo.sql[i],
topic, tmpFile); topic, tmpFile);
if (NULL == g_queryInfo.superQueryInfo.tsub[i]) { if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) {
taos_close(winfo->taos); taos_close(winfo->taos);
return NULL; return NULL;
} }
...@@ -5948,17 +5971,17 @@ static void *superSubscribeProcess(void *sarg) { ...@@ -5948,17 +5971,17 @@ static void *superSubscribeProcess(void *sarg) {
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while (1) { while (1) {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.superQueryInfo.subscribeMode) { if (1 == g_queryInfo.specifiedQueryInfo.subscribeMode) {
continue; continue;
} }
res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]); res = taos_consume(g_queryInfo.specifiedQueryInfo.tsub[i]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[i], winfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
} }
getResult(res, tmpFile); getResult(res, tmpFile);
} }
...@@ -5966,9 +5989,9 @@ static void *superSubscribeProcess(void *sarg) { ...@@ -5966,9 +5989,9 @@ static void *superSubscribeProcess(void *sarg) {
} }
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[i],
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
taos_close(winfo->taos); taos_close(winfo->taos);
...@@ -5997,12 +6020,12 @@ static int subscribeTestProcess() { ...@@ -5997,12 +6020,12 @@ static int subscribeTestProcess() {
exit(-1); exit(-1);
} }
if (0 != g_queryInfo.subQueryInfo.sqlCount) { if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos, getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName, g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName, g_queryInfo.superQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount); &g_queryInfo.superQueryInfo.childTblCount);
} }
taos_close(taos); // TODO: workaround to use separate taos connection; taos_close(taos); // TODO: workaround to use separate taos connection;
...@@ -6010,22 +6033,22 @@ static int subscribeTestProcess() { ...@@ -6010,22 +6033,22 @@ static int subscribeTestProcess() {
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from super table //==== create sub threads for query from super table
if ((g_queryInfo.superQueryInfo.sqlCount <= 0) || if ((g_queryInfo.specifiedQueryInfo.sqlCount <= 0) ||
(g_queryInfo.superQueryInfo.concurrent <= 0)) { (g_queryInfo.specifiedQueryInfo.concurrent <= 0)) {
errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n", errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
__func__, __LINE__, g_queryInfo.superQueryInfo.sqlCount, __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.superQueryInfo.concurrent); g_queryInfo.specifiedQueryInfo.concurrent);
exit(-1); exit(-1);
} }
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1); exit(-1);
} }
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
t_info->taos = NULL; // TODO: workaround to use separate taos connection; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
...@@ -6035,11 +6058,11 @@ static int subscribeTestProcess() { ...@@ -6035,11 +6058,11 @@ static int subscribeTestProcess() {
//==== create sub threads for query from sub table //==== create sub threads for query from sub table
pthread_t *pidsOfSub = NULL; pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL; threadInfo *infosOfSub = NULL;
if ((g_queryInfo.subQueryInfo.sqlCount > 0) if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) { && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt *
sizeof(pthread_t)); sizeof(pthread_t));
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt *
sizeof(threadInfo)); sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", errorPrint("%s() LN%d, malloc failed for create threads\n",
...@@ -6048,8 +6071,8 @@ static int subscribeTestProcess() { ...@@ -6048,8 +6071,8 @@ static int subscribeTestProcess() {
exit(-1); exit(-1);
} }
int ntables = g_queryInfo.subQueryInfo.childTblCount; int ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.subQueryInfo.threadCnt; int threads = g_queryInfo.superQueryInfo.threadCnt;
int a = ntables / threads; int a = ntables / threads;
if (a < 1) { if (a < 1) {
...@@ -6075,14 +6098,14 @@ static int subscribeTestProcess() { ...@@ -6075,14 +6098,14 @@ static int subscribeTestProcess() {
pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info); pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info);
} }
g_queryInfo.subQueryInfo.threadCnt = threads; g_queryInfo.superQueryInfo.threadCnt = threads;
for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL); pthread_join(pidsOfSub[i], NULL);
} }
} }
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册