From 5e43fc0d245bcf2dc23e0a51ce8697efa014c90a Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sun, 11 Apr 2021 11:09:54 +0800 Subject: [PATCH] Hotfix/sangshuduo/td 3733 taosdemo buffer processing (#5769) * [TD-3733]: taosdemo buffer processing refactor. * [TD-3733]: taosdemo generate SQL head buffer process. * [TD-3733]: taosdemo generate sql head buffer processing. fix compile issue for windows. * [TD-3733]: taosdemo generate sql head buffer processing. fix max_sql_len. generate data buffer refactor. * resolve conflict with master. Co-authored-by: Shuduo Sang --- src/client/inc/tsclient.h | 11 ----- src/kit/taosdemo/taosdemo.c | 89 ++++++++++++++++++++++--------------- 2 files changed, 52 insertions(+), 48 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9072b6f349..c91943e232 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -146,17 +146,6 @@ typedef struct SInternalField { SExprFilter *pFieldFilters; } SInternalField; -typedef struct SFieldInfo { - int16_t numOfOutput; // number of column in result - SArray *internalField; // SArray -} SFieldInfo; - -typedef struct SColumn { - SColumnIndex colIndex; - int32_t numOfFilters; - SColumnFilterInfo *filterInfo; -} SColumn; - typedef struct SCond { uint64_t uid; int32_t len; // length of tag query condition data diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index c34b00fc8c..ed2b74c973 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -3211,9 +3211,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // rows per table need be less than insert batch if (g_args.interlace_rows > g_args.num_of_RPR) { - printf("NOTICE: interlace rows value %d > num_of_records_per_request %d\n\n", + printf("NOTICE: interlace rows value %d > num_of_records_per_req %d\n\n", g_args.interlace_rows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_request %d\n\n", + printf(" interlace rows value will be set to num_of_records_per_req %d\n\n", g_args.num_of_RPR); printf(" press Enter key to continue or Ctrl-C to stop."); (void)getchar(); @@ -3682,14 +3682,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { int32_t len = maxSqlLen->valueint; if (len > TSDB_MAX_ALLOWED_SQL_LEN) { len = TSDB_MAX_ALLOWED_SQL_LEN; - } else if (len < TSDB_MAX_SQL_LEN) { - len = TSDB_MAX_SQL_LEN; + } else if (len < 5) { + len = 5; } g_Dbs.db[i].superTbls[j].maxSqlLen = len; } else if (!maxSqlLen) { g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len; } else { - printf("ERROR: failed to read json, maxSqlLen not found\n"); + errorPrint("%s() LN%d, failed to read json, maxSqlLen input mistake\n", + __func__, __LINE__); goto PARSE_OVER; } @@ -3715,9 +3716,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint; // rows per table need be less than insert batch if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { - printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %d > num_of_records_per_request %d\n\n", + printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %d > num_of_records_per_req %d\n\n", i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_request %d\n\n", + printf(" interlace rows value will be set to num_of_records_per_req %d\n\n", g_args.num_of_RPR); printf(" press Enter key to continue or Ctrl-C to stop."); (void)getchar(); @@ -4511,13 +4512,15 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq) } } -static int generateDataTail(char *tableName, int32_t tableSeq, - threadInfo* pThreadInfo, SSuperTable* superTblInfo, +static int generateDataTail( + SSuperTable* superTblInfo, int batch, char* buffer, int remainderBufLen, int64_t insertRows, int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) { int len = 0; int ncols_per_record = 1; // count first col ts + char *pstr = buffer; + if (superTblInfo == NULL) { int datatypeSeq = 0; while(g_args.datatype[datatypeSeq]) { @@ -4546,15 +4549,14 @@ static int generateDataTail(char *tableName, int32_t tableSeq, pSamplePos); } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { - int rand_num = taosRandom() % 100; - int randTail; - if (0 != superTblInfo->disorderRatio - && rand_num < superTblInfo->disorderRatio) { - randTail = (superTblInfo->timeStampStep * k - + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1); - debugPrint("rand data generated, back %d\n", randTail); - } else { - randTail = superTblInfo->timeStampStep * k; + + int randTail = superTblInfo->timeStampStep * k; + if (superTblInfo->disorderRatio > 0) { + int rand_num = taosRandom() % 100; + if(rand_num < superTblInfo->disorderRatio) { + randTail = (randTail + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1); + debugPrint("rand data generated, back %d\n", randTail); + } } uint64_t d = startTime @@ -4569,7 +4571,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq, break; } - buffer += snprintf(buffer, retLen + 1, "%s", data); + pstr += snprintf(pstr , retLen + 1, "%s", data); k++; len += retLen; remainderBufLen -= retLen; @@ -4597,7 +4599,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq, if (len > remainderBufLen) break; - buffer += sprintf(buffer, " %s", data); + pstr += sprintf(pstr, " %s", data); k++; len += retLen; remainderBufLen -= retLen; @@ -4622,6 +4624,10 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, char *buffer, int remainderBufLen) { int len; + +#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. + char headBuf[HEAD_BUFF_LEN]; + if (superTblInfo) { if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; @@ -4638,8 +4644,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, return -1; } - len = snprintf(buffer, - superTblInfo->maxSqlLen, + len = snprintf( + headBuf, + HEAD_BUFF_LEN, "insert into %s.%s using %s.%s tags %s values", pThreadInfo->db_name, tableName, @@ -4648,26 +4655,34 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, tagsValBuf); tmfree(tagsValBuf); } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - len = snprintf(buffer, - superTblInfo->maxSqlLen, + len = snprintf( + headBuf, + HEAD_BUFF_LEN, "insert into %s.%s values", pThreadInfo->db_name, tableName); } else { - len = snprintf(buffer, - superTblInfo->maxSqlLen, + len = snprintf( + headBuf, + HEAD_BUFF_LEN, "insert into %s.%s values", pThreadInfo->db_name, tableName); } } else { - len = snprintf(buffer, - g_args.max_sql_len, + len = snprintf( + headBuf, + HEAD_BUFF_LEN, "insert into %s.%s values", pThreadInfo->db_name, tableName); } + if (len > remainderBufLen) + return -1; + + tstrncpy(buffer, headBuf, len + 1); + return len; } @@ -4709,7 +4724,7 @@ static int generateInterlaceDataBuffer( startTime = 1500000000000; } int k = generateDataTail( - tableName, tableSeq, pThreadInfo, superTblInfo, + superTblInfo, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, startTime, &(pThreadInfo->samplePos), &dataLen); @@ -4764,7 +4779,7 @@ static int generateProgressiveDataBuffer( remainderBufLen -= headLen; int dataLen; - k = generateDataTail(tableName, tableSeq, pThreadInfo, superTblInfo, + k = generateDataTail(superTblInfo, g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom, startTime, pSamplePos, &dataLen); @@ -4781,8 +4796,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows; - if (interlaceRows > insertRows) - interlaceRows = insertRows; + if (interlaceRows > g_args.num_of_RPR) + interlaceRows = g_args.num_of_RPR; int insertMode; @@ -4847,8 +4862,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { bool flagSleep = true; int sleepTimeTotal = 0; - int remainderBufLen; - while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampUs(); @@ -4856,7 +4869,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } // generate data memset(buffer, 0, maxSqlLen); - remainderBufLen = maxSqlLen; + int remainderBufLen = maxSqlLen; char *pstr = buffer; int recOfBatch = 0; @@ -5102,11 +5115,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { */ } // num_of_DPT - if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo && + if (g_args.verbose_print) { + if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { - printf("%s() LN%d samplePos=%d\n", + verbosePrint("%s() LN%d samplePos=%d\n", __func__, __LINE__, pThreadInfo->samplePos); + } } } // tableSeq -- GitLab