diff --git a/src/kit/taosdemox/insert.json b/src/kit/taosdemox/insert.json index 88416c13a4cad68a25d090f69f9b3ae7dee66296..aa071c115d60d78797b4c36456adfacd0d345af7 100644 --- a/src/kit/taosdemox/insert.json +++ b/src/kit/taosdemox/insert.json @@ -5,7 +5,8 @@ "port": 6030, "user": "root", "password": "taosdata", - "thread_count": 2, + "thread_count": 4, + "thread_count_create_tbl": 1, "result_file": "./insert_res.txt", "databases": [{ "dbinfo": { diff --git a/src/kit/taosdemox/taosdemox.c b/src/kit/taosdemox/taosdemox.c index 97e7b426675da6d9a5398da52fe6e34e86b3562d..0e2ec6d7ae82feabb6a9fa15b5425067776b3292 100644 --- a/src/kit/taosdemox/taosdemox.c +++ b/src/kit/taosdemox/taosdemox.c @@ -93,9 +93,6 @@ extern char configDir[]; #define MAX_QUERY_SQL_COUNT 10 #define MAX_QUERY_SQL_LENGTH 256 - -#define MAX_LINE_COUNT_IN_MEM 10000 - typedef enum CREATE_SUB_TALBE_MOD_EN { PRE_CREATE_SUBTBL, AUTO_CREATE_SUBTBL, @@ -259,6 +256,7 @@ typedef struct SDbs_S { bool queryMode; int threadCount; + int threadCountByCreateTbl; int dbCount; SDataBase db[MAX_DB_COUNT]; @@ -1418,7 +1416,6 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName char command[BUFFER_SIZE] = "\0"; TAOS_RES * res; TAOS_ROW row = NULL; - int count = 0; char* childTblName = *childTblNameOfSuperTbl; @@ -1433,12 +1430,13 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName exit(-1); } - int childTblCount = 10000; - count = 0; + int childTblCount = 10000; + int count = 0; childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN); char* pTblName = childTblName; while ((row = taos_fetch_row(res)) != NULL) { - strncpy(pTblName, (char *)row[0], TSDB_TABLE_NAME_LEN); + int32_t* len = taos_fetch_lengths(res); + strncpy(pTblName, (char *)row[0], len[0]); //printf("==== sub table name: %s\n", pTblName); count++; if (count >= childTblCount - 1) { @@ -1829,38 +1827,64 @@ static void createChildTables() { if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; } - startMultiThreadCreateChildTable(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, g_Dbs.threadCount, g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); + startMultiThreadCreateChildTable(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, g_Dbs.threadCountByCreateTbl, g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; } } } +/* +static int taosGetLineNum(const char *fileName) +{ + int lineNum = 0; + char cmd[1024] = { 0 }; + char buf[1024] = { 0 }; + sprintf(cmd, "wc -l %s", fileName); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + return lineNum; + } + + if (fgets(buf, sizeof(buf), fp)) { + int index = strchr((const char*)buf, ' ') - buf; + buf[index] = '\0'; + lineNum = atoi(buf); + } + pclose(fp); + return lineNum; +} +*/ + /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ -int readTagFromCsvFileToMem(SSuperTable * supterTblInfo) { +int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { size_t n = 0; ssize_t readLen = 0; char * line = NULL; - FILE *fp = fopen(supterTblInfo->tagsFile, "r"); + FILE *fp = fopen(superTblInfo->tagsFile, "r"); if (fp == NULL) { - printf("Failed to open tags file: %s, reason:%s\n", supterTblInfo->tagsFile, strerror(errno)); + printf("Failed to open tags file: %s, reason:%s\n", superTblInfo->tagsFile, strerror(errno)); return -1; } - if (supterTblInfo->tagDataBuf) { - free(supterTblInfo->tagDataBuf); - supterTblInfo->tagDataBuf = NULL; + if (superTblInfo->tagDataBuf) { + free(superTblInfo->tagDataBuf); + superTblInfo->tagDataBuf = NULL; } - - supterTblInfo->tagDataBuf = calloc(supterTblInfo->lenOfTagOfOneRow * MAX_LINE_COUNT_IN_MEM, 1); - if (supterTblInfo->tagDataBuf == NULL) { + + int tagCount = 10000; + int count = 0; + char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount); + if (tagDataBuf == NULL) { printf("Failed to calloc, reason:%s\n", strerror(errno)); fclose(fp); return -1; } - + while ((readLen = getline(&line, &n, fp)) != -1) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { line[--readLen] = 0; @@ -1870,20 +1894,35 @@ int readTagFromCsvFileToMem(SSuperTable * supterTblInfo) { continue; } - memcpy(supterTblInfo->tagDataBuf + supterTblInfo->tagSampleCount * supterTblInfo->lenOfTagOfOneRow, line, readLen); - supterTblInfo->tagSampleCount++; + memcpy(tagDataBuf + count * superTblInfo->lenOfTagOfOneRow, line, readLen); + count++; - if (supterTblInfo->tagSampleCount >= MAX_LINE_COUNT_IN_MEM) { - break; + if (count >= tagCount - 1) { + char *tmp = realloc(tagDataBuf, (size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow); + if (tmp != NULL) { + tagDataBuf = tmp; + tagCount = (int)(tagCount*1.5); + memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow, 0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow)); + } else { + // exit, if allocate more memory failed + printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile); + tmfree(tagDataBuf); + free(line); + fclose(fp); + return -1; + } } } + superTblInfo->tagDataBuf = tagDataBuf; + superTblInfo->tagSampleCount = count; + free(line); fclose(fp); return 0; } -int readSampleFromJsonFileToMem(SSuperTable * supterTblInfo) { +int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { // TODO return 0; } @@ -2138,6 +2177,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, threads not found"); goto PARSE_OVER; } + + cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl"); + if (threads2 && threads2->type == cJSON_Number) { + g_Dbs.threadCountByCreateTbl = threads2->valueint; + } else if (!threads2) { + g_Dbs.threadCountByCreateTbl = 1; + } else { + printf("failed to read json, threads2 not found"); + goto PARSE_OVER; + } cJSON* dbs = cJSON_GetObjectItem(root, "databases"); if (!dbs || dbs->type != cJSON_Array) { @@ -3008,6 +3057,10 @@ void postFreeResource() { free(g_Dbs.db[i].superTbls[j].sampleDataBuf); g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; } + if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { + free(g_Dbs.db[i].superTbls[j].tagDataBuf); + g_Dbs.db[i].superTbls[j].tagDataBuf = NULL; + } if (0 != g_Dbs.db[i].superTbls[j].childTblName) { free(g_Dbs.db[i].superTbls[j].childTblName); g_Dbs.db[i].superTbls[j].childTblName = NULL;