From 4e32f1cc72bf39ee432541935abbeea8adab8475 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 20 May 2021 09:23:10 +0800 Subject: [PATCH] Hotfix/sangshuduo/td 4238 taosdemo async for develop (#6164) * [TD-4238]: taosdemo async sub. for develop branch. * [TD-4238]: taosdemo async subscribe subsribe sql command do not use aggregation functions. * cherry pick f5e4cdd1f7cffc4ee04fc7d043e4841bd1dd9961 * [TD-4238]: taosdemo async subscribe. fix super table sub result file. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/async-sub.json | 41 +++ src/kit/taosdemo/taosdemo.c | 475 ++++++++++++++++++-------------- 2 files changed, 303 insertions(+), 213 deletions(-) create mode 100644 src/kit/taosdemo/async-sub.json diff --git a/src/kit/taosdemo/async-sub.json b/src/kit/taosdemo/async-sub.json new file mode 100644 index 0000000000..a30a1be45c --- /dev/null +++ b/src/kit/taosdemo/async-sub.json @@ -0,0 +1,41 @@ +{ + "filetype": "subscribe", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "databases": "test", + "specified_table_query": { + "concurrent": 1, + "mode": "async", + "interval": 1000, + "restart": "yes", + "keepProgress": "yes", + "resubAfterConsume": 10, + "sqls": [ + { + "sql": "select col1 from meters where col1 > 1;", + "result": "./subscribe_res0.txt" + }, + { + "sql": "select col2 from meters where col2 > 1;", + "result": "./subscribe_res2.txt" + } + ] + }, + "super_table_query": { + "stblname": "meters", + "threads": 1, + "mode": "sync", + "interval": 1000, + "restart": "yes", + "keepProgress": "yes", + "sqls": [ + { + "sql": "select col1 from xxxx where col1 > 10;", + "result": "./subscribe_res1.txt" + } + ] + } +} diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 95fb2f7bd4..068aa52283 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -114,7 +114,7 @@ typedef enum TALBE_EXISTS_EN { TBL_EXISTS_BUTT } TALBE_EXISTS_EN; -enum MODE { +enum enumSYNC_MODE { SYNC_MODE, ASYNC_MODE, MODE_BUT @@ -127,6 +127,12 @@ enum enum_TAOS_INTERFACE { INTERFACE_BUT }; +typedef enum enumQUERY_CLASS { + SPECIFIED_CLASS, + STABLE_CLASS, + CLASS_BUT +} QUERY_CLASS; + typedef enum enum_PROGRESSIVE_OR_INTERLACE { PROGRESSIVE_INSERT_MODE, INTERLACE_INSERT_MODE, @@ -451,8 +457,9 @@ typedef struct SThreadInfo_S { uint64_t maxDelay; uint64_t minDelay; - // query + // seq of query or subscribe uint64_t querySeq; // sequence number of sql command + } threadInfo; #ifdef WINDOWS @@ -1179,7 +1186,8 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { } static void selectAndGetResult( - threadInfo *pThreadInfo, char *command, char* resultFile) { + threadInfo *pThreadInfo, char *command) +{ if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) { TAOS_RES *res = taos_query(pThreadInfo->taos, command); if (res == NULL || taos_errno(res) != 0) { @@ -1189,8 +1197,8 @@ static void selectAndGetResult( return; } - if ((resultFile) && (strlen(resultFile))) { - appendResultToFile(res, resultFile); + if ((strlen(pThreadInfo->fp))) { + appendResultToFile(res, pThreadInfo->fp); } taos_free_result(res); @@ -1198,7 +1206,7 @@ static void selectAndGetResult( int retCode = postProceSql( g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port, command, - resultFile); + pThreadInfo->fp); if (0 != retCode) { printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID); } @@ -6255,23 +6263,22 @@ static void *specifiedTableQuery(void *sarg) { uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs(); + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); + } + while(queryTimes --) { if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) { taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms } - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], - pThreadInfo->threadID); - } - st = taosGetTimestampMs(); selectAndGetResult(pThreadInfo, - g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile); + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]); et = taosGetTimestampMs(); printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n", @@ -6357,13 +6364,12 @@ static void *superTableQuery(void *sarg) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { memset(sqlstr,0,sizeof(sqlstr)); replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i); - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[j][0] != 0) { - sprintf(tmpFile, "%s-%d", + sprintf(pThreadInfo->fp, "%s-%d", g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); } - selectAndGetResult(pThreadInfo, sqlstr, tmpFile); + selectAndGetResult(pThreadInfo, sqlstr); totalQueried++; g_queryInfo.superQueryInfo.totalQueried ++; @@ -6435,7 +6441,7 @@ static int queryTestProcess() { threadInfo *infos = NULL; //==== create sub threads for query from specify table int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; - int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; + uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; uint64_t startTs = taosGetTimestampMs(); @@ -6449,32 +6455,33 @@ static int queryTestProcess() { ERROR_EXIT("memory allocation failed for create threads\n"); } - for (int i = 0; i < nConcurrent; i++) { - for (int j = 0; j < nSqlCount; j++) { - threadInfo *t_info = infos + i * nSqlCount + j; - t_info->threadID = i * nSqlCount + j; - t_info->querySeq = j; + for (uint64_t i = 0; i < nSqlCount; i++) { + for (int j = 0; j < nConcurrent; j++) { + uint64_t seq = i * nConcurrent + j; + threadInfo *t_info = infos + seq; + t_info->threadID = seq; + t_info->querySeq = i; + + if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { + + char sqlStr[MAX_TB_NAME_SIZE*2]; + sprintf(sqlStr, "use %s", g_queryInfo.dbName); + verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(taos); + free(infos); + free(pids); + errorPrint( "use database %s failed!\n\n", + g_queryInfo.dbName); + return -1; + } + } - if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { + t_info->taos = NULL;// TODO: workaround to use separate taos connection; - char sqlStr[MAX_TB_NAME_SIZE*2]; - sprintf(sqlStr, "use %s", g_queryInfo.dbName); - verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); - if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) { - taos_close(taos); - free(infos); - free(pids); - errorPrint( "use database %s failed!\n\n", - g_queryInfo.dbName); - return -1; - } + pthread_create(pids + seq, NULL, specifiedTableQuery, + t_info); } - - t_info->taos = NULL;// TODO: workaround to use separate taos connection; - - pthread_create(pids + i * nSqlCount + j, NULL, specifiedTableQuery, - t_info); - } } } else { g_queryInfo.specifiedQueryInfo.concurrent = 0; @@ -6559,7 +6566,21 @@ static int queryTestProcess() { return 0; } -static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { +static void stable_sub_callback( + TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { + if (res == NULL || taos_errno(res) != 0) { + errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n", + __func__, __LINE__, code, taos_errstr(res)); + return; + } + + if (param) + appendResultToFile(res, ((threadInfo *)param)->fp); + // tao_unscribe() will free result. +} + +static void specified_sub_callback( + TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { if (res == NULL || taos_errno(res) != 0) { errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n", __func__, __LINE__, code, taos_errstr(res)); @@ -6567,24 +6588,36 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c } if (param) - appendResultToFile(res, (char*)param); + appendResultToFile(res, ((threadInfo *)param)->fp); // tao_unscribe() will free result. } static TAOS_SUB* subscribeImpl( - TAOS *taos, char *sql, char* topic, bool restart, - char* resultFileName) { + QUERY_CLASS class, + threadInfo *pThreadInfo, + char *sql, char* topic, bool restart, uint64_t interval) +{ TAOS_SUB* tsub = NULL; - if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { - tsub = taos_subscribe(taos, + if ((SPECIFIED_CLASS == class) + && (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) { + tsub = taos_subscribe( + pThreadInfo->taos, restart, - topic, sql, subscribe_callback, (void*)resultFileName, + topic, sql, specified_sub_callback, (void*)pThreadInfo, g_queryInfo.specifiedQueryInfo.subscribeInterval); + } else if ((STABLE_CLASS == class) + && (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) { + tsub = taos_subscribe( + pThreadInfo->taos, + restart, + topic, sql, stable_sub_callback, (void*)pThreadInfo, + g_queryInfo.superQueryInfo.subscribeInterval); } else { - tsub = taos_subscribe(taos, + tsub = taos_subscribe( + pThreadInfo->taos, restart, - topic, sql, NULL, NULL, 0); + topic, sql, NULL, NULL, interval); } if (tsub == NULL) { @@ -6600,13 +6633,8 @@ static void *superSubscribe(void *sarg) { char subSqlstr[MAX_QUERY_SQL_LENGTH]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; - if (g_queryInfo.superQueryInfo.sqlCount == 0) - return NULL; - - if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { - errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n", - g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables, - g_queryInfo.superQueryInfo.sqlCount, + if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { + errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); exit(-1); @@ -6637,91 +6665,92 @@ static void *superSubscribe(void *sarg) { return NULL; } - uint64_t subSeq; char topic[32] = {0}; for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { - sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%d", i, j); - memset(subSqlstr,0,sizeof(subSqlstr)); - replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], subSqlstr, i); - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[j][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); - } - - subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", - __func__, __LINE__, subSeq, subSqlstr); - tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart, - tmpFile); - if (NULL == tsub[subSeq]) { + verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n", + __func__, __LINE__, + pThreadInfo->threadID, + pThreadInfo->start_table_from, + pThreadInfo->end_table_to, i); + sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"", + i, pThreadInfo->querySeq); + memset(subSqlstr, 0, sizeof(subSqlstr)); + replaceChildTblName( + g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq], + subSqlstr, i); + if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); + } + + debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", + __func__, __LINE__, pThreadInfo->threadID, subSqlstr); + tsub[i] = subscribeImpl( + STABLE_CLASS, + pThreadInfo, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval); + if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; } - } } - int consumed[MAX_QUERY_SQL_COUNT]; - for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) - consumed[i] = 0; - // start loop to consume result + int consumed[MAX_QUERY_SQL_COUNT]; + for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) { + consumed[i] = 0; + } TAOS_RES* res = NULL; + while(1) { - for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { + for (uint64_t i = pThreadInfo->start_table_from; + i <= pThreadInfo->end_table_to; i++) { if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { continue; } - subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - taosMsleep(100); // ms - res = taos_consume(tsub[subSeq]); + taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms + res = taos_consume(tsub[i]); if (res) { - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[j][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[j], + if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - appendResultToFile(res, tmpFile); + appendResultToFile(res, pThreadInfo->fp); } - consumed[j] ++; + consumed[i] ++; if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) - && (consumed[j] >= - g_queryInfo.superQueryInfo.resubAfterConsume[j])) { - printf("keepProgress:%d, resub super table query: %d\n", - g_queryInfo.superQueryInfo.subscribeKeepProgress, j); - taos_unsubscribe(tsub[subSeq], + && (consumed[i] >= + g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { + printf("keepProgress:%d, resub super table query: %"PRIu64"\n", + g_queryInfo.superQueryInfo.subscribeKeepProgress, + pThreadInfo->querySeq); + taos_unsubscribe(tsub, g_queryInfo.superQueryInfo.subscribeKeepProgress); - consumed[j]= 0; - subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", - __func__, __LINE__, subSeq, subSqlstr); - tsub[subSeq] = subscribeImpl( - pThreadInfo->taos, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart, - tmpFile); - if (NULL == tsub[subSeq]) { + consumed[i]= 0; + tsub[i] = subscribeImpl( + STABLE_CLASS, + pThreadInfo, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval + ); + if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; } } } - } } } taos_free_result(res); for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { - subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - taos_unsubscribe(tsub[subSeq], 0); - } + taos_unsubscribe(tsub[i], 0); } taos_close(pThreadInfo->taos); @@ -6730,10 +6759,7 @@ static void *superSubscribe(void *sarg) { static void *specifiedSubscribe(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; - TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; - - if (g_queryInfo.specifiedQueryInfo.sqlCount == 0) - return NULL; + TAOS_SUB* tsub = NULL; if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; @@ -6760,74 +6786,64 @@ static void *specifiedSubscribe(void *sarg) { } char topic[32] = {0}; - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - sprintf(topic, "taosdemo-subscribe-%d", i); - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); - } - tsub[i] = subscribeImpl(pThreadInfo->taos, - g_queryInfo.specifiedQueryInfo.sql[i], topic, + sprintf(topic, "taosdemo-subscribe-%"PRIu64"", pThreadInfo->querySeq); + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); + } + tsub = subscribeImpl( + SPECIFIED_CLASS, pThreadInfo, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], + topic, g_queryInfo.specifiedQueryInfo.subscribeRestart, - tmpFile); - if (NULL == tsub[i]) { - taos_close(pThreadInfo->taos); - return NULL; - } + g_queryInfo.specifiedQueryInfo.subscribeInterval); + if (NULL == tsub) { + taos_close(pThreadInfo->taos); + return NULL; } // start loop to consume result TAOS_RES* res = NULL; - int consumed[MAX_QUERY_SQL_COUNT]; - for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) - consumed[i] = 0; + int consumed; while(1) { - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { continue; } - taosMsleep(1000); // ms - res = taos_consume(tsub[i]); + taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms + res = taos_consume(tsub); if (res) { - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); - appendResultToFile(res, tmpFile); - } - consumed[i] ++; - - if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) - && (consumed[i] >= - g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) { - printf("keepProgress:%d, resub specified query: %d\n", - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i); - consumed[i] = 0; - taos_unsubscribe(tsub[i], - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - tsub[i] = subscribeImpl(pThreadInfo->taos, - g_queryInfo.specifiedQueryInfo.sql[i], topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart, - tmpFile); - if (NULL == tsub[i]) { + consumed ++; + if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) + && (consumed >= + g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { + printf("keepProgress:%d, resub specified query: %"PRIu64"\n", + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, + pThreadInfo->querySeq); + consumed = 0; + taos_unsubscribe(tsub, + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); + tsub = subscribeImpl( + SPECIFIED_CLASS, + pThreadInfo, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], + topic, + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval); + if (NULL == tsub) { taos_close(pThreadInfo->taos); return NULL; - } - } + } + } } - } } taos_free_result(res); - - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - taos_unsubscribe(tsub[i], 0); - } - + taos_unsubscribe(tsub, 0); taos_close(pThreadInfo->taos); + return NULL; } @@ -6865,9 +6881,13 @@ static int subscribeTestProcess() { pthread_t *pids = NULL; threadInfo *infos = NULL; - //==== create sub threads for query for specified table + + pthread_t *pidsOfStable = NULL; + threadInfo *infosOfStable = NULL; + + //==== create threads for query for specified table if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { - printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", + debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount); } else { @@ -6878,80 +6898,109 @@ static int subscribeTestProcess() { exit(-1); } - pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); - infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); + pids = malloc( + g_queryInfo.specifiedQueryInfo.sqlCount * + g_queryInfo.specifiedQueryInfo.concurrent * + sizeof(pthread_t)); + infos = malloc( + g_queryInfo.specifiedQueryInfo.sqlCount * + g_queryInfo.specifiedQueryInfo.concurrent * + sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); exit(-1); } - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { - threadInfo *t_info = infos + i; - t_info->threadID = i; - t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pids + i, NULL, specifiedSubscribe, t_info); + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) { + uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j; + threadInfo *t_info = infos + seq; + t_info->threadID = seq; + t_info->querySeq = i; + t_info->taos = NULL; // TODO: workaround to use separate taos connection; + pthread_create(pids + seq, NULL, specifiedSubscribe, t_info); + } } } - //==== create sub threads for super table query - pthread_t *pidsOfSub = NULL; - threadInfo *infosOfSub = NULL; - if ((g_queryInfo.superQueryInfo.sqlCount > 0) + //==== create threads for super table query + if (g_queryInfo.superQueryInfo.sqlCount <= 0) { + printf("%s() LN%d, super table query sqlCount %"PRIu64".\n", + __func__, __LINE__, + g_queryInfo.superQueryInfo.sqlCount); + } else { + if ((g_queryInfo.superQueryInfo.sqlCount > 0) && (g_queryInfo.superQueryInfo.threadCnt > 0)) { - pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * + pidsOfStable = malloc( + g_queryInfo.superQueryInfo.sqlCount * + g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t)); - infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * + infosOfStable = malloc( + g_queryInfo.superQueryInfo.sqlCount * + g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo)); - if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { - errorPrint("%s() LN%d, malloc failed for create threads\n", + if ((NULL == pidsOfStable) || (NULL == infosOfStable)) { + errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); - // taos_close(taos); - exit(-1); - } - - int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; - int threads = g_queryInfo.superQueryInfo.threadCnt; + // taos_close(taos); + exit(-1); + } - int64_t a = ntables / threads; - if (a < 1) { - threads = ntables; - a = 1; - } + int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; + int threads = g_queryInfo.superQueryInfo.threadCnt; - int64_t b = 0; - if (threads != 0) { - b = ntables % threads; - } + int64_t a = ntables / threads; + if (a < 1) { + threads = ntables; + a = 1; + } - uint64_t startFrom = 0; - for (int i = 0; i < threads; i++) { - threadInfo *t_info = infosOfSub + i; - t_info->threadID = i; + int64_t b = 0; + if (threads != 0) { + b = ntables % threads; + } - t_info->start_table_from = startFrom; - t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = t_info->end_table_to + 1; - t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pidsOfSub + i, NULL, superSubscribe, t_info); - } + for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + uint64_t startFrom = 0; + for (int j = 0; j < threads; j++) { + uint64_t seq = i * threads + j; + threadInfo *t_info = infosOfStable + seq; + t_info->threadID = seq; + t_info->querySeq = i; + + t_info->start_table_from = startFrom; + t_info->ntables = jend_table_to = jend_table_to + 1; + t_info->taos = NULL; // TODO: workaround to use separate taos connection; + pthread_create(pidsOfStable + seq, + NULL, superSubscribe, t_info); + } + } - g_queryInfo.superQueryInfo.threadCnt = threads; + g_queryInfo.superQueryInfo.threadCnt = threads; - for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { - pthread_join(pidsOfSub[i], NULL); + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + for (int j = 0; j < threads; j++) { + uint64_t seq = i * threads + j; + pthread_join(pidsOfStable[seq], NULL); + } + } } } - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { - pthread_join(pids[i], NULL); + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) { + uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j; + pthread_join(pids[seq], NULL); + } } tmfree((char*)pids); tmfree((char*)infos); - tmfree((char*)pidsOfSub); - tmfree((char*)infosOfSub); + tmfree((char*)pidsOfStable); + tmfree((char*)infosOfStable); // taos_close(taos); return 0; } -- GitLab