From 95139cf4a1c03cd6f9bb60fe0daca48bbd84e4e9 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 11 May 2021 21:36:48 +0800 Subject: [PATCH] [TD-4130]: taosdemo subscribe super table. (#6086) Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 134 +++++++++++++++++------------------- 1 file changed, 63 insertions(+), 71 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index a3a8968017..8866bf2607 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -68,12 +68,6 @@ enum TEST_MODE { INVAID_TEST }; -enum QUERY_MODE { - SYNC_QUERY_MODE, // 0 - ASYNC_QUERY_MODE, // 1 - INVALID_MODE -}; - #define MAX_RECORDS_PER_REQ 32766 #define MAX_SQL_SIZE 65536 @@ -1107,6 +1101,7 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile) } } + fprintf(fp, "%s", resultBuf); tmfclose(fp); } @@ -1142,6 +1137,7 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { totalLen += len; } + verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", __func__, __LINE__, databuf, resultFile); appendResultBufToFile(databuf, resultFile); free(databuf); } @@ -6517,59 +6513,63 @@ static void *superSubscribe(void *sarg) { return NULL; } - //int64_t st = 0; - //int64_t et = 0; - do { - //if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < g_queryInfo.specifiedQueryInfo.queryInterval) { - // taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval- (et - st)); // ms - // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to); - //} - - //st = taosGetTimestampMs(); - char topic[32] = {0}; - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - sprintf(topic, "taosdemo-subscribe-%d", i); + char topic[32] = {0}; + for (uint64_t i = pThreadInfo->start_table_from; + i <= pThreadInfo->end_table_to; i++) { + for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { + sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%d", i, j); memset(subSqlstr,0,sizeof(subSqlstr)); - replaceChildTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i); + replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], subSqlstr, i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[i][0] != 0) { + if (g_queryInfo.superQueryInfo.result[j][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[i], pThreadInfo->threadID); + g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); } - tsub[i] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile); - if (NULL == tsub[i]) { + + uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; + debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", + __func__, __LINE__, subSeq, subSqlstr); + tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile); + if (NULL == tsub[subSeq]) { taos_close(pThreadInfo->taos); return NULL; } } - //et = taosGetTimestampMs(); - //printf("========thread[%"PRIu64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); - } while(0); + } // start loop to consume result TAOS_RES* res = NULL; while(1) { - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { - continue; - } + for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { + for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { + if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { + continue; + } - res = taos_consume(tsub[i]); - if (res) { - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[i], + uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; + taosMsleep(100); // ms + res = taos_consume(tsub[subSeq]); + if (res) { + char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; + if (g_queryInfo.superQueryInfo.result[j][0] != 0) { + sprintf(tmpFile, "%s-%d", + g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); - appendResultToFile(res, tmpFile); + appendResultToFile(res, tmpFile); + } } } } } taos_free_result(res); - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - taos_unsubscribe(tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress); + for (uint64_t i = pThreadInfo->start_table_from; + i <= pThreadInfo->end_table_to; i++) { + for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { + uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; + taos_unsubscribe(tsub[subSeq], + g_queryInfo.superQueryInfo.subscribeKeepProgress); + } } taos_close(pThreadInfo->taos); @@ -6607,17 +6607,8 @@ static void *specifiedSubscribe(void *sarg) { return NULL; } - //int64_t st = 0; - //int64_t et = 0; - do { - //if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < g_queryInfo.specifiedQueryInfo.queryInterval) { - // taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval- (et - st)); // ms - // //printf("========sleep duration:%"PRIu64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to); - //} - - //st = taosGetTimestampMs(); - char topic[32] = {0}; - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + char topic[32] = {0}; + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { sprintf(topic, "taosdemo-subscribe-%d", i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { @@ -6630,11 +6621,7 @@ static void *specifiedSubscribe(void *sarg) { taos_close(pThreadInfo->taos); return NULL; } - } - //et = taosGetTimestampMs(); - //printf("========thread[%"PRIu64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); - } while(0); - + } // start loop to consume result TAOS_RES* res = NULL; while(1) { @@ -6643,6 +6630,7 @@ static void *specifiedSubscribe(void *sarg) { continue; } + taosMsleep(1000); // ms res = taos_consume(tsub[i]); if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; @@ -6699,31 +6687,35 @@ static int subscribeTestProcess() { pthread_t *pids = NULL; threadInfo *infos = NULL; - //==== create sub threads for query from super table - if ((g_queryInfo.specifiedQueryInfo.sqlCount <= 0) || - (g_queryInfo.specifiedQueryInfo.concurrent <= 0)) { - errorPrint("%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n", + //==== create sub threads for query for specified table + if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { + printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", __func__, __LINE__, - g_queryInfo.specifiedQueryInfo.sqlCount, - g_queryInfo.specifiedQueryInfo.concurrent); - exit(-1); - } + g_queryInfo.specifiedQueryInfo.sqlCount); + } else { + if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) { + errorPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", + __func__, __LINE__, + g_queryInfo.specifiedQueryInfo.sqlCount); + exit(-1); + } - pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); - infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); - if ((NULL == pids) || (NULL == infos)) { - errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); - exit(-1); - } + pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); + infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); + if ((NULL == pids) || (NULL == infos)) { + errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); + exit(-1); + } - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; t_info->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pids + i, NULL, specifiedSubscribe, t_info); + } } - //==== create sub threads for query from sub table + //==== create sub threads for super table query pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; if ((g_queryInfo.superQueryInfo.sqlCount > 0) -- GitLab