未验证 提交 88f3e7aa 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-4353]<fix>: taosdemo resub if resubAfterConsume != -1 (#6286)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 c5682598
...@@ -379,7 +379,7 @@ typedef struct SDbs_S { ...@@ -379,7 +379,7 @@ typedef struct SDbs_S {
typedef struct SpecifiedQueryInfo_S { typedef struct SpecifiedQueryInfo_S {
uint64_t queryInterval; // 0: unlimit > 0 loop/s uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint32_t concurrent; uint32_t concurrent;
uint64_t sqlCount; int sqlCount;
uint32_t asyncMode; // 0: sync, 1: async uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms uint64_t subscribeInterval; // ms
uint64_t queryTimes; uint64_t queryTimes;
...@@ -388,6 +388,7 @@ typedef struct SpecifiedQueryInfo_S { ...@@ -388,6 +388,7 @@ typedef struct SpecifiedQueryInfo_S {
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT]; int resubAfterConsume[MAX_QUERY_SQL_COUNT];
int endAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char topic[MAX_QUERY_SQL_COUNT][32]; char topic[MAX_QUERY_SQL_COUNT][32];
int consumed[MAX_QUERY_SQL_COUNT]; int consumed[MAX_QUERY_SQL_COUNT];
...@@ -406,10 +407,11 @@ typedef struct SuperQueryInfo_S { ...@@ -406,10 +407,11 @@ typedef struct SuperQueryInfo_S {
uint64_t queryTimes; uint64_t queryTimes;
int64_t childTblCount; int64_t childTblCount;
char childTblPrefix[MAX_TB_NAME_SIZE]; char childTblPrefix[MAX_TB_NAME_SIZE];
uint64_t sqlCount; int sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume; int resubAfterConsume;
int endAfterConsume;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName; char* childTblName;
...@@ -1773,7 +1775,7 @@ static void printfQueryMeta() { ...@@ -1773,7 +1775,7 @@ static void printfQueryMeta() {
if ((SUBSCRIBE_TEST == g_args.test_mode) || (QUERY_TEST == g_args.test_mode)) { if ((SUBSCRIBE_TEST == g_args.test_mode) || (QUERY_TEST == g_args.test_mode)) {
printf("specified table query info: \n"); printf("specified table query info: \n");
printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", printf("sqlCount: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.sqlCount);
if (g_queryInfo.specifiedQueryInfo.sqlCount > 0) { if (g_queryInfo.specifiedQueryInfo.sqlCount > 0) {
printf("specified tbl query times:\n"); printf("specified tbl query times:\n");
...@@ -1793,15 +1795,15 @@ static void printfQueryMeta() { ...@@ -1793,15 +1795,15 @@ static void printfQueryMeta() {
printf("keepProgress: \033[33m%d\033[0m\n", printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
for (uint64_t i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
printf(" sql[%"PRIu64"]: \033[33m%s\033[0m\n", printf(" sql[%d]: \033[33m%s\033[0m\n",
i, g_queryInfo.specifiedQueryInfo.sql[i]); i, g_queryInfo.specifiedQueryInfo.sql[i]);
} }
printf("\n"); printf("\n");
} }
printf("super table query info:\n"); printf("super table query info:\n");
printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", printf("sqlCount: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.sqlCount); g_queryInfo.superQueryInfo.sqlCount);
if (g_queryInfo.superQueryInfo.sqlCount > 0) { if (g_queryInfo.superQueryInfo.sqlCount > 0) {
...@@ -4268,7 +4270,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4268,7 +4270,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (concurrent && concurrent->type == cJSON_Number) { if (concurrent && concurrent->type == cJSON_Number) {
if (concurrent->valueint <= 0) { if (concurrent->valueint <= 0) {
errorPrint( errorPrint(
"%s() LN%d, query sqlCount %"PRIu64" or concurrent %d is not correct.\n", "%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount, g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent); g_queryInfo.specifiedQueryInfo.concurrent);
...@@ -4367,6 +4369,17 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4367,6 +4369,17 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
cJSON* endAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "endAfterConsume");
if (endAfterConsume
&& endAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.endAfterConsume[j]
= endAfterConsume->valueint;
} else if (!endAfterConsume) {
// default value is -1, which mean infinite loop
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
}
cJSON* resubAfterConsume = cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if (resubAfterConsume if (resubAfterConsume
...@@ -4374,9 +4387,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4374,9 +4387,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint; = resubAfterConsume->valueint;
} else if (!resubAfterConsume) { } else if (!resubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n"); // default value is -1, which mean do not resub
//goto PARSE_OVER; g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1;
} }
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
...@@ -4520,16 +4532,26 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4520,16 +4532,26 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} }
cJSON* superEndAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume");
if (superEndAfterConsume
&& superEndAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume =
superEndAfterConsume->valueint;
} else if (!superEndAfterConsume) {
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.endAfterConsume = -1;
}
cJSON* superResubAfterConsume = cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "resubAfterConsume"); cJSON_GetObjectItem(superQuery, "endAfterConsume");
if (superResubAfterConsume if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) { && superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.resubAfterConsume = g_queryInfo.superQueryInfo.endAfterConsume =
superResubAfterConsume->valueint; superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) { } else if (!superResubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n"); // default value is -1, which mean do not resub
////goto PARSE_OVER; g_queryInfo.superQueryInfo.endAfterConsume = -1;
g_queryInfo.superQueryInfo.resubAfterConsume = 1;
} }
// supert table sqls // supert table sqls
...@@ -7250,7 +7272,10 @@ static void *superSubscribe(void *sarg) { ...@@ -7250,7 +7272,10 @@ static void *superSubscribe(void *sarg) {
uint64_t st = 0, et = 0; uint64_t st = 0, et = 0;
while(1) { while ((g_queryInfo.superQueryInfo.endAfterConsume == -1)
|| (g_queryInfo.superQueryInfo.endAfterConsume <
consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from])) {
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
tsubSeq = i - pThreadInfo->start_table_from; tsubSeq = i - pThreadInfo->start_table_from;
...@@ -7279,7 +7304,7 @@ static void *superSubscribe(void *sarg) { ...@@ -7279,7 +7304,7 @@ static void *superSubscribe(void *sarg) {
} }
consumed[tsubSeq] ++; consumed[tsubSeq] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1)
&& (consumed[tsubSeq] >= && (consumed[tsubSeq] >=
g_queryInfo.superQueryInfo.resubAfterConsume)) { g_queryInfo.superQueryInfo.resubAfterConsume)) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n", printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
...@@ -7361,7 +7386,10 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -7361,7 +7386,10 @@ static void *specifiedSubscribe(void *sarg) {
// start loop to consume result // start loop to consume result
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
while(1) { while((g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq] == -1)
|| (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] <
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue; continue;
} }
...@@ -7377,7 +7405,7 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -7377,7 +7405,7 @@ static void *specifiedSubscribe(void *sarg) {
} }
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) if ((g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq] != -1)
&& (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >= && (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %"PRIu64"\n", printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
...@@ -7444,12 +7472,12 @@ static int subscribeTestProcess() { ...@@ -7444,12 +7472,12 @@ static int subscribeTestProcess() {
//==== create threads for query for specified table //==== create threads for query for specified table
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", debugPrint("%s() LN%d, sepcified query sqlCount %d.\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.sqlCount);
} else { } else {
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) { if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
errorPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", errorPrint("%s() LN%d, sepcified query sqlCount %d.\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.sqlCount);
exit(-1); exit(-1);
...@@ -7482,7 +7510,7 @@ static int subscribeTestProcess() { ...@@ -7482,7 +7510,7 @@ static int subscribeTestProcess() {
//==== create threads for super table query //==== create threads for super table query
if (g_queryInfo.superQueryInfo.sqlCount <= 0) { if (g_queryInfo.superQueryInfo.sqlCount <= 0) {
debugPrint("%s() LN%d, super table query sqlCount %"PRIu64".\n", debugPrint("%s() LN%d, super table query sqlCount %d.\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.superQueryInfo.sqlCount); g_queryInfo.superQueryInfo.sqlCount);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册