diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 5a65e3dc5e3e677b22fd2dd037638177ce142a41..fe39085981a563f82e5a4e1f0181f678300933c6 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -2541,6 +2541,7 @@ static void* createTable(void *sarg) static int startMultiThreadCreateChildTable( char* cols, int threads, int startFrom, int ntables, char* db_name, SSuperTable* superTblInfo) { + pthread_t *pids = malloc(threads * sizeof(pthread_t)); threadInfo *infos = malloc(threads * sizeof(threadInfo)); @@ -2625,12 +2626,12 @@ static void createChildTables() { g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__, - g_totalChildTables, startFrom); + g_totalChildTables, startFrom); startMultiThreadCreateChildTable( g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.threadCountByCreateTbl, startFrom, - g_totalChildTables, + g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); } } else { @@ -5314,16 +5315,16 @@ static int insertTestProcess() { continue; } startMultiThreadInsertData( - g_Dbs.threadCount, - g_Dbs.db[i].dbName, - g_Dbs.db[i].dbCfg.precision, + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, superTblInfo); } } else { startMultiThreadInsertData( - g_Dbs.threadCount, - g_Dbs.db[i].dbName, - g_Dbs.db[i].dbCfg.precision, + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, NULL); } } @@ -5343,7 +5344,7 @@ static int insertTestProcess() { } static void *superQueryProcess(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; + threadInfo *winfo = (threadInfo *)sarg; //char sqlStr[MAX_TB_NAME_SIZE*2]; //sprintf(sqlStr, "use %s", g_queryInfo.dbName); @@ -5352,39 +5353,41 @@ static void *superQueryProcess(void *sarg) { int64_t st = 0; int64_t et = 0; while (1) { - if (g_queryInfo.superQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.superQueryInfo.rate*1000) { + if (g_queryInfo.superQueryInfo.rate && (et - st) < + (int64_t)g_queryInfo.superQueryInfo.rate*1000) { taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); } st = taosGetTimestampUs(); for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { + if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { int64_t t1 = taosGetTimestampUs(); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID); + sprintf(tmpFile, "%s-%d", + g_queryInfo.superQueryInfo.result[i], winfo->threadID); } - selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile); - int64_t t2 = taosGetTimestampUs(); - printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", + selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile); + int64_t t2 = taosGetTimestampUs(); + printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); } else { int64_t t1 = taosGetTimestampUs(); - int retCode = postProceSql(g_queryInfo.host, + int retCode = postProceSql(g_queryInfo.host, g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); - int64_t t2 = taosGetTimestampUs(); - printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", + int64_t t2 = taosGetTimestampUs(); + printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); - + if (0 != retCode) { printf("====restful return fail, threadID[%d]\n", winfo->threadID); return NULL; } - } + } } et = taosGetTimestampUs(); - printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", + printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); } return NULL; @@ -5393,28 +5396,28 @@ static void *superQueryProcess(void *sarg) { static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { char sourceString[32] = "xxxx"; char subTblName[MAX_TB_NAME_SIZE*3]; - sprintf(subTblName, "%s.%s", - g_queryInfo.dbName, + sprintf(subTblName, "%s.%s", + g_queryInfo.dbName, g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); //printf("inSql: %s\n", inSql); char* pos = strstr(inSql, sourceString); if (0 == pos) { - return; + return; } tstrncpy(outSql, inSql, pos - inSql + 1); //printf("1: %s\n", outSql); - strcat(outSql, subTblName); - //printf("2: %s\n", outSql); - strcat(outSql, pos+strlen(sourceString)); - //printf("3: %s\n", outSql); + strcat(outSql, subTblName); + //printf("2: %s\n", outSql); + strcat(outSql, pos+strlen(sourceString)); + //printf("3: %s\n", outSql); } static void *subQueryProcess(void *sarg) { char sqlstr[1024]; - threadInfo *winfo = (threadInfo *)sarg; + threadInfo *winfo = (threadInfo *)sarg; int64_t st = 0; int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; while (1) { @@ -5431,29 +5434,29 @@ static void *subQueryProcess(void *sarg) { replaceSubTblName(g_queryInfo.subQueryInfo.sql[j], sqlstr, i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.subQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.subQueryInfo.result[i], + sprintf(tmpFile, "%s-%d", + g_queryInfo.subQueryInfo.result[i], winfo->threadID); } - selectAndGetResult(winfo->taos, sqlstr, tmpFile); + selectAndGetResult(winfo->taos, sqlstr, tmpFile); } } et = taosGetTimestampUs(); printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", - taosGetSelfPthreadId(), - winfo->start_table_from, - winfo->end_table_to, + taosGetSelfPthreadId(), + winfo->start_table_from, + winfo->end_table_to, (double)(et - st)/1000000.0); } return NULL; } static int queryTestProcess() { - TAOS * taos = NULL; - taos = taos_connect(g_queryInfo.host, - g_queryInfo.user, - g_queryInfo.password, - NULL, + TAOS * taos = NULL; + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + NULL, g_queryInfo.port); if (taos == NULL) { errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); @@ -5466,7 +5469,7 @@ static int queryTestProcess() { g_queryInfo.subQueryInfo.sTblName, &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount); - } + } printfQueryMeta(); @@ -5563,7 +5566,7 @@ static int queryTestProcess() { for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; - + t_info->start_table_from = startFrom; t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; @@ -5575,7 +5578,7 @@ static int queryTestProcess() { g_queryInfo.subQueryInfo.threadCnt = threads; } else { g_queryInfo.subQueryInfo.threadCnt = 0; - } + } for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { pthread_join(pids[i], NULL);