From 76db6804e4ea28f44cf896a5ad8204d937a938fc Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sun, 11 Apr 2021 20:44:12 +0800 Subject: [PATCH] Feature/sangshuduo/td 3317 taosdemo interlace (#5773) * [TD-3316] : add testcase for taosdemo limit and offset. check offset 0. * [TD-3316] : add testcase for taosdemo limit and offset. fix sample file import bug. * [TD-3316] : add test case for limit and offset. fix sample data issue. * [TD-3327] : fix taosdemo segfault when import data from sample data file. * [TD-3317] : make taosdemo support interlace mode. json parameter rows_per_tbl support. * [TD-3317] : support interlace mode. refactor * [TD-3317] : support interlace mode. refactor * [TD-3317] : support interlace mode insertion. refactor. * [TD-3317] : support interlace mode insertion. change json file. * [TD-3317] : support interlace mode insertion. fix multithread create table regression. * [TD-3317] : support interlace mode insertion. working but not perfect. * [TD-3317] : support interlace mode insertion. rename lowaTest with taosdemoTestWithJson * [TD-3317] : support interlace mode insertion. perfect * [TD-3317] : support interlace mode insertion. cleanup. * [TD-3317] : support interlace mode insertion. adjust algorithm of loop times. * [TD-3317] : support interlace mode insertion. fix delay time bug. * [TD-3317] : support interlace mode insertion. fix progressive timestamp bug. * [TD-3317] : support interlace mode insertion. add an option for performance print. * [TD-3317] : support interlace mode insertion. change json test case with less table for acceleration. * [TD-3317] : support interlace mode insertion. change progressive mode timestamp step and testcase. * [TD-3197] : fix taosdemo coverity scan issues. * [TD-3197] : fix taosdemo coverity scan issue. fix subscribeTest pids uninitialized. * [TD-3317] : support interlace mode insertion. add time shift for no sleep time. * [TD-3317] : support interlace insert. rework timestamp. * [TD-3317] : support interlace mode insertion. change rows_per_tbl to interlace_rows. * [TD-3317] : taosdemo suppoert interlace mode. remove trailing spaces. * [TD-3317] : taosdemo support interlace insertion. prompt if interlace > num_of_records_per_req * fill insert-into early to buffer. * fix buffer overflow issue. * change rows_per_tbl to interlace_rows to align with taosdemo. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 70 +++++++++++++++-------- tests/pytest/tools/taosdemoPerformance.py | 4 +- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index ed2b74c973..49e550cc01 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -18,6 +18,7 @@ when in some thread query return error, thread don't exit, but return, otherwise coredump in other thread. */ +#include #define _GNU_SOURCE #define CURL_STATICLIB @@ -3242,7 +3243,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { if (numRecPerReq && numRecPerReq->type == cJSON_Number) { g_args.num_of_RPR = numRecPerReq->valueint; } else if (!numRecPerReq) { - g_args.num_of_RPR = 0xffff; + g_args.num_of_RPR = INT32_MAX; } else { errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__); @@ -4647,7 +4648,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, len = snprintf( headBuf, HEAD_BUFF_LEN, - "insert into %s.%s using %s.%s tags %s values", + "%s.%s using %s.%s tags %s values", pThreadInfo->db_name, tableName, pThreadInfo->db_name, @@ -4658,14 +4659,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, len = snprintf( headBuf, HEAD_BUFF_LEN, - "insert into %s.%s values", + "%s.%s values", pThreadInfo->db_name, tableName); } else { len = snprintf( headBuf, HEAD_BUFF_LEN, - "insert into %s.%s values", + "%s.%s values", pThreadInfo->db_name, tableName); } @@ -4673,7 +4674,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, len = snprintf( headBuf, HEAD_BUFF_LEN, - "insert into %s.%s values", + "%s.%s values", pThreadInfo->db_name, tableName); } @@ -4694,6 +4695,7 @@ static int generateInterlaceDataBuffer( int64_t startTime, int *pRemainderBufLen) { + assert(buffer); char *pstr = buffer; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; @@ -4723,18 +4725,20 @@ static int generateInterlaceDataBuffer( } else { startTime = 1500000000000; } + int k = generateDataTail( superTblInfo, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, startTime, &(pThreadInfo->samplePos), &dataLen); - if (k > 0) { + if (k == batchPerTbl) { pstr += dataLen; *pRemainderBufLen -= dataLen; } else { pstr -= headLen; pstr[0] = '\0'; + k = 0; } return k; @@ -4745,7 +4749,8 @@ static int generateProgressiveDataBuffer( int32_t tableSeq, threadInfo *pThreadInfo, char *buffer, int64_t insertRows, - int64_t startFrom, int64_t startTime, int *pSamplePos) + int64_t startFrom, int64_t startTime, int *pSamplePos, + int *pRemainderBufLen) { SSuperTable* superTblInfo = pThreadInfo->superTblInfo; @@ -4760,27 +4765,24 @@ static int generateProgressiveDataBuffer( } assert(buffer != NULL); + char *pstr = buffer; int k = 0; - int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; - int remainderBufLen = maxSqlLen; - - memset(buffer, 0, maxSqlLen); - char *pstr = buffer; + memset(buffer, 0, *pRemainderBufLen); int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, - buffer, remainderBufLen); + buffer, *pRemainderBufLen); if (headLen <= 0) { return 0; } pstr += headLen; - remainderBufLen -= headLen; + *pRemainderBufLen -= headLen; int dataLen; k = generateDataTail(superTblInfo, - g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom, + g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, startTime, pSamplePos, &dataLen); @@ -4842,18 +4844,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int64_t startTime = pThreadInfo->start_time; - int batchPerTblTimes; - int batchPerTbl; - assert(pThreadInfo->ntables > 0); if (interlaceRows > g_args.num_of_RPR) interlaceRows = g_args.num_of_RPR; - batchPerTbl = interlaceRows; + int batchPerTbl = interlaceRows; + + int batchPerTblTimes; if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { batchPerTblTimes = - (g_args.num_of_RPR / (interlaceRows * pThreadInfo->ntables)) + 1; + g_args.num_of_RPR / interlaceRows; } else { batchPerTblTimes = 1; } @@ -4862,6 +4863,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { bool flagSleep = true; int sleepTimeTotal = 0; + char *strInsertInto = "insert into "; + int nInsertBufLen = strlen(strInsertInto); + while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampUs(); @@ -4872,6 +4876,11 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int remainderBufLen = maxSqlLen; char *pstr = buffer; + + int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto); + pstr += len; + remainderBufLen -= len; + int recOfBatch = 0; for (int i = 0; i < batchPerTblTimes; i ++) { @@ -4883,6 +4892,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { return NULL; } + int oldRemainderLen = remainderBufLen; int generated = generateInterlaceDataBuffer( tableName, batchPerTbl, i, batchPerTblTimes, tableSeq, @@ -4901,6 +4911,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { tableSeq ++; recOfBatch += batchPerTbl; + pstr += (oldRemainderLen - remainderBufLen); // startTime += batchPerTbl * superTblInfo->timeStampStep; pThreadInfo->totalInsertRows += batchPerTbl; verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", @@ -5012,11 +5023,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; - char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); + char* buffer = calloc(maxSqlLen, 1); if (NULL == buffer) { errorPrint( "Failed to alloc %d Bytes, reason:%s\n", - superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, + maxSqlLen, strerror(errno)); return NULL; } @@ -5059,10 +5071,20 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { __func__, __LINE__, pThreadInfo->threadID, tableSeq, tableName); + int remainderBufLen = maxSqlLen; + char *pstr = buffer; + int nInsertBufLen = strlen("insert into "); + + int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into "); + + pstr += len; + remainderBufLen -= len; + int generated = generateProgressiveDataBuffer( - tableName, tableSeq, pThreadInfo, buffer, insertRows, + tableName, tableSeq, pThreadInfo, pstr, insertRows, i, start_time, - &(pThreadInfo->samplePos)); + &(pThreadInfo->samplePos), + &remainderBufLen); if (generated > 0) i += generated; else diff --git a/tests/pytest/tools/taosdemoPerformance.py b/tests/pytest/tools/taosdemoPerformance.py index 1156cc2484..8ce99d0969 100644 --- a/tests/pytest/tools/taosdemoPerformance.py +++ b/tests/pytest/tools/taosdemoPerformance.py @@ -51,7 +51,7 @@ class taosdemoPerformace: "insert_rows": 100000, "multi_thread_write_one_tbl": "no", "number_of_tbl_in_one_sql": 0, - "rows_per_tbl": 100, + "interlace_rows": 100, "max_sql_len": 1024000, "disorder_ratio": 0, "disorder_range": 1000, @@ -159,4 +159,4 @@ if __name__ == '__main__': perftest = taosdemoPerformace(args.commit_id, args.database_name) perftest.insertData() - perftest.createTablesAndStoreData() \ No newline at end of file + perftest.createTablesAndStoreData() -- GitLab