diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index b2ac11b2cadc928e927b56010c7720e6cd9ef9fa..ec4c6d87736ad530eb093e0de430fb74cd1cc73a 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -4408,14 +4408,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON* resubAfterConsume = cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; if ((resubAfterConsume) && (resubAfterConsume->type == cJSON_Number) && (resubAfterConsume->valueint >= 0)) { g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = resubAfterConsume->valueint; - } else if (!resubAfterConsume) { - // default value is -1, which mean do not resub - g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; } cJSON *result = cJSON_GetObjectItem(sql, "result"); @@ -4572,14 +4570,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON* superResubAfterConsume = cJSON_GetObjectItem(superQuery, "resubAfterConsume"); + g_queryInfo.superQueryInfo.resubAfterConsume = -1; if ((superResubAfterConsume) && (superResubAfterConsume->type == cJSON_Number) && (superResubAfterConsume->valueint >= 0)) { g_queryInfo.superQueryInfo.resubAfterConsume = superResubAfterConsume->valueint; - } else if (!superResubAfterConsume) { - // default value is -1, which mean do not resub - g_queryInfo.superQueryInfo.resubAfterConsume = -1; } // supert table sqls @@ -7222,151 +7218,159 @@ static TAOS_SUB* subscribeImpl( } if (tsub == NULL) { - printf("failed to create subscription. topic:%s, sql:%s\n", topic, sql); - return NULL; + errorPrint("failed to create subscription. topic:%s, sql:%s\n", topic, sql); + return NULL; } return tsub; } static void *superSubscribe(void *sarg) { - threadInfo *pThreadInfo = (threadInfo *)sarg; - char subSqlstr[MAX_QUERY_SQL_LENGTH]; - TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; - uint64_t tsubSeq; - - if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { - errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", - pThreadInfo->ntables, - MAX_QUERY_SQL_COUNT); - exit(-1); - } + threadInfo *pThreadInfo = (threadInfo *)sarg; + char subSqlstr[MAX_QUERY_SQL_LENGTH]; + TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; + uint64_t tsubSeq; + + if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { + errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", + pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); + exit(-1); + } - if (pThreadInfo->taos == NULL) { - pThreadInfo->taos = taos_connect(g_queryInfo.host, - g_queryInfo.user, - g_queryInfo.password, - g_queryInfo.dbName, - g_queryInfo.port); if (pThreadInfo->taos == NULL) { - errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", - pThreadInfo->threadID, taos_errstr(NULL)); - return NULL; + pThreadInfo->taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + g_queryInfo.dbName, + g_queryInfo.port); + if (pThreadInfo->taos == NULL) { + errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", + pThreadInfo->threadID, taos_errstr(NULL)); + return NULL; + } } - } - char sqlStr[MAX_TB_NAME_SIZE*2]; - sprintf(sqlStr, "use %s", g_queryInfo.dbName); - if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { - taos_close(pThreadInfo->taos); - errorPrint( "use database %s failed!\n\n", + char sqlStr[MAX_TB_NAME_SIZE*2]; + sprintf(sqlStr, "use %s", g_queryInfo.dbName); + if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(pThreadInfo->taos); + errorPrint( "use database %s failed!\n\n", g_queryInfo.dbName); - return NULL; - } + return NULL; + } - char topic[32] = {0}; - for (uint64_t i = pThreadInfo->start_table_from; - i <= pThreadInfo->end_table_to; i++) { + char topic[32] = {0}; + for (uint64_t i = pThreadInfo->start_table_from; + i <= pThreadInfo->end_table_to; i++) { + tsubSeq = i - pThreadInfo->start_table_from; + verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n", + __func__, __LINE__, + pThreadInfo->threadID, + pThreadInfo->start_table_from, + pThreadInfo->end_table_to, i); + sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"", + i, pThreadInfo->querySeq); + memset(subSqlstr, 0, sizeof(subSqlstr)); + replaceChildTblName( + g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq], + subSqlstr, i); + if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->filePath, "%s-%d", + g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); + } - tsubSeq = i - pThreadInfo->start_table_from; - verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n", - __func__, __LINE__, - pThreadInfo->threadID, - pThreadInfo->start_table_from, - pThreadInfo->end_table_to, i); - sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"", - i, pThreadInfo->querySeq); - memset(subSqlstr, 0, sizeof(subSqlstr)); - replaceChildTblName( - g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq], - subSqlstr, i); - if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->filePath, "%s-%d", - g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], - pThreadInfo->threadID); - } + debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", + __func__, __LINE__, pThreadInfo->threadID, subSqlstr); + tsub[tsubSeq] = subscribeImpl( + STABLE_CLASS, + pThreadInfo, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval); + if (NULL == tsub[tsubSeq]) { + taos_close(pThreadInfo->taos); + return NULL; + } + } - debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", - __func__, __LINE__, pThreadInfo->threadID, subSqlstr); - tsub[tsubSeq] = subscribeImpl( - STABLE_CLASS, - pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart, - g_queryInfo.superQueryInfo.subscribeInterval); - if (NULL == tsub[tsubSeq]) { - taos_close(pThreadInfo->taos); - return NULL; - } - } + // start loop to consume result + int consumed[MAX_QUERY_SQL_COUNT]; + for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) { + consumed[i] = 0; + } + TAOS_RES* res = NULL; - // start loop to consume result - int consumed[MAX_QUERY_SQL_COUNT]; - for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) { - consumed[i] = 0; - } - TAOS_RES* res = NULL; + uint64_t st = 0, et = 0; - uint64_t st = 0, et = 0; + while ((g_queryInfo.superQueryInfo.endAfterConsume == -1) + || (g_queryInfo.superQueryInfo.endAfterConsume > + consumed[pThreadInfo->end_table_to + - pThreadInfo->start_table_from])) { - while ((g_queryInfo.superQueryInfo.endAfterConsume == -1) - || (g_queryInfo.superQueryInfo.endAfterConsume < - consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from])) { + verbosePrint("super endAfterConsume: %d, consumed: %d\n", + g_queryInfo.superQueryInfo.endAfterConsume, + consumed[pThreadInfo->end_table_to + - pThreadInfo->start_table_from]); + for (uint64_t i = pThreadInfo->start_table_from; + i <= pThreadInfo->end_table_to; i++) { + tsubSeq = i - pThreadInfo->start_table_from; + if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { + continue; + } + + st = taosGetTimestampMs(); + performancePrint("st: %"PRIu64" et: %"PRIu64" st-et: %"PRIu64"\n", st, et, (st - et)); + res = taos_consume(tsub[tsubSeq]); + et = taosGetTimestampMs(); + performancePrint("st: %"PRIu64" et: %"PRIu64" delta: %"PRIu64"\n", st, et, (et - st)); + + if (res) { + if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->filePath, "%s-%d", + g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); + fetchResult(res, pThreadInfo); + } + consumed[tsubSeq] ++; + + if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1) + && (consumed[tsubSeq] >= + g_queryInfo.superQueryInfo.resubAfterConsume)) { + verbosePrint("%s() LN%d, keepProgress:%d, resub super table query: %"PRIu64"\n", + __func__, __LINE__, + g_queryInfo.superQueryInfo.subscribeKeepProgress, + pThreadInfo->querySeq); + taos_unsubscribe(tsub[tsubSeq], + g_queryInfo.superQueryInfo.subscribeKeepProgress); + consumed[tsubSeq]= 0; + tsub[tsubSeq] = subscribeImpl( + STABLE_CLASS, + pThreadInfo, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval + ); + if (NULL == tsub[tsubSeq]) { + taos_close(pThreadInfo->taos); + return NULL; + } + } + } + } + } + verbosePrint("%s() LN%d, super endAfterConsume: %d, consumed: %d\n", + __func__, __LINE__, + g_queryInfo.superQueryInfo.endAfterConsume, + consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from]); + taos_free_result(res); for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - tsubSeq = i - pThreadInfo->start_table_from; - if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { - continue; - } - - st = taosGetTimestampMs(); - performancePrint("st: %"PRIu64" et: %"PRIu64" st-et: %"PRIu64"\n", st, et, (st - et)); - res = taos_consume(tsub[tsubSeq]); - et = taosGetTimestampMs(); - performancePrint("st: %"PRIu64" et: %"PRIu64" delta: %"PRIu64"\n", st, et, (et - st)); - - if (res) { - if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->filePath, "%s-%d", - g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], - pThreadInfo->threadID); - fetchResult(res, pThreadInfo); - } - consumed[tsubSeq] ++; - - if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1) - && (consumed[tsubSeq] >= - g_queryInfo.superQueryInfo.resubAfterConsume)) { - printf("keepProgress:%d, resub super table query: %"PRIu64"\n", - g_queryInfo.superQueryInfo.subscribeKeepProgress, - pThreadInfo->querySeq); - taos_unsubscribe(tsub[tsubSeq], - g_queryInfo.superQueryInfo.subscribeKeepProgress); - consumed[tsubSeq]= 0; - tsub[tsubSeq] = subscribeImpl( - STABLE_CLASS, - pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart, - g_queryInfo.superQueryInfo.subscribeInterval - ); - if (NULL == tsub[tsubSeq]) { - taos_close(pThreadInfo->taos); - return NULL; - } - } - } + tsubSeq = i - pThreadInfo->start_table_from; + taos_unsubscribe(tsub[tsubSeq], 0); } - } - taos_free_result(res); - for (uint64_t i = pThreadInfo->start_table_from; - i <= pThreadInfo->end_table_to; i++) { - tsubSeq = i - pThreadInfo->start_table_from; - taos_unsubscribe(tsub[tsubSeq], 0); - } - - taos_close(pThreadInfo->taos); - return NULL; + taos_close(pThreadInfo->taos); + return NULL; } static void *specifiedSubscribe(void *sarg) { @@ -7420,8 +7424,13 @@ static void *specifiedSubscribe(void *sarg) { || (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] < g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) { + printf("consumed[%d]: %d, endAfterConsum[%"PRId64"]: %d\n", + pThreadInfo->threadID, + g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID], + pThreadInfo->querySeq, + g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq]); if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { - continue; + continue; } g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(