未验证 提交 e9b3f5d3 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-3478] <fix>: increase thread num of table creation same as insert… (#5577)

* [TD-3478] <fix>: increase thread num of table creation same as insertion.

* [TD-3478] <fix>: increase thread num of table creation same as insertion.

fix potential refer to derefed buffer.

* [TD-3478] <fix>: increase thread num of table creation same as insertion.

change rowsPerTbl to interlaceRows
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 1b998f06
...@@ -237,7 +237,7 @@ typedef struct SSuperTable_S { ...@@ -237,7 +237,7 @@ typedef struct SSuperTable_S {
int childTblOffset; int childTblOffset;
int multiThreadWriteOneTbl; // 0: no, 1: yes int multiThreadWriteOneTbl; // 0: no, 1: yes
int rowsPerTbl; // int interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x% int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision int disorderRange; // ms or us by database precision
int maxSqlLen; // int maxSqlLen; //
...@@ -1199,8 +1199,8 @@ static int printfInsertMeta() { ...@@ -1199,8 +1199,8 @@ static int printfInsertMeta() {
}else { }else {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
} }
printf(" rowsPerTbl: \033[33m%d\033[0m\n", printf(" interlaceRows: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].rowsPerTbl); g_Dbs.db[i].superTbls[j].interlaceRows);
printf(" disorderRange: \033[33m%d\033[0m\n", printf(" disorderRange: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].disorderRange); g_Dbs.db[i].superTbls[j].disorderRange);
printf(" disorderRatio: \033[33m%d\033[0m\n", printf(" disorderRatio: \033[33m%d\033[0m\n",
...@@ -1361,7 +1361,7 @@ static void printfInsertMetaToFile(FILE* fp) { ...@@ -1361,7 +1361,7 @@ static void printfInsertMetaToFile(FILE* fp) {
}else { }else {
fprintf(fp, " multiThreadWriteOneTbl: yes\n"); fprintf(fp, " multiThreadWriteOneTbl: yes\n");
} }
fprintf(fp, " rowsPerTbl: %d\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); fprintf(fp, " interlaceRows: %d\n", g_Dbs.db[i].superTbls[j].interlaceRows);
fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio); fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio);
fprintf(fp, " maxSqlLen: %d\n", g_Dbs.db[i].superTbls[j].maxSqlLen); fprintf(fp, " maxSqlLen: %d\n", g_Dbs.db[i].superTbls[j].maxSqlLen);
...@@ -2573,8 +2573,8 @@ static void* createTable(void *sarg) ...@@ -2573,8 +2573,8 @@ static void* createTable(void *sarg)
len = 0; len = 0;
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
free(buffer);
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer); errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
free(buffer);
return NULL; return NULL;
} }
...@@ -3061,9 +3061,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3061,9 +3061,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (threads2 && threads2->type == cJSON_Number) { if (threads2 && threads2->type == cJSON_Number) {
g_Dbs.threadCountByCreateTbl = threads2->valueint; g_Dbs.threadCountByCreateTbl = threads2->valueint;
} else if (!threads2) { } else if (!threads2) {
g_Dbs.threadCountByCreateTbl = 1; g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
} else { } else {
printf("ERROR: failed to read json, threads2 not found\n"); errorPrint("%s() LN%d, failed to read json, threads2 not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3578,13 +3579,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3578,13 +3579,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "interlace_rows"); cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) { if (interlaceRows && interlaceRows->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint; g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint;
} else if (!rowsPerTbl) { } else if (!interlaceRows) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else { } else {
errorPrint("%s() LN%d, failed to read json, rowsPerTbl input mistake\n", __func__, __LINE__); errorPrint(
"%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -4505,17 +4508,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4505,17 +4508,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int insertMode; int insertMode;
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
int rowsPerTbl = superTblInfo?superTblInfo->rowsPerTbl:g_args.interlace_rows; int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
if (rowsPerTbl > 0) { if (interlaceRows > 0) {
insertMode = INTERLACE_INSERT_MODE; insertMode = INTERLACE_INSERT_MODE;
} else { } else {
insertMode = PROGRESSIVE_INSERT_MODE; insertMode = PROGRESSIVE_INSERT_MODE;
} }
// rows per table need be less than insert batch // rows per table need be less than insert batch
if (rowsPerTbl > g_args.num_of_RPR) if (interlaceRows > g_args.num_of_RPR)
rowsPerTbl = g_args.num_of_RPR; interlaceRows = g_args.num_of_RPR;
pThreadInfo->totalInsertRows = 0; pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0; pThreadInfo->totalAffectedRows = 0;
...@@ -4543,13 +4546,13 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4543,13 +4546,13 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
assert(pThreadInfo->ntables > 0); assert(pThreadInfo->ntables > 0);
if (rowsPerTbl > g_args.num_of_RPR) if (interlaceRows > g_args.num_of_RPR)
rowsPerTbl = g_args.num_of_RPR; interlaceRows = g_args.num_of_RPR;
batchPerTbl = rowsPerTbl; batchPerTbl = interlaceRows;
if ((rowsPerTbl > 0) && (pThreadInfo->ntables > 1)) { if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes = batchPerTblTimes =
(g_args.num_of_RPR / (rowsPerTbl * pThreadInfo->ntables)) + 1; (g_args.num_of_RPR / (interlaceRows * pThreadInfo->ntables)) + 1;
} else { } else {
batchPerTblTimes = 1; batchPerTblTimes = 1;
} }
...@@ -4830,9 +4833,9 @@ static void* syncWrite(void *sarg) { ...@@ -4830,9 +4833,9 @@ static void* syncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo; SSuperTable* superTblInfo = winfo->superTblInfo;
int rowsPerTbl = superTblInfo?superTblInfo->rowsPerTbl:g_args.interlace_rows; int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
if (rowsPerTbl > 0) { if (interlaceRows > 0) {
// interlace mode // interlace mode
return syncWriteInterlace(winfo); return syncWriteInterlace(winfo);
} else { } else {
...@@ -4974,7 +4977,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -4974,7 +4977,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} else if (0 == strncasecmp(precision, "us", 2)) { } else if (0 == strncasecmp(precision, "us", 2)) {
timePrec = TSDB_TIME_PRECISION_MICRO; timePrec = TSDB_TIME_PRECISION_MICRO;
} else { } else {
errorPrint( "No support precision: %s\n", precision); errorPrint("Not support precision: %s\n", precision);
exit(-1); exit(-1);
} }
} }
...@@ -5009,7 +5012,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5009,7 +5012,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) { "sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) { if (0 != prepareSampleDataForSTable(superTblInfo)) {
errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__); errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
__func__, __LINE__);
exit(-1); exit(-1);
} }
} }
...@@ -5018,7 +5022,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5018,7 +5022,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) { "sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) { if (0 != prepareSampleDataForSTable(superTblInfo)) {
errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__); errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
__func__, __LINE__);
exit(-1); exit(-1);
} }
} }
...@@ -5078,7 +5083,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5078,7 +5083,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
g_Dbs.host, g_Dbs.user, g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port); g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) { if (NULL == t_info->taos) {
errorPrint( "connect to server fail from insert sub thread, reason: %s\n", errorPrint(
"connect to server fail from insert sub thread, reason: %s\n",
taos_errstr(NULL)); taos_errstr(NULL));
exit(-1); exit(-1);
} }
...@@ -5374,10 +5380,10 @@ static int insertTestProcess() { ...@@ -5374,10 +5380,10 @@ static int insertTestProcess() {
if (g_totalChildTables > 0) { if (g_totalChildTables > 0) {
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
end - start, g_totalChildTables, g_Dbs.threadCount); end - start, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
fprintf(g_fpOfInsertResult, fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n", "Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
end - start, g_totalChildTables, g_Dbs.threadCount); end - start, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
} }
taosMsleep(1000); taosMsleep(1000);
...@@ -5546,7 +5552,8 @@ static int queryTestProcess() { ...@@ -5546,7 +5552,8 @@ static int queryTestProcess() {
NULL, NULL,
g_queryInfo.port); g_queryInfo.port);
if (taos == NULL) { if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(-1); exit(-1);
} }
...@@ -5867,7 +5874,8 @@ static int subscribeTestProcess() { ...@@ -5867,7 +5874,8 @@ static int subscribeTestProcess() {
g_queryInfo.dbName, g_queryInfo.dbName,
g_queryInfo.port); g_queryInfo.port);
if (taos == NULL) { if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(-1); exit(-1);
} }
...@@ -6041,7 +6049,7 @@ static void setParaFromArg(){ ...@@ -6041,7 +6049,7 @@ static void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables; g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
g_Dbs.threadCount = g_args.num_of_threads; g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountByCreateTbl = 1; g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.queryMode = g_args.mode; g_Dbs.queryMode = g_args.mode;
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
...@@ -6089,7 +6097,7 @@ static void setParaFromArg(){ ...@@ -6089,7 +6097,7 @@ static void setParaFromArg(){
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary; g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
g_Dbs.db[0].superTbls[0].tagCount = 2; g_Dbs.db[0].superTbls[0].tagCount = 2;
} else { } else {
g_Dbs.threadCountByCreateTbl = 1; g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.db[0].superTbls[0].tagCount = 0; g_Dbs.db[0].superTbls[0].tagCount = 0;
} }
...@@ -6237,7 +6245,8 @@ static void queryResult() { ...@@ -6237,7 +6245,8 @@ static void queryResult() {
g_Dbs.db[0].dbName, g_Dbs.db[0].dbName,
g_Dbs.port); g_Dbs.port);
if (rInfo->taos == NULL) { if (rInfo->taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
free(rInfo); free(rInfo);
exit(-1); exit(-1);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册