diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4ca4031b1fe2d422a57feade52658a8cbbbb50d2..1d3b0d1ade0e8c632a28950401d9cd077cd175c3 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -61,6 +61,8 @@ extern char configDir[]; #define QUERY_JSON_NAME "query.json" #define SUBSCRIBE_JSON_NAME "subscribe.json" +#define STR_INSERT_INTO "INSERT INTO " + enum TEST_MODE { INSERT_TEST, // 0 QUERY_TEST, // 1 @@ -5093,7 +5095,8 @@ static int generateStbSQLHead( return len; } -static int64_t generateInterlaceDataBuffer( +static int64_t generateStbInterlaceData( + SSuperTable *superTblInfo, char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes, uint64_t tableSeq, threadInfo *pThreadInfo, char *buffer, @@ -5103,12 +5106,11 @@ static int64_t generateInterlaceDataBuffer( { assert(buffer); char *pstr = buffer; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; int headLen = generateStbSQLHead( superTblInfo, tableName, tableSeq, pThreadInfo->db_name, - pstr, *pRemainderBufLen); + pstr, *pRemainderBufLen); if (headLen <= 0) { return 0; @@ -5126,24 +5128,58 @@ static int64_t generateInterlaceDataBuffer( pThreadInfo->threadID, __func__, __LINE__, i, batchPerTblTimes, batchPerTbl); - int64_t k; - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { + if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { startTime = taosGetTimestamp(pThreadInfo->time_precision); - } + } - k = generateStbDataTail( + int64_t k = generateStbDataTail( superTblInfo, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, startTime, &(pThreadInfo->samplePos), &dataLen); + + if (k == batchPerTbl) { + pstr += dataLen; + *pRemainderBufLen -= dataLen; } else { - startTime = 1500000000000; - k = generateDataTailWithoutStb( + debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n", + __func__, __LINE__, k, batchPerTbl); + pstr -= headLen; + pstr[0] = '\0'; + k = 0; + } + + return k; +} + +static int64_t generateInterlaceDataWithoutStb( + char *tableName, uint64_t batchPerTbl, + uint64_t tableSeq, + char *dbName, char *buffer, + int64_t insertRows, + uint64_t *pRemainderBufLen) +{ + assert(buffer); + char *pstr = buffer; + + int headLen = generateSQLHeadWithoutStb( + tableName, dbName, + pstr, *pRemainderBufLen); + + if (headLen <= 0) { + return 0; + } + + pstr += headLen; + *pRemainderBufLen -= headLen; + + int64_t dataLen = 0; + + int64_t startTime = 1500000000000; + int64_t k = generateDataTailWithoutStb( batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, startTime, - /* &(pThreadInfo->samplePos), */&dataLen); - } + &dataLen); if (k == batchPerTbl) { pstr += dataLen; @@ -5159,33 +5195,24 @@ static int64_t generateInterlaceDataBuffer( return k; } -static int64_t generateProgressiveDataBuffer( +static int64_t generateStbProgressiveData( + SSuperTable *superTblInfo, char *tableName, int64_t tableSeq, - threadInfo *pThreadInfo, char *buffer, + char *dbName, char *buffer, int64_t insertRows, uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *pRemainderBufLen) { - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - assert(buffer != NULL); char *pstr = buffer; memset(buffer, 0, *pRemainderBufLen); - int64_t headLen; - - if (superTblInfo) { - headLen = generateStbSQLHead( + int64_t headLen = generateStbSQLHead( superTblInfo, - tableName, tableSeq, pThreadInfo->db_name, + tableName, tableSeq, dbName, buffer, *pRemainderBufLen); - } else { - headLen = generateSQLHeadWithoutStb( - tableName, pThreadInfo->db_name, - buffer, *pRemainderBufLen); - } if (headLen <= 0) { return 0; @@ -5194,22 +5221,43 @@ static int64_t generateProgressiveDataBuffer( *pRemainderBufLen -= headLen; int64_t dataLen; - int64_t k; - if (superTblInfo) { - k = generateStbDataTail(superTblInfo, + return generateStbDataTail(superTblInfo, g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, startTime, pSamplePos, &dataLen); - } else { - k = generateDataTailWithoutStb( +} + +static int64_t generateProgressiveDataWithoutStb( + char *tableName, + int64_t tableSeq, + threadInfo *pThreadInfo, char *buffer, + int64_t insertRows, + uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, + int64_t *pRemainderBufLen) +{ + assert(buffer != NULL); + char *pstr = buffer; + + memset(buffer, 0, *pRemainderBufLen); + + int64_t headLen = generateSQLHeadWithoutStb( + tableName, pThreadInfo->db_name, + buffer, *pRemainderBufLen); + + if (headLen <= 0) { + return 0; + } + pstr += headLen; + *pRemainderBufLen -= headLen; + + int64_t dataLen; + + return generateDataTailWithoutStb( g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, startTime, /*pSamplePos, */&dataLen); - } - - return k; } static void printStatPerThread(threadInfo *pThreadInfo) @@ -5228,6 +5276,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int64_t insertRows; uint64_t interlaceRows; + uint64_t maxSqlLen; + int64_t nTimeStampStep; + uint64_t insert_interval; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; @@ -5240,26 +5291,38 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } else { interlaceRows = superTblInfo->interlaceRows; } + maxSqlLen = superTblInfo->maxSqlLen; + nTimeStampStep = superTblInfo->timeStampStep; + insert_interval = superTblInfo->insertInterval; } else { insertRows = g_args.num_of_DPT; interlaceRows = g_args.interlace_rows; + maxSqlLen = g_args.max_sql_len; + nTimeStampStep = DEFAULT_TIMESTAMP_STEP; + insert_interval = g_args.insert_interval; } + debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, insertRows); + if (interlaceRows > insertRows) interlaceRows = insertRows; if (interlaceRows > g_args.num_of_RPR) interlaceRows = g_args.num_of_RPR; - int progOrInterlace; + uint64_t batchPerTbl = interlaceRows; + uint64_t batchPerTblTimes; - if (interlaceRows > 0) { - progOrInterlace= INTERLACE_INSERT_MODE; + if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { + batchPerTblTimes = + g_args.num_of_RPR / interlaceRows; } else { - progOrInterlace = PROGRESSIVE_INSERT_MODE; + batchPerTblTimes = 1; } - uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; pThreadInfo->buffer = calloc(maxSqlLen, 1); if (NULL == pThreadInfo->buffer) { errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", @@ -5267,16 +5330,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { return NULL; } - char tableName[TSDB_TABLE_NAME_LEN]; - pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; - int64_t nTimeStampStep = - superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; - - uint64_t insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; uint64_t st = 0; uint64_t et = UINT64_MAX; @@ -5285,31 +5341,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { uint64_t endTs; uint64_t tableSeq = pThreadInfo->start_table_from; - - debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, - pThreadInfo->start_table_from, - pThreadInfo->ntables, insertRows); - int64_t startTime = pThreadInfo->start_time; - uint64_t batchPerTbl = interlaceRows; - uint64_t batchPerTblTimes; - - if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { - batchPerTblTimes = - g_args.num_of_RPR / interlaceRows; - } else { - batchPerTblTimes = 1; - } - uint64_t generatedRecPerTbl = 0; bool flagSleep = true; uint64_t sleepTimeTotal = 0; - char *strInsertInto = "insert into "; - int nInsertBufLen = strlen(strInsertInto); - while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampMs(); @@ -5321,13 +5358,16 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { char *pstr = pThreadInfo->buffer; - int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); pstr += len; remainderBufLen -= len; uint64_t recOfBatch = 0; for (uint64_t i = 0; i < batchPerTblTimes; i ++) { + char tableName[TSDB_TABLE_NAME_LEN]; + getTableName(tableName, pThreadInfo, tableSeq); if (0 == strlen(tableName)) { errorPrint("[%d] %s() LN%d, getTableName return null\n", @@ -5337,13 +5377,25 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } uint64_t oldRemainderLen = remainderBufLen; - int64_t generated = generateInterlaceDataBuffer( - tableName, batchPerTbl, i, batchPerTblTimes, - tableSeq, - pThreadInfo, pstr, - insertRows, - startTime, - &remainderBufLen); + + int64_t generated; + if (superTblInfo) { + generated = generateStbInterlaceData( + superTblInfo, + tableName, batchPerTbl, i, batchPerTblTimes, + tableSeq, + pThreadInfo, pstr, + insertRows, + startTime, + &remainderBufLen); + } else { + generated = generateInterlaceDataWithoutStb( + tableName, batchPerTbl, + tableSeq, + pThreadInfo->db_name, pstr, + insertRows, + &remainderBufLen); + } debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", pThreadInfo->threadID, __func__, __LINE__, generated); @@ -5364,8 +5416,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->threadID, __func__, __LINE__, batchPerTbl, recOfBatch); - if (progOrInterlace == INTERLACE_INSERT_MODE) { - if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { + if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { // turn to first table tableSeq = pThreadInfo->start_table_from; generatedRecPerTbl += batchPerTbl; @@ -5383,7 +5434,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR) break; - } } verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", @@ -5511,18 +5561,28 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { int64_t remainderBufLen = maxSqlLen; char *pstr = pThreadInfo->buffer; - int nInsertBufLen = strlen("insert into "); - int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into "); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); pstr += len; remainderBufLen -= len; - int64_t generated = generateProgressiveDataBuffer( + int64_t generated; + if (superTblInfo) { + generated = generateStbProgressiveData( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, pstr, insertRows, + i, start_time, + &(pThreadInfo->samplePos), + &remainderBufLen); + } else { + generated = generateProgressiveDataWithoutStb( tableName, tableSeq, pThreadInfo, pstr, insertRows, i, start_time, &(pThreadInfo->samplePos), &remainderBufLen); + } if (generated > 0) i += generated; else