diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index fb043eda103066ae1bce5485b0e1002fd7d63209..96b250ef7081e5ebcb8bfc31aa7187d4860c2771 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -240,13 +240,13 @@ typedef struct SColumn_S { typedef struct SSuperTable_S { char sTblName[MAX_TB_NAME_SIZE+1]; + char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample + char childTblPrefix[MAX_TB_NAME_SIZE]; + char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest + uint16_t childTblExists; int64_t childTblCount; - bool childTblExists; // 0: no, 1: yes uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table - char childTblPrefix[MAX_TB_NAME_SIZE]; - char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample - char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest int64_t childTblLimit; uint64_t childTblOffset; @@ -793,7 +793,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { if ((argc == i+1) || (!isStringNumber(argv[i+1]))) { printHelp(); - errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n"); + errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, not-0: ASYNC. Default is SYNC.\n"); exit(EXIT_FAILURE); } arguments->async_mode = atoi(argv[++i]); @@ -1417,7 +1417,8 @@ static int printfInsertMeta() { if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { printf(" autoCreateTable: \033[33m%s\033[0m\n", "no"); - } else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { + } else if (AUTO_CREATE_SUBTBL == + g_Dbs.db[i].superTbls[j].autoCreateTable) { printf(" autoCreateTable: \033[33m%s\033[0m\n", "yes"); } else { printf(" autoCreateTable: \033[33m%s\033[0m\n", "error"); @@ -3007,60 +3008,61 @@ static void createChildTables() { char tblColsBuf[MAX_SQL_SIZE]; int len; - for (int i = 0; i < g_Dbs.dbCount; i++) { - if (g_Dbs.use_metric) { - if (g_Dbs.db[i].superTblCount > 0) { - // with super table - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { - continue; - } + for (int i = 0; i < g_Dbs.dbCount; i++) { + if (g_Dbs.use_metric) { + if (g_Dbs.db[i].superTblCount > 0) { + // with super table + for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) + || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { + continue; + } + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + uint64_t startFrom = 0; + g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; + + verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n", + __func__, __LINE__, g_totalChildTables, startFrom); + + startMultiThreadCreateChildTable( + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, + g_Dbs.threadCountByCreateTbl, + startFrom, + g_Dbs.db[i].superTbls[j].childTblCount, + g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); + } + } + } else { + // normal table + len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); + for (int j = 0; j < g_args.num_of_CPR; j++) { + if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) + || (strncasecmp(g_args.datatype[j], + "NCHAR", strlen("NCHAR")) == 0)) { + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, + ", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary); + } else { + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, + ", COL%d %s", j, g_args.datatype[j]); + } + len = strlen(tblColsBuf); + } - verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, - g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); - uint64_t startFrom = 0; - g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; - - verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n", - __func__, __LINE__, g_totalChildTables, startFrom); - startMultiThreadCreateChildTable( - g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, - g_Dbs.threadCountByCreateTbl, - startFrom, - g_Dbs.db[i].superTbls[j].childTblCount, - g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); + + verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n", + __func__, __LINE__, + g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); + startMultiThreadCreateChildTable( + tblColsBuf, + g_Dbs.threadCountByCreateTbl, + 0, + g_args.num_of_tables, + g_Dbs.db[i].dbName, + NULL); } - } - } else { - // normal table - len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); - for (int j = 0; j < g_args.num_of_CPR; j++) { - if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) - || (strncasecmp(g_args.datatype[j], - "NCHAR", strlen("NCHAR")) == 0)) { - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, - ", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary); - } else { - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, - ", COL%d %s", j, g_args.datatype[j]); - } - len = strlen(tblColsBuf); - } - - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); - - verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n", - __func__, __LINE__, - g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); - startMultiThreadCreateChildTable( - tblColsBuf, - g_Dbs.threadCountByCreateTbl, - 0, - g_args.num_of_tables, - g_Dbs.db[i].dbName, - NULL); } - } } /* @@ -3773,19 +3775,19 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { if (autoCreateTbl && autoCreateTbl->type == cJSON_String && autoCreateTbl->valuestring != NULL) { - if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) - && (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) { - g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL; - } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; - } else { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; - } + if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) + && (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) { + g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL; + } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) { + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + } else { + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + } } else if (!autoCreateTbl) { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; } else { - printf("ERROR: failed to read json, auto_create_table not found\n"); - goto PARSE_OVER; + printf("ERROR: failed to read json, auto_create_table not found\n"); + goto PARSE_OVER; } cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); @@ -4863,26 +4865,29 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t tableSeq) { - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - if ((superTblInfo) - && (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) { - if (superTblInfo->childTblLimit > 0) { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + - (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + if (superTblInfo) { + if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) { + if (superTblInfo->childTblLimit > 0) { + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); + } else { + verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, tableSeq); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + } + } else { + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", + superTblInfo->childTblPrefix, tableSeq); + } } else { - - verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, - pThreadInfo->start_table_from, - pThreadInfo->ntables, tableSeq); - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", + g_args.tb_prefix, tableSeq); } - } else { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", - g_args.tb_prefix, tableSeq); - } } static int64_t generateDataTail( @@ -6195,7 +6200,7 @@ static int insertTestProcess() { } } - taosMsleep(1000); + //taosMsleep(1000); // create sub threads for inserting data //start = taosGetTimestampMs(); for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -6839,11 +6844,14 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume( g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]); if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) { - if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] + != 0) { sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo); + fetchResult( + g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], + pThreadInfo); } g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; @@ -6856,16 +6864,17 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl( - SPECIFIED_CLASS, - pThreadInfo, - g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], - g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], - g_queryInfo.specifiedQueryInfo.subscribeRestart, - g_queryInfo.specifiedQueryInfo.subscribeInterval); + g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = + subscribeImpl( + SPECIFIED_CLASS, + pThreadInfo, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], + g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) { - taos_close(pThreadInfo->taos); - return NULL; + taos_close(pThreadInfo->taos); + return NULL; } } } @@ -7078,7 +7087,7 @@ static void setParaFromArg(){ g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; g_Dbs.dbCount = 1; - g_Dbs.db[0].drop = 1; + g_Dbs.db[0].drop = true; tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE); g_Dbs.db[0].dbCfg.replica = g_args.replica;