From f39cf39ca728261b817ae577b401dfab47a12d06 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 13 Sep 2021 22:24:42 +0800 Subject: [PATCH] Hotfix/sangshuduo/td 5872 taosdemo stmt improve for master (#7890) * cherry pick from develop branch. * cherry pick 548131751fad2b0e11e15b75af6c254164f28eea * [TD-5872]: taosdemo stmt csv perf improve. * rand func back to early impl. * fix windows/mac compile error. * cherry pick from develop branch. * merge with develop branch. * rename MAX_SAMPLES_ONCE_FROM_FILE to reflect reality. * split func for stmt interlace. * fix bug that get build path. * stmt batch interlace works. * fix start time issue. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 470 ++++++++++++++++++++++-------------- 1 file changed, 294 insertions(+), 176 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4b58fa1df2..9b3be1556a 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -109,8 +109,7 @@ extern char configDir[]; #define DEFAULT_DATATYPE_NUM 1 #define DEFAULT_CHILDTABLES 10000 - -#define STMT_BIND_PARAM_BATCH 0 +#define STMT_BIND_PARAM_BATCH 1 char* g_sampleDataBuf = NULL; #if STMT_BIND_PARAM_BATCH == 1 @@ -5430,7 +5429,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows; } } else if (!stbInterlaceRows) { - g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req + g_Dbs.db[i].superTbls[j].interlaceRows = g_args.interlaceRows; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { errorPrint( "%s", "failed to read json, interlace rows input mistake\n"); @@ -8294,7 +8293,7 @@ static uint32_t execBindParam( } #endif -static int32_t prepareStbStmtWithSample( +static int32_t prepareStbStmt( threadInfo *pThreadInfo, char *tableName, int64_t tableSeq, @@ -8468,21 +8467,18 @@ static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t inter pThreadInfo->threadID, __func__, __LINE__); int64_t insertRows; - uint64_t maxSqlLen; - int64_t nTimeStampStep; + int64_t timeStampStep; uint64_t insert_interval; SSuperTable* stbInfo = pThreadInfo->stbInfo; if (stbInfo) { insertRows = stbInfo->insertRows; - maxSqlLen = stbInfo->maxSqlLen; - nTimeStampStep = stbInfo->timeStampStep; + timeStampStep = stbInfo->timeStampStep; insert_interval = stbInfo->insertInterval; } else { insertRows = g_args.insertRows; - maxSqlLen = g_args.max_sql_len; - nTimeStampStep = g_args.timestamp_step; + timeStampStep = g_args.timestamp_step; insert_interval = g_args.insert_interval; } @@ -8491,18 +8487,14 @@ static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t inter pThreadInfo->start_table_from, pThreadInfo->ntables, insertRows); - uint32_t batchPerTbl = interlaceRows; - uint32_t batchPerTblTimes; + uint64_t timesInterlace = (insertRows / interlaceRows) + 1; + uint32_t precalcBatch = interlaceRows; - if (interlaceRows > g_args.reqPerReq) - interlaceRows = g_args.reqPerReq; + if (precalcBatch > g_args.reqPerReq) + precalcBatch = g_args.reqPerReq; - if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { - batchPerTblTimes = - g_args.reqPerReq / interlaceRows; - } else { - batchPerTblTimes = 1; - } + if (precalcBatch > MAX_SAMPLES) + precalcBatch = MAX_SAMPLES; pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; @@ -8515,27 +8507,27 @@ static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t inter uint64_t endTs; uint64_t tableSeq = pThreadInfo->start_table_from; - int64_t startTime = pThreadInfo->start_time; + int64_t startTime; - uint64_t generatedRecPerTbl = 0; bool flagSleep = true; uint64_t sleepTimeTotal = 0; int percentComplete = 0; int64_t totalRows = insertRows * pThreadInfo->ntables; + pThreadInfo->samplePos = 0; - while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { + for (int64_t interlace = 0; + interlace < timesInterlace; interlace ++) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampMs(); flagSleep = false; } - uint32_t recOfBatch = 0; + int64_t generated = 0; + int64_t samplePos; - int32_t generated; - for (uint64_t i = 0; i < batchPerTblTimes; i ++) { + for (; tableSeq < pThreadInfo->start_table_from + pThreadInfo->ntables; tableSeq ++) { char tableName[TSDB_TABLE_NAME_LEN]; - getTableName(tableName, pThreadInfo, tableSeq); if (0 == strlen(tableName)) { errorPrint2("[%d] %s() LN%d, getTableName return null\n", @@ -8543,127 +8535,121 @@ static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t inter return NULL; } - if (stbInfo) { - generated = prepareStbStmtWithSample( - pThreadInfo, - tableName, - tableSeq, - batchPerTbl, - insertRows, 0, - startTime, - &(pThreadInfo->samplePos)); - } else { - debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", - pThreadInfo->threadID, - __func__, __LINE__, - tableName, batchPerTbl, startTime); - generated = prepareStmtWithoutStb( - pThreadInfo, - tableName, - batchPerTbl, - insertRows, i, - startTime); - } + samplePos = pThreadInfo->samplePos; + startTime = pThreadInfo->start_time + + interlace * interlaceRows * timeStampStep; + uint64_t remainRecPerTbl = + insertRows - interlaceRows * interlace; + uint64_t recPerTbl = 0; - debugPrint("[%d] %s() LN%d, generated records is %d\n", - pThreadInfo->threadID, __func__, __LINE__, generated); - if (generated < 0) { - errorPrint2("[%d] %s() LN%d, generated records is %d\n", - pThreadInfo->threadID, __func__, __LINE__, generated); - goto free_of_interlace_stmt; - } else if (generated == 0) { - break; + uint64_t remainPerInterlace; + if (remainRecPerTbl > interlaceRows) { + remainPerInterlace = interlaceRows; + } else { + remainPerInterlace = remainRecPerTbl; } - tableSeq ++; - recOfBatch += batchPerTbl; + while(remainPerInterlace > 0) { - pThreadInfo->totalInsertRows += batchPerTbl; - - verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", - pThreadInfo->threadID, __func__, __LINE__, - batchPerTbl, recOfBatch); - - if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { - // turn to first table - tableSeq = pThreadInfo->start_table_from; - generatedRecPerTbl += batchPerTbl; + uint32_t batch; + if (remainPerInterlace > precalcBatch) { + batch = precalcBatch; + } else { + batch = remainPerInterlace; + } + debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, + tableName, batch, startTime); - startTime = pThreadInfo->start_time - + generatedRecPerTbl * nTimeStampStep; + if (stbInfo) { + generated = prepareStbStmt( + pThreadInfo, + tableName, + tableSeq, + batch, + insertRows, 0, + startTime, + &samplePos); + } else { + generated = prepareStmtWithoutStb( + pThreadInfo, + tableName, + batch, + insertRows, + interlaceRows * interlace + recPerTbl, + startTime); + } - flagSleep = true; - if (generatedRecPerTbl >= insertRows) + debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + pThreadInfo->threadID, __func__, __LINE__, generated); + if (generated < 0) { + errorPrint2("[%d] %s() LN%d, generated records is %"PRId64"\n", + pThreadInfo->threadID, __func__, __LINE__, generated); + goto free_of_interlace_stmt; + } else if (generated == 0) { break; + } - int64_t remainRows = insertRows - generatedRecPerTbl; - if ((remainRows > 0) && (batchPerTbl > remainRows)) - batchPerTbl = remainRows; + recPerTbl += generated; + remainPerInterlace -= generated; + pThreadInfo->totalInsertRows += generated; - if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq) - break; - } + verbosePrint("[%d] %s() LN%d totalInsertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->totalInsertRows); - verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", - pThreadInfo->threadID, __func__, __LINE__, - generatedRecPerTbl, insertRows); + startTs = taosGetTimestampUs(); - if ((g_args.reqPerReq - recOfBatch) < batchPerTbl) - break; - } + int64_t affectedRows = execInsert(pThreadInfo, generated); - verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, recOfBatch, - pThreadInfo->totalInsertRows); + endTs = taosGetTimestampUs(); + uint64_t delay = endTs - startTs; + performancePrint("%s() LN%d, insert execution time is %10.2f ms\n", + __func__, __LINE__, delay / 1000.0); + verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, affectedRows); - startTs = taosGetTimestampUs(); + if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; + if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; - if (recOfBatch == 0) { - errorPrint2("[%d] %s() LN%d Failed to insert records of batch %d\n", - pThreadInfo->threadID, __func__, __LINE__, - batchPerTbl); - if (batchPerTbl > 0) { - errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n", - batchPerTbl, maxSqlLen / batchPerTbl); - } - goto free_of_interlace_stmt; - } - int64_t affectedRows = execInsert(pThreadInfo, recOfBatch); + if (generated != affectedRows) { + errorPrint2("[%d] %s() LN%d execInsert() insert %"PRId64", affected rows: %"PRId64"\n\n", + pThreadInfo->threadID, __func__, __LINE__, + generated, affectedRows); + goto free_of_interlace_stmt; + } - endTs = taosGetTimestampUs(); - uint64_t delay = endTs - startTs; - performancePrint("%s() LN%d, insert execution time is %10.2f ms\n", - __func__, __LINE__, delay / 1000.0); - verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", - pThreadInfo->threadID, - __func__, __LINE__, affectedRows); + pThreadInfo->totalAffectedRows += affectedRows; - if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; - if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; - pThreadInfo->cntDelay++; - pThreadInfo->totalDelay += delay; + int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows; + if (currentPercent > percentComplete ) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent); + percentComplete = currentPercent; + } + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + lastPrintTime = currentPrintTime; + } - if (recOfBatch != affectedRows) { - errorPrint2("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n\n", - pThreadInfo->threadID, __func__, __LINE__, - recOfBatch, affectedRows); - goto free_of_interlace_stmt; + startTime += (generated * timeStampStep); + } } + pThreadInfo->samplePos = samplePos; - pThreadInfo->totalAffectedRows += affectedRows; + if (tableSeq == pThreadInfo->start_table_from + + pThreadInfo->ntables) { + // turn to first table + tableSeq = pThreadInfo->start_table_from; - int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows; - if (currentPercent > percentComplete ) { - printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent); - percentComplete = currentPercent; - } - int64_t currentPrintTime = taosGetTimestampMs(); - if (currentPrintTime - lastPrintTime > 30*1000) { - printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n", - pThreadInfo->threadID, - pThreadInfo->totalInsertRows, - pThreadInfo->totalAffectedRows); - lastPrintTime = currentPrintTime; + flagSleep = true; } if ((insert_interval) && flagSleep) { @@ -8693,7 +8679,7 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR int64_t insertRows; uint64_t maxSqlLen; - int64_t nTimeStampStep; + int64_t timeStampStep; uint64_t insert_interval; SSuperTable* stbInfo = pThreadInfo->stbInfo; @@ -8701,12 +8687,12 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR if (stbInfo) { insertRows = stbInfo->insertRows; maxSqlLen = stbInfo->maxSqlLen; - nTimeStampStep = stbInfo->timeStampStep; + timeStampStep = stbInfo->timeStampStep; insert_interval = stbInfo->insertInterval; } else { insertRows = g_args.insertRows; maxSqlLen = g_args.max_sql_len; - nTimeStampStep = g_args.timestamp_step; + timeStampStep = g_args.timestamp_step; insert_interval = g_args.insert_interval; } @@ -8767,8 +8753,12 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR return NULL; } + debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, + tableName, batchPerTbl, startTime); if (stbInfo) { - generated = prepareStbStmtWithSample( + generated = prepareStbStmt( pThreadInfo, tableName, tableSeq, @@ -8777,10 +8767,6 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR startTime, &(pThreadInfo->samplePos)); } else { - debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", - pThreadInfo->threadID, - __func__, __LINE__, - tableName, batchPerTbl, startTime); generated = prepareStmtWithoutStb( pThreadInfo, tableName, @@ -8814,7 +8800,7 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR generatedRecPerTbl += batchPerTbl; startTime = pThreadInfo->start_time - + generatedRecPerTbl * nTimeStampStep; + + generatedRecPerTbl * timeStampStep; flagSleep = true; if (generatedRecPerTbl >= insertRows) @@ -8919,7 +8905,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) int64_t insertRows; uint64_t maxSqlLen; - int64_t nTimeStampStep; + int64_t timeStampStep; uint64_t insert_interval; SSuperTable* stbInfo = pThreadInfo->stbInfo; @@ -8927,12 +8913,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) if (stbInfo) { insertRows = stbInfo->insertRows; maxSqlLen = stbInfo->maxSqlLen; - nTimeStampStep = stbInfo->timeStampStep; + timeStampStep = stbInfo->timeStampStep; insert_interval = stbInfo->insertInterval; } else { insertRows = g_args.insertRows; maxSqlLen = g_args.max_sql_len; - nTimeStampStep = g_args.timestamp_step; + timeStampStep = g_args.timestamp_step; insert_interval = g_args.insert_interval; } @@ -9075,7 +9061,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) generatedRecPerTbl += batchPerTbl; startTime = pThreadInfo->start_time - + generatedRecPerTbl * nTimeStampStep; + + generatedRecPerTbl * timeStampStep; flagSleep = true; if (generatedRecPerTbl >= insertRows) @@ -9176,6 +9162,144 @@ free_of_interlace: return NULL; } +static void* syncWriteProgressiveStmt(threadInfo *pThreadInfo) { + debugPrint("%s() LN%d: ### stmt progressive write\n", __func__, __LINE__); + + SSuperTable* stbInfo = pThreadInfo->stbInfo; + int64_t timeStampStep = + stbInfo?stbInfo->timeStampStep:g_args.timestamp_step; + int64_t insertRows = + (stbInfo)?stbInfo->insertRows:g_args.insertRows; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", + __func__, __LINE__, insertRows); + + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + uint64_t endTs; + + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; + + pThreadInfo->samplePos = 0; + + int percentComplete = 0; + int64_t totalRows = insertRows * pThreadInfo->ntables; + + for (uint64_t tableSeq = pThreadInfo->start_table_from; + tableSeq <= pThreadInfo->end_table_to; + tableSeq ++) { + int64_t start_time = pThreadInfo->start_time; + + for (uint64_t i = 0; i < insertRows;) { + char tableName[TSDB_TABLE_NAME_LEN]; + getTableName(tableName, pThreadInfo, tableSeq); + verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n", + __func__, __LINE__, + pThreadInfo->threadID, tableSeq, tableName); + if (0 == strlen(tableName)) { + errorPrint2("[%d] %s() LN%d, getTableName return null\n", + pThreadInfo->threadID, __func__, __LINE__); + return NULL; + } + + // measure prepare + insert + startTs = taosGetTimestampUs(); + + int32_t generated; + if (stbInfo) { + generated = prepareStbStmt( + pThreadInfo, + tableName, + tableSeq, + (g_args.reqPerReq>stbInfo->insertRows)? + stbInfo->insertRows: + g_args.reqPerReq, + insertRows, i, start_time, + &(pThreadInfo->samplePos)); + } else { + generated = prepareStmtWithoutStb( + pThreadInfo, + tableName, + g_args.reqPerReq, + insertRows, i, + start_time); + } + + verbosePrint("[%d] %s() LN%d generated=%d\n", + pThreadInfo->threadID, + __func__, __LINE__, generated); + + if (generated > 0) + i += generated; + else + goto free_of_stmt_progressive; + + start_time += generated * timeStampStep; + pThreadInfo->totalInsertRows += generated; + + // only measure insert + // startTs = taosGetTimestampUs(); + + int32_t affectedRows = execInsert(pThreadInfo, generated); + + endTs = taosGetTimestampUs(); + uint64_t delay = endTs - startTs; + performancePrint("%s() LN%d, insert execution time is %10.f ms\n", + __func__, __LINE__, delay/1000.0); + verbosePrint("[%d] %s() LN%d affectedRows=%d\n", + pThreadInfo->threadID, + __func__, __LINE__, affectedRows); + + if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; + if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; + + if (affectedRows < 0) { + errorPrint2("%s() LN%d, affected rows: %d\n", + __func__, __LINE__, affectedRows); + goto free_of_stmt_progressive; + } + + pThreadInfo->totalAffectedRows += affectedRows; + + int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows; + if (currentPercent > percentComplete ) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent); + percentComplete = currentPercent; + } + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + lastPrintTime = currentPrintTime; + } + + if (i >= insertRows) + break; + } // insertRows + + if ((g_args.verbose_print) && + (tableSeq == pThreadInfo->ntables - 1) && (stbInfo) + && (0 == strncasecmp( + stbInfo->dataSource, + "sample", strlen("sample")))) { + verbosePrint("%s() LN%d samplePos=%"PRId64"\n", + __func__, __LINE__, pThreadInfo->samplePos); + } + } // tableSeq + + if (percentComplete < 100) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); + } + +free_of_stmt_progressive: + tmfree(pThreadInfo->buffer); + printStatPerThread(pThreadInfo); + return NULL; +} // sync insertion progressive data static void* syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); @@ -9242,7 +9366,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { int32_t generated; if (stbInfo) { if (stbInfo->iface == STMT_IFACE) { - generated = prepareStbStmtWithSample( + generated = prepareStbStmt( pThreadInfo, tableName, tableSeq, @@ -9374,20 +9498,28 @@ static void* syncWrite(void *sarg) { if (interlaceRows > 0) { // interlace mode - if (((stbInfo) && (STMT_IFACE == stbInfo->iface)) - || (STMT_IFACE == g_args.iface)) { + if (stbInfo) { + if (STMT_IFACE == stbInfo->iface) { #if STMT_BIND_PARAM_BATCH == 1 - return syncWriteInterlaceStmtBatch(pThreadInfo, interlaceRows); + return syncWriteInterlaceStmtBatch(pThreadInfo, interlaceRows); #else - return syncWriteInterlaceStmt(pThreadInfo, interlaceRows); + return syncWriteInterlaceStmt(pThreadInfo, interlaceRows); #endif - } else { - return syncWriteInterlace(pThreadInfo, interlaceRows); + } else { + return syncWriteInterlace(pThreadInfo, interlaceRows); + } } - }else { - // progressive mode - return syncWriteProgressive(pThreadInfo); + } else { + // progressive mode + if (((stbInfo) && (STMT_IFACE == stbInfo->iface)) + || (STMT_IFACE == g_args.iface)) { + return syncWriteProgressiveStmt(pThreadInfo); + } else { + return syncWriteProgressive(pThreadInfo); + } } + + return NULL; } static void callBack(void *param, TAOS_RES *res, int code) { @@ -9518,24 +9650,24 @@ static void startMultiThreadInsertData(int threads, char* db_name, } } - int64_t start_time; + int64_t startTime; if (stbInfo) { if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) { - start_time = taosGetTimestamp(timePrec); + startTime = taosGetTimestamp(timePrec); } else { if (TSDB_CODE_SUCCESS != taosParseTime( stbInfo->startTimestamp, - &start_time, + &startTime, strlen(stbInfo->startTimestamp), timePrec, 0)) { ERROR_EXIT("failed to parse time!\n"); } } } else { - start_time = DEFAULT_START_TIME; + startTime = DEFAULT_START_TIME; } - debugPrint("%s() LN%d, start_time= %"PRId64"\n", - __func__, __LINE__, start_time); + debugPrint("%s() LN%d, startTime= %"PRId64"\n", + __func__, __LINE__, startTime); // read sample data from file first int ret; @@ -9655,14 +9787,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, } pthread_t *pids = calloc(1, threads * sizeof(pthread_t)); - assert(pids != NULL); - threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); + assert(pids != NULL); assert(infos != NULL); - memset(pids, 0, threads * sizeof(pthread_t)); - memset(infos, 0, threads * sizeof(threadInfo)); - char *stmtBuffer = calloc(1, BUFFER_SIZE); assert(stmtBuffer); @@ -9671,18 +9799,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, uint32_t batch; if (stbInfo) { - if ((stbInfo->interlaceRows == 0) - && (g_args.interlaceRows > 0) - ) { - interlaceRows = g_args.interlaceRows; - - } else { + if (stbInfo->interlaceRows < stbInfo->insertRows) interlaceRows = stbInfo->interlaceRows; - } - - if (interlaceRows > stbInfo->insertRows) { - interlaceRows = 0; - } } else { if (g_args.interlaceRows < g_args.insertRows) interlaceRows = g_args.interlaceRows; @@ -9739,7 +9857,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, pThreadInfo->time_precision = timePrec; pThreadInfo->stbInfo = stbInfo; - pThreadInfo->start_time = start_time; + pThreadInfo->start_time = startTime; pThreadInfo->minDelay = UINT64_MAX; if ((NULL == stbInfo) || @@ -9955,7 +10073,7 @@ static void *readTable(void *sarg) { char *command = calloc(1, BUFFER_SIZE); assert(command); - uint64_t sTime = pThreadInfo->start_time; + uint64_t startTime = pThreadInfo->start_time; char *tb_prefix = pThreadInfo->tb_prefix; FILE *fp = fopen(pThreadInfo->filePath, "a"); if (NULL == fp) { @@ -9988,7 +10106,7 @@ static void *readTable(void *sarg) { uint64_t count = 0; for (int64_t i = 0; i < ntables; i++) { sprintf(command, "SELECT %s FROM %s%"PRId64" WHERE ts>= %" PRIu64, - g_aggreFunc[j], tb_prefix, i, sTime); + g_aggreFunc[j], tb_prefix, i, startTime); double t = taosGetTimestampMs(); TAOS_RES *pSql = taos_query(taos, command); -- GitLab