diff --git a/src/kit/taosdemox/taosdemox.c b/src/kit/taosdemox/taosdemox.c index a6d962df55670d7b188cfe4ffcf2df9a4be3594a..96b65d402f3216071cf5053ba949ae705c56c22e 100644 --- a/src/kit/taosdemox/taosdemox.c +++ b/src/kit/taosdemox/taosdemox.c @@ -173,6 +173,7 @@ typedef struct SSuperTable_S { int childTblCount; bool superTblExists; // 0: no, 1: yes bool childTblExists; // 0: no, 1: yes + int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table char childTblPrefix[MAX_TB_NAME_SIZE]; char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample @@ -808,13 +809,14 @@ static void init_rand_data() { static void printfInsertMeta() { printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n"); - printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); - printf("user: \033[33m%s\033[0m\n", g_Dbs.user); - printf("password: \033[33m%s\033[0m\n", g_Dbs.password); - printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); - printf("thread count: \033[33m%d\033[0m\n", g_Dbs.threadCount); - - printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); + printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); + printf("user: \033[33m%s\033[0m\n", g_Dbs.user); + printf("password: \033[33m%s\033[0m\n", g_Dbs.password); + printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); + printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount); + printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); + + printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { printf("database[\033[33m%d\033[0m]:\n", i); printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName); @@ -944,11 +946,12 @@ static void printfInsertMeta() { static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, "================ insert.json parse result START================\n"); - fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port); - fprintf(fp, "user: %s\n", g_Dbs.user); - fprintf(fp, "password: %s\n", g_Dbs.password); - fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); - fprintf(fp, "thread count: %d\n", g_Dbs.threadCount); + fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port); + fprintf(fp, "user: %s\n", g_Dbs.user); + fprintf(fp, "password: %s\n", g_Dbs.password); + fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); + fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount); + fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl) fprintf(fp, "database count: %d\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -1730,19 +1733,27 @@ static int createDatabases() { void * createTable(void *sarg) -{ - char command[BUFFER_SIZE] = "\0"; - +{ threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; int64_t lastPrintTime = taosGetTimestampMs(); + char* buffer = calloc(superTblInfo->maxSqlLen, 1); + + int len = 0; + int batchNum = 0; //printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { if (0 == g_Dbs.use_metric) { - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable); + snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable); } else { + if (0 == len) { + batchNum = 0; + memset(buffer, 0, superTblInfo->maxSqlLen); + len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table "); + } + char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); @@ -1750,13 +1761,22 @@ void * createTable(void *sarg) tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { + free(buffer); return NULL; } - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.%s tags %s;", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf); + + len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf); free(tagsValBuf); + batchNum++; + + if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) { + continue; + } } - - if (0 != queryDbExec(winfo->taos, command, NO_INSERT_TYPE)){ + + len = 0; + if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ + free(buffer); return NULL; } @@ -1766,7 +1786,12 @@ void * createTable(void *sarg) lastPrintTime = currentPrintTime; } } - + + if (0 != len) { + (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); + } + + free(buffer); return NULL; } @@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, auto_create_table not found"); goto PARSE_OVER; } + + cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); + if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) { + g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint; + } else if (!batchCreateTbl) { + g_Dbs.db[i].superTbls[j].batchCreateTableNum = 2000; + } else { + printf("failed to read json, batch_create_tbl_num not found"); + goto PARSE_OVER; + } cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) { @@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu b = ntables % threads; } - TAOS* taos; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { - taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - if (NULL == taos) { - printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); - exit(-1); - } - } + //TAOS* taos; + //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { + // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); + // if (NULL == taos) { + // printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); + // exit(-1); + // } + //} int32_t timePrec = TSDB_TIME_PRECISION_MILLI; if (0 != precision[0]) { @@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu t_info->start_time = start_time; if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { - t_info->taos = taos; + //t_info->taos = taos; + t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); + if (NULL == t_info->taos) { + printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL)); + exit(-1); + } } else { t_info->taos = NULL; #ifdef TD_LOWA_CURL @@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu threadInfo *t_info = infos + i; tsem_destroy(&(t_info->lock_sem)); + taos_close(t_info->taos); superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalRowsInserted += t_info->totalRowsInserted; @@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu double end = getCurrentTime(); - taos_close(taos); + //taos_close(taos); free(pids); free(infos);