diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 2358d666a998ea4a367fceef95f52b106daa25b7..ae9289e3b0982ef950f5f67674c3b91ae41d0895 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -244,7 +244,7 @@ typedef struct SArguments_S { uint64_t insert_interval; uint64_t timestamp_step; int64_t query_times; - uint32_t interlace_rows; + uint32_t interlaceRows; uint32_t reqPerReq; // num_of_records_per_req uint64_t max_sql_len; int64_t ntables; @@ -451,14 +451,13 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; TAOS_STMT *stmt; + int64_t *bind_ts; #if STMT_BIND_PARAM_BATCH == 1 - int64_t *bind_ts; int64_t *bind_ts_array; char *bindParams; char *is_null; #else - int64_t *bind_ts; char* sampleBindArray; #endif @@ -607,8 +606,8 @@ char *g_rand_current_buff = NULL; char *g_rand_phase_buff = NULL; char *g_randdouble_buff = NULL; -char *g_aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", - "max(col0)", "min(col0)", "first(col0)", "last(col0)"}; +char *g_aggreFunc[] = {"*", "count(*)", "avg(C0)", "sum(C0)", + "max(C0)", "min(C0)", "first(C0)", "last(C0)"}; SArguments g_args = { NULL, // metaFile @@ -652,7 +651,7 @@ SArguments g_args = { 0, // insert_interval DEFAULT_TIMESTAMP_STEP, // timestamp_step 1, // query_times - DEFAULT_INTERLACE_ROWS, // interlace_rows; + DEFAULT_INTERLACE_ROWS, // interlaceRows; 30000, // reqPerReq (1024*1024), // max_sql_len DEFAULT_CHILDTABLES, // ntables @@ -1310,17 +1309,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { errorPrintReqArg2(argv[0], "B"); exit(EXIT_FAILURE); } - arguments->interlace_rows = atoi(argv[++i]); + arguments->interlaceRows = atoi(argv[++i]); } else if (0 == strncmp(argv[i], "--interlace-rows=", strlen("--interlace-rows="))) { if (isStringNumber((char *)(argv[i] + strlen("--interlace-rows=")))) { - arguments->interlace_rows = atoi((char *)(argv[i]+strlen("--interlace-rows="))); + arguments->interlaceRows = atoi((char *)(argv[i]+strlen("--interlace-rows="))); } else { errorPrintReqArg2(argv[0], "--interlace-rows"); exit(EXIT_FAILURE); } } else if (0 == strncmp(argv[i], "-B", strlen("-B"))) { if (isStringNumber((char *)(argv[i] + strlen("-B")))) { - arguments->interlace_rows = atoi((char *)(argv[i]+strlen("-B"))); + arguments->interlaceRows = atoi((char *)(argv[i]+strlen("-B"))); } else { errorPrintReqArg2(argv[0], "-B"); exit(EXIT_FAILURE); @@ -1333,7 +1332,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { errorPrintReqArg2(argv[0], "--interlace-rows"); exit(EXIT_FAILURE); } - arguments->interlace_rows = atoi(argv[++i]); + arguments->interlaceRows = atoi(argv[++i]); } else { errorUnrecognized(argv[0], argv[i]); exit(EXIT_FAILURE); @@ -4859,15 +4858,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON* interlaceRows = cJSON_GetObjectItem(root, "interlace_rows"); if (interlaceRows && interlaceRows->type == cJSON_Number) { if (interlaceRows->valueint < 0) { - errorPrint("%s", "failed to read json, interlace_rows input mistake\n"); + errorPrint("%s", "failed to read json, interlaceRows input mistake\n"); goto PARSE_OVER; } - g_args.interlace_rows = interlaceRows->valueint; + g_args.interlaceRows = interlaceRows->valueint; } else if (!interlaceRows) { - g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req + g_args.interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { - errorPrint("%s", "failed to read json, interlace_rows input mistake\n"); + errorPrint("%s", "failed to read json, interlaceRows input mistake\n"); goto PARSE_OVER; } @@ -4929,13 +4928,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } // rows per table need be less than insert batch - if (g_args.interlace_rows > g_args.reqPerReq) { + if (g_args.interlaceRows > g_args.reqPerReq) { printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n", - g_args.interlace_rows, g_args.reqPerReq); + g_args.interlaceRows, g_args.reqPerReq); printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", g_args.reqPerReq); prompt(); - g_args.interlace_rows = g_args.reqPerReq; + g_args.interlaceRows = g_args.reqPerReq; } cJSON* dbs = cJSON_GetObjectItem(root, "databases"); @@ -8462,13 +8461,13 @@ static void printStatPerThread(threadInfo *pThreadInfo) ); } -// sync write interlace data -static void* syncWriteInterlace(threadInfo *pThreadInfo) { - debugPrint("[%d] %s() LN%d: ### interlace write\n", +#if STMT_BIND_PARAM_BATCH == 1 +// stmt sync write interlace data +static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t interlaceRows) { + debugPrint("[%d] %s() LN%d: ### stmt interlace write\n", pThreadInfo->threadID, __func__, __LINE__); int64_t insertRows; - uint32_t interlaceRows; uint64_t maxSqlLen; int64_t nTimeStampStep; uint64_t insert_interval; @@ -8477,19 +8476,235 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { if (stbInfo) { insertRows = stbInfo->insertRows; + maxSqlLen = stbInfo->maxSqlLen; + nTimeStampStep = stbInfo->timeStampStep; + insert_interval = stbInfo->insertInterval; + } else { + insertRows = g_args.insertRows; + maxSqlLen = g_args.max_sql_len; + nTimeStampStep = g_args.timestamp_step; + insert_interval = g_args.insert_interval; + } - if ((stbInfo->interlaceRows == 0) - && (g_args.interlace_rows > 0)) { - interlaceRows = g_args.interlace_rows; - } else { - interlaceRows = stbInfo->interlaceRows; + debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, insertRows); + + uint32_t batchPerTbl = interlaceRows; + uint32_t batchPerTblTimes; + + if (interlaceRows > g_args.reqPerReq) + interlaceRows = g_args.reqPerReq; + + if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { + batchPerTblTimes = + g_args.reqPerReq / interlaceRows; + } else { + batchPerTblTimes = 1; + } + + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; + + uint64_t st = 0; + uint64_t et = UINT64_MAX; + + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + uint64_t endTs; + + uint64_t tableSeq = pThreadInfo->start_table_from; + int64_t startTime = pThreadInfo->start_time; + + uint64_t generatedRecPerTbl = 0; + bool flagSleep = true; + uint64_t sleepTimeTotal = 0; + + int percentComplete = 0; + int64_t totalRows = insertRows * pThreadInfo->ntables; + + while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { + if ((flagSleep) && (insert_interval)) { + st = taosGetTimestampMs(); + flagSleep = false; } + + uint32_t recOfBatch = 0; + + int32_t generated; + for (uint64_t i = 0; i < batchPerTblTimes; i ++) { + char tableName[TSDB_TABLE_NAME_LEN]; + + getTableName(tableName, pThreadInfo, tableSeq); + if (0 == strlen(tableName)) { + errorPrint2("[%d] %s() LN%d, getTableName return null\n", + pThreadInfo->threadID, __func__, __LINE__); + return NULL; + } + + if (stbInfo) { + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + batchPerTbl, + insertRows, 0, + startTime, + &(pThreadInfo->samplePos)); + } else { + debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, + tableName, batchPerTbl, startTime); + generated = prepareStmtWithoutStb( + pThreadInfo, + tableName, + batchPerTbl, + insertRows, i, + startTime); + } + + debugPrint("[%d] %s() LN%d, generated records is %d\n", + pThreadInfo->threadID, __func__, __LINE__, generated); + if (generated < 0) { + errorPrint2("[%d] %s() LN%d, generated records is %d\n", + pThreadInfo->threadID, __func__, __LINE__, generated); + goto free_of_interlace_stmt; + } else if (generated == 0) { + break; + } + + tableSeq ++; + recOfBatch += batchPerTbl; + + pThreadInfo->totalInsertRows += batchPerTbl; + + verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", + pThreadInfo->threadID, __func__, __LINE__, + batchPerTbl, recOfBatch); + + if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { + // turn to first table + tableSeq = pThreadInfo->start_table_from; + generatedRecPerTbl += batchPerTbl; + + startTime = pThreadInfo->start_time + + generatedRecPerTbl * nTimeStampStep; + + flagSleep = true; + if (generatedRecPerTbl >= insertRows) + break; + + int64_t remainRows = insertRows - generatedRecPerTbl; + if ((remainRows > 0) && (batchPerTbl > remainRows)) + batchPerTbl = remainRows; + + if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq) + break; + } + + verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", + pThreadInfo->threadID, __func__, __LINE__, + generatedRecPerTbl, insertRows); + + if ((g_args.reqPerReq - recOfBatch) < batchPerTbl) + break; + } + + verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, recOfBatch, + pThreadInfo->totalInsertRows); + + startTs = taosGetTimestampUs(); + + if (recOfBatch == 0) { + errorPrint2("[%d] %s() LN%d Failed to insert records of batch %d\n", + pThreadInfo->threadID, __func__, __LINE__, + batchPerTbl); + if (batchPerTbl > 0) { + errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n", + batchPerTbl, maxSqlLen / batchPerTbl); + } + goto free_of_interlace_stmt; + } + int64_t affectedRows = execInsert(pThreadInfo, recOfBatch); + + endTs = taosGetTimestampUs(); + uint64_t delay = endTs - startTs; + performancePrint("%s() LN%d, insert execution time is %10.2f ms\n", + __func__, __LINE__, delay / 1000.0); + verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, affectedRows); + + if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; + if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; + + if (recOfBatch != affectedRows) { + errorPrint2("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n\n", + pThreadInfo->threadID, __func__, __LINE__, + recOfBatch, affectedRows); + goto free_of_interlace_stmt; + } + + pThreadInfo->totalAffectedRows += affectedRows; + + int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows; + if (currentPercent > percentComplete ) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent); + percentComplete = currentPercent; + } + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + lastPrintTime = currentPrintTime; + } + + if ((insert_interval) && flagSleep) { + et = taosGetTimestampMs(); + + if (insert_interval > (et - st) ) { + uint64_t sleepTime = insert_interval - (et -st); + performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n", + __func__, __LINE__, sleepTime); + taosMsleep(sleepTime); // ms + sleepTimeTotal += insert_interval; + } + } + } + if (percentComplete < 100) + printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); + +free_of_interlace_stmt: + printStatPerThread(pThreadInfo); + return NULL; +} +#else +// stmt sync write interlace data +static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceRows) { + debugPrint("[%d] %s() LN%d: ### stmt interlace write\n", + pThreadInfo->threadID, __func__, __LINE__); + + int64_t insertRows; + uint64_t maxSqlLen; + int64_t nTimeStampStep; + uint64_t insert_interval; + + SSuperTable* stbInfo = pThreadInfo->stbInfo; + + if (stbInfo) { + insertRows = stbInfo->insertRows; maxSqlLen = stbInfo->maxSqlLen; nTimeStampStep = stbInfo->timeStampStep; insert_interval = stbInfo->insertInterval; } else { insertRows = g_args.insertRows; - interlaceRows = g_args.interlace_rows; maxSqlLen = g_args.max_sql_len; nTimeStampStep = g_args.timestamp_step; insert_interval = g_args.insert_interval; @@ -8500,9 +8715,232 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->start_table_from, pThreadInfo->ntables, insertRows); - if (interlaceRows > insertRows) - interlaceRows = insertRows; + uint32_t batchPerTbl = interlaceRows; + uint32_t batchPerTblTimes; + + if (interlaceRows > g_args.reqPerReq) + interlaceRows = g_args.reqPerReq; + + if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { + batchPerTblTimes = + g_args.reqPerReq / interlaceRows; + } else { + batchPerTblTimes = 1; + } + + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; + + uint64_t st = 0; + uint64_t et = UINT64_MAX; + + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + uint64_t endTs; + + uint64_t tableSeq = pThreadInfo->start_table_from; + int64_t startTime = pThreadInfo->start_time; + + uint64_t generatedRecPerTbl = 0; + bool flagSleep = true; + uint64_t sleepTimeTotal = 0; + + int percentComplete = 0; + int64_t totalRows = insertRows * pThreadInfo->ntables; + + while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { + if ((flagSleep) && (insert_interval)) { + st = taosGetTimestampMs(); + flagSleep = false; + } + + uint32_t recOfBatch = 0; + + int32_t generated; + for (uint64_t i = 0; i < batchPerTblTimes; i ++) { + char tableName[TSDB_TABLE_NAME_LEN]; + + getTableName(tableName, pThreadInfo, tableSeq); + if (0 == strlen(tableName)) { + errorPrint2("[%d] %s() LN%d, getTableName return null\n", + pThreadInfo->threadID, __func__, __LINE__); + return NULL; + } + + if (stbInfo) { + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + batchPerTbl, + insertRows, 0, + startTime, + &(pThreadInfo->samplePos)); + } else { + debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, + tableName, batchPerTbl, startTime); + generated = prepareStmtWithoutStb( + pThreadInfo, + tableName, + batchPerTbl, + insertRows, i, + startTime); + } + + debugPrint("[%d] %s() LN%d, generated records is %d\n", + pThreadInfo->threadID, __func__, __LINE__, generated); + if (generated < 0) { + errorPrint2("[%d] %s() LN%d, generated records is %d\n", + pThreadInfo->threadID, __func__, __LINE__, generated); + goto free_of_interlace_stmt; + } else if (generated == 0) { + break; + } + + tableSeq ++; + recOfBatch += batchPerTbl; + + pThreadInfo->totalInsertRows += batchPerTbl; + + verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", + pThreadInfo->threadID, __func__, __LINE__, + batchPerTbl, recOfBatch); + + if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { + // turn to first table + tableSeq = pThreadInfo->start_table_from; + generatedRecPerTbl += batchPerTbl; + + startTime = pThreadInfo->start_time + + generatedRecPerTbl * nTimeStampStep; + + flagSleep = true; + if (generatedRecPerTbl >= insertRows) + break; + + int64_t remainRows = insertRows - generatedRecPerTbl; + if ((remainRows > 0) && (batchPerTbl > remainRows)) + batchPerTbl = remainRows; + + if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq) + break; + } + + verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", + pThreadInfo->threadID, __func__, __LINE__, + generatedRecPerTbl, insertRows); + + if ((g_args.reqPerReq - recOfBatch) < batchPerTbl) + break; + } + + verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, recOfBatch, + pThreadInfo->totalInsertRows); + + startTs = taosGetTimestampUs(); + + if (recOfBatch == 0) { + errorPrint2("[%d] %s() LN%d Failed to insert records of batch %d\n", + pThreadInfo->threadID, __func__, __LINE__, + batchPerTbl); + if (batchPerTbl > 0) { + errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n", + batchPerTbl, maxSqlLen / batchPerTbl); + } + goto free_of_interlace_stmt; + } + int64_t affectedRows = execInsert(pThreadInfo, recOfBatch); + + endTs = taosGetTimestampUs(); + uint64_t delay = endTs - startTs; + performancePrint("%s() LN%d, insert execution time is %10.2f ms\n", + __func__, __LINE__, delay / 1000.0); + verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, affectedRows); + + if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; + if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; + + if (recOfBatch != affectedRows) { + errorPrint2("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n\n", + pThreadInfo->threadID, __func__, __LINE__, + recOfBatch, affectedRows); + goto free_of_interlace_stmt; + } + + pThreadInfo->totalAffectedRows += affectedRows; + int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows; + if (currentPercent > percentComplete ) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent); + percentComplete = currentPercent; + } + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + lastPrintTime = currentPrintTime; + } + + if ((insert_interval) && flagSleep) { + et = taosGetTimestampMs(); + + if (insert_interval > (et - st) ) { + uint64_t sleepTime = insert_interval - (et -st); + performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n", + __func__, __LINE__, sleepTime); + taosMsleep(sleepTime); // ms + sleepTimeTotal += insert_interval; + } + } + } + if (percentComplete < 100) + printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); + +free_of_interlace_stmt: + printStatPerThread(pThreadInfo); + return NULL; +} + +#endif + +// sync write interlace data +static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { + debugPrint("[%d] %s() LN%d: ### interlace write\n", + pThreadInfo->threadID, __func__, __LINE__); + + int64_t insertRows; + uint64_t maxSqlLen; + int64_t nTimeStampStep; + uint64_t insert_interval; + + SSuperTable* stbInfo = pThreadInfo->stbInfo; + + if (stbInfo) { + insertRows = stbInfo->insertRows; + maxSqlLen = stbInfo->maxSqlLen; + nTimeStampStep = stbInfo->timeStampStep; + insert_interval = stbInfo->insertInterval; + } else { + insertRows = g_args.insertRows; + maxSqlLen = g_args.max_sql_len; + nTimeStampStep = g_args.timestamp_step; + insert_interval = g_args.insert_interval; + } + + debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, insertRows); +#if 1 if (interlaceRows > g_args.reqPerReq) interlaceRows = g_args.reqPerReq; @@ -8515,7 +8953,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } else { batchPerTblTimes = 1; } +#else + uint32_t batchPerTbl; + if (interlaceRows > g_args.reqPerReq) + batchPerTbl = g_args.reqPerReq; + else + batchPerTbl = interlaceRows; + uint32_t batchPerTblTimes; + + if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { + batchPerTblTimes = + interlaceRows / batchPerTbl; + } else { + batchPerTblTimes = 1; + } +#endif pThreadInfo->buffer = calloc(maxSqlLen, 1); if (NULL == pThreadInfo->buffer) { errorPrint2( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", @@ -8548,6 +9001,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { st = taosGetTimestampMs(); flagSleep = false; } + // generate data memset(pThreadInfo->buffer, 0, maxSqlLen); uint64_t remainderBufLen = maxSqlLen; @@ -8576,47 +9030,23 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { uint64_t oldRemainderLen = remainderBufLen; if (stbInfo) { - if (stbInfo->iface == STMT_IFACE) { - generated = prepareStbStmtWithSample( - pThreadInfo, - tableName, - tableSeq, - batchPerTbl, - insertRows, 0, - startTime, - &(pThreadInfo->samplePos)); - } else { - generated = generateStbInterlaceData( - pThreadInfo, - tableName, batchPerTbl, i, - batchPerTblTimes, - tableSeq, - pstr, - insertRows, - startTime, - &remainderBufLen); - } + generated = generateStbInterlaceData( + pThreadInfo, + tableName, batchPerTbl, i, + batchPerTblTimes, + tableSeq, + pstr, + insertRows, + startTime, + &remainderBufLen); } else { - if (g_args.iface == STMT_IFACE) { - debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", - pThreadInfo->threadID, - __func__, __LINE__, - tableName, batchPerTbl, startTime); - generated = prepareStmtWithoutStb( - pThreadInfo, - tableName, - batchPerTbl, - insertRows, i, - startTime); - } else { - generated = generateInterlaceDataWithoutStb( - tableName, batchPerTbl, - tableSeq, - pThreadInfo->db_name, pstr, - insertRows, - startTime, - &remainderBufLen); - } + generated = generateInterlaceDataWithoutStb( + tableName, batchPerTbl, + tableSeq, + pThreadInfo->db_name, pstr, + insertRows, + startTime, + &remainderBufLen); } debugPrint("[%d] %s() LN%d, generated records is %d\n", @@ -8932,23 +9362,29 @@ static void* syncWrite(void *sarg) { setThreadName("syncWrite"); - uint32_t interlaceRows; + uint32_t interlaceRows = 0; if (stbInfo) { - if ((stbInfo->interlaceRows == 0) - && (g_args.interlace_rows > 0)) { - interlaceRows = g_args.interlace_rows; - } else { + if (stbInfo->interlaceRows < stbInfo->insertRows) interlaceRows = stbInfo->interlaceRows; - } } else { - interlaceRows = g_args.interlace_rows; + if (g_args.interlaceRows < g_args.insertRows) + interlaceRows = g_args.interlaceRows; } if (interlaceRows > 0) { // interlace mode - return syncWriteInterlace(pThreadInfo); - } else { + if (((stbInfo) && (STMT_IFACE == stbInfo->iface)) + || (STMT_IFACE == g_args.iface)) { +#if STMT_BIND_PARAM_BATCH == 1 + return syncWriteInterlaceStmtBatch(pThreadInfo, interlaceRows); +#else + return syncWriteInterlaceStmt(pThreadInfo, interlaceRows); +#endif + } else { + return syncWriteInterlace(pThreadInfo, interlaceRows); + } + }else { // progressive mode return syncWriteProgressive(pThreadInfo); } @@ -9231,22 +9667,25 @@ static void startMultiThreadInsertData(int threads, char* db_name, assert(stmtBuffer); #if STMT_BIND_PARAM_BATCH == 1 - uint32_t interlaceRows; + uint32_t interlaceRows = 0; uint32_t batch; if (stbInfo) { if ((stbInfo->interlaceRows == 0) - && (g_args.interlace_rows > 0)) { - interlaceRows = g_args.interlace_rows; + && (g_args.interlaceRows > 0) + ) { + interlaceRows = g_args.interlaceRows; - if (interlaceRows > stbInfo->insertRows) { - interlaceRows = stbInfo->insertRows; - } } else { interlaceRows = stbInfo->interlaceRows; } + + if (interlaceRows > stbInfo->insertRows) { + interlaceRows = 0; + } } else { - interlaceRows = g_args.interlace_rows; + if (g_args.interlaceRows < g_args.insertRows) + interlaceRows = g_args.interlaceRows; } if (interlaceRows > 0) { @@ -9408,13 +9847,12 @@ static void startMultiThreadInsertData(int threads, char* db_name, taos_stmt_close(pThreadInfo->stmt); } -#if STMT_BIND_PARAM_BATCH == 1 tmfree((char *)pThreadInfo->bind_ts); +#if STMT_BIND_PARAM_BATCH == 1 tmfree((char *)pThreadInfo->bind_ts_array); tmfree(pThreadInfo->bindParams); tmfree(pThreadInfo->is_null); #else - tmfree((char *)pThreadInfo->bind_ts); if (pThreadInfo->sampleBindArray) { for (int k = 0; k < MAX_SAMPLES; k++) { uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)( diff --git a/tests/pytest/tools/taosdemoAllTest/insertInterlaceRowsLarge1M.json b/tests/pytest/tools/taosdemoAllTest/insertInterlaceRowsLarge1M.json index 1b56830189623d344168918f239887c3359b2645..197f8a208e85ca4ce57c06518a433ec3a3acbac3 100644 --- a/tests/pytest/tools/taosdemoAllTest/insertInterlaceRowsLarge1M.json +++ b/tests/pytest/tools/taosdemoAllTest/insertInterlaceRowsLarge1M.json @@ -41,7 +41,7 @@ "batch_create_tbl_num": 10, "data_source": "rand", "insert_mode": "taosc", - "insert_rows": 1000, + "insert_rows": 1001, "childtable_limit": 0, "childtable_offset":0, "multi_thread_write_one_tbl": "no",