From d53e74bfc120a5f8d2cc8724177d84d8443c50da Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 13 Apr 2021 20:12:39 +0800 Subject: [PATCH] Hotfix/sangshuduo/td 3401 query statistic (#5795) (#5802) * [TD-3401]: taosdemo query statistic. refactor func name. * [TD-3401]: taosdemo query statistic. refactor func name 2. * [TD-3401]: taosdemo support query statistic. implementation. * cleanup Co-authored-by: Shuduo Sang Co-authored-by: Shuduo Sang --- src/kit/taosdemo/query.json | 37 +++-- src/kit/taosdemo/taosdemo.c | 263 +++++++++++++++++++++--------------- 2 files changed, 181 insertions(+), 119 deletions(-) diff --git a/src/kit/taosdemo/query.json b/src/kit/taosdemo/query.json index 33ac120bda..d84f997c32 100644 --- a/src/kit/taosdemo/query.json +++ b/src/kit/taosdemo/query.json @@ -1,5 +1,5 @@ { - "filetype":"query", + "filetype": "query", "cfgdir": "/etc/taos", "host": "127.0.0.1", "port": 6030, @@ -7,13 +7,30 @@ "password": "taosdata", "confirm_parameter_prompt": "yes", "databases": "dbx", - "specified_table_query": - {"query_interval":1, "concurrent":4, - "sqls": [{"sql": "select last_row(*) from stb where color='red'", "result": "./query_res0.txt"}, - {"sql": "select count(*) from stb_01", "result": "./query_res1.txt"}] - }, - "super_table_query": - {"stblname": "stb", "query_interval":1, "threads":4, - "sqls": [{"sql": "select last_row(*) from xxxx", "result": "./query_res2.txt"}] - } + "query_times": 1, + "specified_table_query": { + "query_interval": 1, + "concurrent": 4, + "sqls": [ + { + "sql": "select last_row(*) from stb where color='red'", + "result": "./query_res0.txt" + }, + { + "sql": "select count(*) from stb_01", + "result": "./query_res1.txt" + } + ] + }, + "super_table_query": { + "stblname": "stb", + "query_interval": 1, + "threads": 4, + "sqls": [ + { + "sql": "select last_row(*) from xxxx", + "result": "./query_res2.txt" + } + ] + } } diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 02a1ec2b6d..30b78a9b21 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -366,6 +366,7 @@ typedef struct SpecifiedQueryInfo_S { char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; + int totalQueried; } SpecifiedQueryInfo; typedef struct SuperQueryInfo_S { @@ -385,6 +386,7 @@ typedef struct SuperQueryInfo_S { TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; char* childTblName; + int totalQueried; } SuperQueryInfo; typedef struct SQueryMetaInfo_S { @@ -398,6 +400,7 @@ typedef struct SQueryMetaInfo_S { SpecifiedQueryInfo specifiedQueryInfo; SuperQueryInfo superQueryInfo; + int totalQueried; } SQueryMetaInfo; typedef struct SThreadInfo_S { @@ -2602,8 +2605,8 @@ static int createDatabasesAndStables() { static void* createTable(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = winfo->superTblInfo; + threadInfo *pThreadInfo = (threadInfo *)sarg; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; int64_t lastPrintTime = taosGetTimestampMs(); @@ -2621,15 +2624,15 @@ static void* createTable(void *sarg) verbosePrint("%s() LN%d: Creating table from %d to %d\n", __func__, __LINE__, - winfo->start_table_from, winfo->end_table_to); + pThreadInfo->start_table_from, pThreadInfo->end_table_to); - for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) { + for (int i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { if (0 == g_Dbs.use_metric) { snprintf(buffer, buff_len, "create table if not exists %s.%s%d %s;", - winfo->db_name, + pThreadInfo->db_name, g_args.tb_prefix, i, - winfo->cols); + pThreadInfo->cols); } else { if (superTblInfo == NULL) { errorPrint("%s() LN%d, use metric, but super table info is NULL\n", @@ -2658,8 +2661,8 @@ static void* createTable(void *sarg) len += snprintf(buffer + len, buff_len - len, "if not exists %s.%s%d using %s.%s tags %s ", - winfo->db_name, superTblInfo->childTblPrefix, - i, winfo->db_name, + pThreadInfo->db_name, superTblInfo->childTblPrefix, + i, pThreadInfo->db_name, superTblInfo->sTblName, tagsValBuf); free(tagsValBuf); batchNum++; @@ -2673,7 +2676,7 @@ static void* createTable(void *sarg) len = 0; verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE, false)){ + if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)){ errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer); free(buffer); return NULL; @@ -2682,14 +2685,14 @@ static void* createTable(void *sarg) int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] already create %d - %d tables\n", - winfo->threadID, winfo->start_table_from, i); + pThreadInfo->threadID, pThreadInfo->start_table_from, i); lastPrintTime = currentPrintTime; } } if (0 != len) { verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); - if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE, false)) { + if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)) { errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer); } } @@ -5157,45 +5160,45 @@ free_and_statistics_2: static void* syncWrite(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = winfo->superTblInfo; + threadInfo *pThreadInfo = (threadInfo *)sarg; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows; if (interlaceRows > 0) { // interlace mode - return syncWriteInterlace(winfo); + return syncWriteInterlace(pThreadInfo); } else { // progressive mode - return syncWriteProgressive(winfo); + return syncWriteProgressive(pThreadInfo); } } static void callBack(void *param, TAOS_RES *res, int code) { - threadInfo* winfo = (threadInfo*)param; - SSuperTable* superTblInfo = winfo->superTblInfo; + threadInfo* pThreadInfo = (threadInfo*)param; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; if (insert_interval) { - winfo->et = taosGetTimestampUs(); - if (((winfo->et - winfo->st)/1000) < insert_interval) { - taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms + pThreadInfo->et = taosGetTimestampUs(); + if (((pThreadInfo->et - pThreadInfo->st)/1000) < insert_interval) { + taosMsleep(insert_interval - (pThreadInfo->et - pThreadInfo->st)/1000); // ms } } - char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen); + char *buffer = calloc(1, pThreadInfo->superTblInfo->maxSqlLen); char data[MAX_DATA_SIZE]; char *pstr = buffer; - pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, - winfo->start_table_from); -// if (winfo->counter >= winfo->superTblInfo->insertRows) { - if (winfo->counter >= g_args.num_of_RPR) { - winfo->start_table_from++; - winfo->counter = 0; - } - if (winfo->start_table_from > winfo->end_table_to) { - tsem_post(&winfo->lock_sem); + pstr += sprintf(pstr, "insert into %s.%s%d values", pThreadInfo->db_name, pThreadInfo->tb_prefix, + pThreadInfo->start_table_from); +// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) { + if (pThreadInfo->counter >= g_args.num_of_RPR) { + pThreadInfo->start_table_from++; + pThreadInfo->counter = 0; + } + if (pThreadInfo->start_table_from > pThreadInfo->end_table_to) { + tsem_post(&pThreadInfo->lock_sem); free(buffer); taos_free_result(res); return; @@ -5203,46 +5206,46 @@ static void callBack(void *param, TAOS_RES *res, int code) { for (int i = 0; i < g_args.num_of_RPR; i++) { int rand_num = taosRandom() % 100; - if (0 != winfo->superTblInfo->disorderRatio - && rand_num < winfo->superTblInfo->disorderRatio) { - int64_t d = winfo->lastTs - (taosRandom() % winfo->superTblInfo->disorderRange + 1); - generateRowData(data, d, winfo->superTblInfo); + if (0 != pThreadInfo->superTblInfo->disorderRatio + && rand_num < pThreadInfo->superTblInfo->disorderRatio) { + int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); + generateRowData(data, d, pThreadInfo->superTblInfo); } else { - generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo); + generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo); } pstr += sprintf(pstr, "%s", data); - winfo->counter++; + pThreadInfo->counter++; - if (winfo->counter >= winfo->superTblInfo->insertRows) { + if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) { break; } } if (insert_interval) { - winfo->st = taosGetTimestampUs(); + pThreadInfo->st = taosGetTimestampUs(); } - taos_query_a(winfo->taos, buffer, callBack, winfo); + taos_query_a(pThreadInfo->taos, buffer, callBack, pThreadInfo); free(buffer); taos_free_result(res); } static void *asyncWrite(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = winfo->superTblInfo; + threadInfo *pThreadInfo = (threadInfo *)sarg; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - winfo->st = 0; - winfo->et = 0; - winfo->lastTs = winfo->start_time; + pThreadInfo->st = 0; + pThreadInfo->et = 0; + pThreadInfo->lastTs = pThreadInfo->start_time; int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; if (insert_interval) { - winfo->st = taosGetTimestampUs(); + pThreadInfo->st = taosGetTimestampUs(); } - taos_query_a(winfo->taos, "show databases", callBack, winfo); + taos_query_a(pThreadInfo->taos, "show databases", callBack, pThreadInfo); - tsem_wait(&(winfo->lock_sem)); + tsem_wait(&(pThreadInfo->lock_sem)); return NULL; } @@ -5772,10 +5775,10 @@ static int insertTestProcess() { return 0; } -static void *specifiedQueryProcess(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; +static void *specifiedTableQuery(void *sarg) { + threadInfo *pThreadInfo = (threadInfo *)sarg; - if (winfo->taos == NULL) { + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, g_queryInfo.user, @@ -5784,17 +5787,17 @@ static void *specifiedQueryProcess(void *sarg) { g_queryInfo.port); if (taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", - winfo->threadID, taos_errstr(NULL)); + pThreadInfo->threadID, taos_errstr(NULL)); return NULL; } else { - winfo->taos = taos; + pThreadInfo->taos = taos; } } char sqlStr[MAX_DB_NAME_SIZE + 5]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) { - taos_close(winfo->taos); + if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(pThreadInfo->taos); errorPrint( "use database %s failed!\n\n", g_queryInfo.dbName); return NULL; @@ -5805,11 +5808,15 @@ static void *specifiedQueryProcess(void *sarg) { int queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes; + int totalQueried = 0; + int64_t lastPrintTime = taosGetTimestampMs(); + int64_t startTs = taosGetTimestampMs(); + while(queryTimes --) { if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.rate*1000) { taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms - //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); + //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to); } st = taosGetTimestampUs(); @@ -5817,13 +5824,13 @@ static void *specifiedQueryProcess(void *sarg) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { int64_t t1 = taosGetTimestampUs(); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.specifiedQueryInfo.result[winfo->querySeq][0] != 0) { + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[winfo->querySeq], - winfo->threadID); + g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); } - selectAndGetResult(winfo->taos, - g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq], tmpFile); + selectAndGetResult(pThreadInfo->taos, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile); int64_t t2 = taosGetTimestampUs(); printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); @@ -5831,25 +5838,37 @@ static void *specifiedQueryProcess(void *sarg) { int64_t t1 = taosGetTimestampUs(); int retCode = postProceSql(g_queryInfo.host, g_queryInfo.port, - g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq]); + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]); + if (0 != retCode) { + printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID); + return NULL; + } int64_t t2 = taosGetTimestampUs(); printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); - if (0 != retCode) { - printf("====restful return fail, threadID[%d]\n", winfo->threadID); - return NULL; - } } + totalQueried ++; + g_queryInfo.specifiedQueryInfo.totalQueried ++; et = taosGetTimestampUs(); printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); + + int64_t currentPrintTime = taosGetTimestampMs(); + int64_t endTs = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently completed queries: %d, QPS: %10.2f\n", + pThreadInfo->threadID, + totalQueried, + totalQueried/((endTs-startTs)/1000.0)); + } + lastPrintTime = currentPrintTime; } return NULL; } -static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { +static void replaceChildTblName(char* inSql, char* outSql, int tblIndex) { char sourceString[32] = "xxxx"; char subTblName[MAX_TB_NAME_SIZE*3]; sprintf(subTblName, "%s.%s", @@ -5871,11 +5890,11 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { //printf("3: %s\n", outSql); } -static void *superQueryProcess(void *sarg) { +static void *superTableQuery(void *sarg) { char sqlstr[1024]; - threadInfo *winfo = (threadInfo *)sarg; + threadInfo *pThreadInfo = (threadInfo *)sarg; - if (winfo->taos == NULL) { + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, g_queryInfo.user, @@ -5884,10 +5903,10 @@ static void *superQueryProcess(void *sarg) { g_queryInfo.port); if (taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", - winfo->threadID, taos_errstr(NULL)); + pThreadInfo->threadID, taos_errstr(NULL)); return NULL; } else { - winfo->taos = taos; + pThreadInfo->taos = taos; } } @@ -5895,33 +5914,49 @@ static void *superQueryProcess(void *sarg) { int64_t et = (int64_t)g_queryInfo.superQueryInfo.rate*1000; int queryTimes = g_queryInfo.superQueryInfo.queryTimes; + int totalQueried = 0; + int64_t startTs = taosGetTimestampMs(); + int64_t lastPrintTime = taosGetTimestampMs(); while(queryTimes --) { if (g_queryInfo.superQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.superQueryInfo.rate*1000) { taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms - //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); + //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to); } st = taosGetTimestampUs(); - for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) { + for (int i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { memset(sqlstr,0,sizeof(sqlstr)); - replaceSubTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i); + replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[j][0] != 0) { sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[j], - winfo->threadID); + pThreadInfo->threadID); } - selectAndGetResult(winfo->taos, sqlstr, tmpFile); + selectAndGetResult(pThreadInfo->taos, sqlstr, tmpFile); + + totalQueried++; + g_queryInfo.superQueryInfo.totalQueried ++; + + int64_t currentPrintTime = taosGetTimestampMs(); + int64_t endTs = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently completed queries: %d, QPS: %10.2f\n", + pThreadInfo->threadID, + totalQueried, + totalQueried/((endTs-startTs)/1000.0)); + } + lastPrintTime = currentPrintTime; } } et = taosGetTimestampUs(); printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", taosGetSelfPthreadId(), - winfo->start_table_from, - winfo->end_table_to, + pThreadInfo->start_table_from, + pThreadInfo->end_table_to, (double)(et - st)/1000000.0); } @@ -5967,6 +6002,8 @@ static int queryTestProcess() { int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; + int64_t startTs = taosGetTimestampMs(); + if ((nSqlCount > 0) && (nConcurrent > 0)) { pids = malloc(nConcurrent * nSqlCount * sizeof(pthread_t)); @@ -6000,7 +6037,7 @@ static int queryTestProcess() { t_info->taos = NULL;// TODO: workaround to use separate taos connection; - pthread_create(pids + i * nSqlCount + j, NULL, specifiedQueryProcess, + pthread_create(pids + i * nSqlCount + j, NULL, specifiedTableQuery, t_info); } } @@ -6049,7 +6086,7 @@ static int queryTestProcess() { t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; startFrom = t_info->end_table_to + 1; t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pidsOfSub + i, NULL, superQueryProcess, t_info); + pthread_create(pidsOfSub + i, NULL, superTableQuery, t_info); } g_queryInfo.superQueryInfo.threadCnt = threads; @@ -6076,6 +6113,14 @@ static int queryTestProcess() { tmfree((char*)infosOfSub); // taos_close(taos);// TODO: workaround to use separate taos connection; + int64_t endTs = taosGetTimestampMs(); + + int totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried + + g_queryInfo.superQueryInfo.totalQueried; + + printf("==== completed total queries: %d, the QPS of all threads: %10.2f====\n", + totalQueried, + totalQueried/((endTs-startTs)/1000.0)); return 0; } @@ -6112,12 +6157,12 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF return tsub; } -static void *subSubscribeProcess(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; +static void *superSubscribe(void *sarg) { + threadInfo *pThreadInfo = (threadInfo *)sarg; char subSqlstr[1024]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; - if (winfo->taos == NULL) { + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, g_queryInfo.user, @@ -6126,17 +6171,17 @@ static void *subSubscribeProcess(void *sarg) { g_queryInfo.port); if (taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", - winfo->threadID, taos_errstr(NULL)); + pThreadInfo->threadID, taos_errstr(NULL)); return NULL; } else { - winfo->taos = taos; + pThreadInfo->taos = taos; } } char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) { - taos_close(winfo->taos); + if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(pThreadInfo->taos); errorPrint( "use database %s failed!\n\n", g_queryInfo.dbName); return NULL; @@ -6147,7 +6192,7 @@ static void *subSubscribeProcess(void *sarg) { do { //if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < g_queryInfo.specifiedQueryInfo.rate*1000) { // taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms - // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); + // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to); //} //st = taosGetTimestampMs(); @@ -6155,15 +6200,15 @@ static void *subSubscribeProcess(void *sarg) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { sprintf(topic, "taosdemo-subscribe-%d", i); memset(subSqlstr,0,sizeof(subSqlstr)); - replaceSubTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i); + replaceChildTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[i], winfo->threadID); + g_queryInfo.superQueryInfo.result[i], pThreadInfo->threadID); } - tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile); + tsub[i] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile); if (NULL == tsub[i]) { - taos_close(winfo->taos); + taos_close(pThreadInfo->taos); return NULL; } } @@ -6185,7 +6230,7 @@ static void *subSubscribeProcess(void *sarg) { if (g_queryInfo.superQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], - winfo->threadID); + pThreadInfo->threadID); } getResult(res, tmpFile); } @@ -6197,15 +6242,15 @@ static void *subSubscribeProcess(void *sarg) { taos_unsubscribe(tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress); } - taos_close(winfo->taos); + taos_close(pThreadInfo->taos); return NULL; } -static void *superSubscribeProcess(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; +static void *specifiedSubscribe(void *sarg) { + threadInfo *pThreadInfo = (threadInfo *)sarg; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; - if (winfo->taos == NULL) { + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, g_queryInfo.user, @@ -6214,18 +6259,18 @@ static void *superSubscribeProcess(void *sarg) { g_queryInfo.port); if (taos == NULL) { errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", - winfo->threadID, taos_errstr(NULL)); + pThreadInfo->threadID, taos_errstr(NULL)); return NULL; } else { - winfo->taos = taos; + pThreadInfo->taos = taos; } } char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); - if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) { - taos_close(winfo->taos); + if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(pThreadInfo->taos); return NULL; } @@ -6234,7 +6279,7 @@ static void *superSubscribeProcess(void *sarg) { do { //if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < g_queryInfo.specifiedQueryInfo.rate*1000) { // taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms - // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); + // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to); //} //st = taosGetTimestampMs(); @@ -6244,12 +6289,12 @@ static void *superSubscribeProcess(void *sarg) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); + g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); } - tsub[i] = subscribeImpl(winfo->taos, + tsub[i] = subscribeImpl(pThreadInfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile); if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) { - taos_close(winfo->taos); + taos_close(pThreadInfo->taos); return NULL; } } @@ -6270,7 +6315,7 @@ static void *superSubscribeProcess(void *sarg) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); + g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); } getResult(res, tmpFile); } @@ -6283,7 +6328,7 @@ static void *superSubscribeProcess(void *sarg) { g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); } - taos_close(winfo->taos); + taos_close(pThreadInfo->taos); return NULL; } @@ -6341,7 +6386,7 @@ static int subscribeTestProcess() { threadInfo *t_info = infos + i; t_info->threadID = i; t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pids + i, NULL, superSubscribeProcess, t_info); + pthread_create(pids + i, NULL, specifiedSubscribe, t_info); } //==== create sub threads for query from sub table @@ -6384,7 +6429,7 @@ static int subscribeTestProcess() { t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; startFrom = t_info->end_table_to + 1; t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info); + pthread_create(pidsOfSub + i, NULL, superSubscribe, t_info); } g_queryInfo.superQueryInfo.threadCnt = threads; -- GitLab