From 0c4075e09fa0c2e40efae77b9ded0a314ad657e2 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Wed, 26 May 2021 21:02:14 +0800 Subject: [PATCH] [TD-4533]: taosdemo resub if resubAfterConsume != -1 (#6243) Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 72 +++++++++++++++++++++++++------------ 1 file changed, 50 insertions(+), 22 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 54dd68de44..29a1f7f1f9 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -364,7 +364,7 @@ typedef struct SDbs_S { typedef struct SpecifiedQueryInfo_S { uint64_t queryInterval; // 0: unlimit > 0 loop/s uint32_t concurrent; - uint64_t sqlCount; + int sqlCount; uint32_t asyncMode; // 0: sync, 1: async uint64_t subscribeInterval; // ms uint64_t queryTimes; @@ -373,6 +373,7 @@ typedef struct SpecifiedQueryInfo_S { char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; int resubAfterConsume[MAX_QUERY_SQL_COUNT]; + int endAfterConsume[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; char topic[MAX_QUERY_SQL_COUNT][32]; int consumed[MAX_QUERY_SQL_COUNT]; @@ -391,10 +392,11 @@ typedef struct SuperQueryInfo_S { uint64_t queryTimes; int64_t childTblCount; char childTblPrefix[MAX_TB_NAME_SIZE]; - uint64_t sqlCount; + int sqlCount; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; int resubAfterConsume; + int endAfterConsume; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; char* childTblName; @@ -1717,7 +1719,7 @@ static void printfQueryMeta() { if ((SUBSCRIBE_TEST == g_args.test_mode) || (QUERY_TEST == g_args.test_mode)) { printf("specified table query info: \n"); - printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", + printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount); if (g_queryInfo.specifiedQueryInfo.sqlCount > 0) { printf("specified tbl query times:\n"); @@ -1737,15 +1739,15 @@ static void printfQueryMeta() { printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - for (uint64_t i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - printf(" sql[%"PRIu64"]: \033[33m%s\033[0m\n", + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.specifiedQueryInfo.sql[i]); } printf("\n"); } printf("super table query info:\n"); - printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", + printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); if (g_queryInfo.superQueryInfo.sqlCount > 0) { @@ -4197,7 +4199,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { if (concurrent && concurrent->type == cJSON_Number) { if (concurrent->valueint <= 0) { errorPrint( - "%s() LN%d, query sqlCount %"PRIu64" or concurrent %d is not correct.\n", + "%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount, g_queryInfo.specifiedQueryInfo.concurrent); @@ -4296,6 +4298,17 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + cJSON* endAfterConsume = + cJSON_GetObjectItem(specifiedQuery, "endAfterConsume"); + if (endAfterConsume + && endAfterConsume->type == cJSON_Number) { + g_queryInfo.specifiedQueryInfo.endAfterConsume[j] + = endAfterConsume->valueint; + } else if (!endAfterConsume) { + // default value is -1, which mean infinite loop + g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1; + } + cJSON* resubAfterConsume = cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); if (resubAfterConsume @@ -4303,9 +4316,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = resubAfterConsume->valueint; } else if (!resubAfterConsume) { - //printf("failed to read json, subscribe interval no found\n"); - //goto PARSE_OVER; - g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1; + // default value is -1, which mean do not resub + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; } cJSON *result = cJSON_GetObjectItem(sql, "result"); @@ -4449,16 +4461,26 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; } + cJSON* superEndAfterConsume = + cJSON_GetObjectItem(superQuery, "endAfterConsume"); + if (superEndAfterConsume + && superEndAfterConsume->type == cJSON_Number) { + g_queryInfo.superQueryInfo.endAfterConsume = + superEndAfterConsume->valueint; + } else if (!superEndAfterConsume) { + // default value is -1, which mean do not resub + g_queryInfo.superQueryInfo.endAfterConsume = -1; + } + cJSON* superResubAfterConsume = - cJSON_GetObjectItem(superQuery, "resubAfterConsume"); + cJSON_GetObjectItem(superQuery, "endAfterConsume"); if (superResubAfterConsume && superResubAfterConsume->type == cJSON_Number) { - g_queryInfo.superQueryInfo.resubAfterConsume = + g_queryInfo.superQueryInfo.endAfterConsume = superResubAfterConsume->valueint; } else if (!superResubAfterConsume) { - //printf("failed to read json, subscribe interval no found\n"); - ////goto PARSE_OVER; - g_queryInfo.superQueryInfo.resubAfterConsume = 1; + // default value is -1, which mean do not resub + g_queryInfo.superQueryInfo.endAfterConsume = -1; } // supert table sqls @@ -6679,7 +6701,10 @@ static void *superSubscribe(void *sarg) { uint64_t st = 0, et = 0; - while(1) { + while ((g_queryInfo.superQueryInfo.endAfterConsume == -1) + || (g_queryInfo.superQueryInfo.endAfterConsume < + consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from])) { + for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { tsubSeq = i - pThreadInfo->start_table_from; @@ -6708,7 +6733,7 @@ static void *superSubscribe(void *sarg) { } consumed[tsubSeq] ++; - if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) + if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1) && (consumed[tsubSeq] >= g_queryInfo.superQueryInfo.resubAfterConsume)) { printf("keepProgress:%d, resub super table query: %"PRIu64"\n", @@ -6790,7 +6815,10 @@ static void *specifiedSubscribe(void *sarg) { // start loop to consume result g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; - while(1) { + while((g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq] == -1) + || (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] < + g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) { + if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { continue; } @@ -6806,7 +6834,7 @@ static void *specifiedSubscribe(void *sarg) { } g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; - if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) + if ((g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq] != -1) && (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >= g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { printf("keepProgress:%d, resub specified query: %"PRIu64"\n", @@ -6873,12 +6901,12 @@ static int subscribeTestProcess() { //==== create threads for query for specified table if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { - debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", + debugPrint("%s() LN%d, sepcified query sqlCount %d.\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount); } else { if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) { - errorPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", + errorPrint("%s() LN%d, sepcified query sqlCount %d.\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount); exit(-1); @@ -6911,7 +6939,7 @@ static int subscribeTestProcess() { //==== create threads for super table query if (g_queryInfo.superQueryInfo.sqlCount <= 0) { - debugPrint("%s() LN%d, super table query sqlCount %"PRIu64".\n", + debugPrint("%s() LN%d, super table query sqlCount %d.\n", __func__, __LINE__, g_queryInfo.superQueryInfo.sqlCount); } else { -- GitLab