diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 068aa522834e1b245d8d1ce080fc9e342e825229..7a444c839d0351e641f03b1b92941d5e2efb4087 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -70,6 +70,8 @@ enum TEST_MODE { #define MAX_RECORDS_PER_REQ 32766 +#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. + #define MAX_SQL_SIZE 65536 #define BUFFER_SIZE (65536*2) #define COND_BUF_LEN BUFFER_SIZE - 30 @@ -1079,8 +1081,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } static bool getInfoFromJsonFile(char* file); -//static int generateOneRowDataForStb(SSuperTable* stbInfo); -//static int getDataIntoMemForStb(SSuperTable* stbInfo); static void init_rand_data(); static void tmfclose(FILE *fp) { if (NULL != fp) { @@ -1180,7 +1180,8 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { totalLen += len; } - verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", __func__, __LINE__, databuf, resultFile); + verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", + __func__, __LINE__, databuf, resultFile); appendResultBufToFile(databuf, resultFile); free(databuf); } @@ -1640,8 +1641,10 @@ static void printfInsertMetaToFile(FILE* fp) { */ fprintf(fp, " interlaceRows: %"PRIu64"\n", g_Dbs.db[i].superTbls[j].interlaceRows); - fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); - fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio); + fprintf(fp, " disorderRange: %d\n", + g_Dbs.db[i].superTbls[j].disorderRange); + fprintf(fp, " disorderRatio: %d\n", + g_Dbs.db[i].superTbls[j].disorderRatio); fprintf(fp, " maxSqlLen: %"PRIu64"\n", g_Dbs.db[i].superTbls[j].maxSqlLen); @@ -1649,11 +1652,15 @@ static void printfInsertMetaToFile(FILE* fp) { g_Dbs.db[i].superTbls[j].timeStampStep); fprintf(fp, " startTimestamp: %s\n", g_Dbs.db[i].superTbls[j].startTimestamp); - fprintf(fp, " sampleFormat: %s\n", g_Dbs.db[i].superTbls[j].sampleFormat); - fprintf(fp, " sampleFile: %s\n", g_Dbs.db[i].superTbls[j].sampleFile); - fprintf(fp, " tagsFile: %s\n", g_Dbs.db[i].superTbls[j].tagsFile); + fprintf(fp, " sampleFormat: %s\n", + g_Dbs.db[i].superTbls[j].sampleFormat); + fprintf(fp, " sampleFile: %s\n", + g_Dbs.db[i].superTbls[j].sampleFile); + fprintf(fp, " tagsFile: %s\n", + g_Dbs.db[i].superTbls[j].tagsFile); - fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount); + fprintf(fp, " columnCount: %d\n ", + g_Dbs.db[i].superTbls[j].columnCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); if ((0 == strncasecmp( @@ -1665,7 +1672,8 @@ static void printfInsertMetaToFile(FILE* fp) { g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); } else { - fprintf(fp, "column[%d]:%s ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType); + fprintf(fp, "column[%d]:%s ", + k, g_Dbs.db[i].superTbls[j].columns[k].dataType); } } fprintf(fp, "\n"); @@ -4637,16 +4645,22 @@ static int getRowDataFromSample( return dataLen; } -static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) { +static int64_t generateStbRowData( + SSuperTable* stbInfo, + char* recBuf, int64_t timestamp + ) { int64_t dataLen = 0; char *pstr = recBuf; int64_t maxLen = MAX_DATA_SIZE; - dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ",", timestamp); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, + "(%" PRId64 ",", timestamp); for (int i = 0; i < stbInfo->columnCount; i++) { - if ((0 == strncasecmp(stbInfo->columns[i].dataType, "BINARY", strlen("BINARY"))) - || (0 == strncasecmp(stbInfo->columns[i].dataType, "NCHAR", strlen("NCHAR")))) { + if ((0 == strncasecmp(stbInfo->columns[i].dataType, + "BINARY", strlen("BINARY"))) + || (0 == strncasecmp(stbInfo->columns[i].dataType, + "NCHAR", strlen("NCHAR")))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { errorPrint( "binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); @@ -4708,7 +4722,7 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb } static int64_t generateData(char *recBuf, char **data_type, - int num_of_cols, int64_t timestamp, int lenOfBinary) { + int64_t timestamp, int lenOfBinary) { memset(recBuf, 0, MAX_DATA_SIZE); char *pstr = recBuf; pstr += sprintf(pstr, "(%" PRId64, timestamp); @@ -4859,100 +4873,119 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t table } } -static int64_t generateDataTail( - SSuperTable* superTblInfo, - uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows, - uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) { - uint64_t len = 0; - uint32_t ncols_per_record = 1; // count first col ts +static int64_t generateDataTailWithoutStb( + uint64_t batch, char* buffer, + int64_t remainderBufLen, int64_t insertRows, + uint64_t startFrom, int64_t startTime, + /* int64_t *pSamplePos, */int64_t *dataLen) { + uint64_t len = 0; char *pstr = buffer; - if (superTblInfo == NULL) { - uint32_t datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; - } - } - verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); - uint64_t k = 0; + int64_t k = 0; for (k = 0; k < batch;) { char data[MAX_DATA_SIZE]; memset(data, 0, MAX_DATA_SIZE); int64_t retLen = 0; - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->dataSource, - "sample", strlen("sample"))) { - retLen = getRowDataFromSample( - data, - remainderBufLen, - startTime + superTblInfo->timeStampStep * k, - superTblInfo, - pSamplePos); - } else if (0 == strncasecmp(superTblInfo->dataSource, - "rand", strlen("rand"))) { + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; - int64_t randTail = superTblInfo->timeStampStep * k; - if (superTblInfo->disorderRatio > 0) { - int rand_num = taosRandom() % 100; - if(rand_num < superTblInfo->disorderRatio) { - randTail = (randTail + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1); - debugPrint("rand data generated, back %"PRId64"\n", randTail); - } + int64_t randTail = DEFAULT_TIMESTAMP_STEP * k; + + if (g_args.disorderRatio != 0) { + int rand_num = taosRandom() % 100; + if (rand_num < g_args.disorderRatio) { + randTail = (randTail + + (taosRandom() % g_args.disorderRange + 1)) * (-1); + + debugPrint("rand data generated, back %"PRId64"\n", randTail); } + } else { + randTail = DEFAULT_TIMESTAMP_STEP * k; + } - int64_t d = startTime - + randTail; - retLen = generateRowData( - data, - d, - superTblInfo); - } + retLen = generateData(data, data_type, + startTime + randTail, + lenOfBinary); - if (retLen > remainderBufLen) { + if (len > remainderBufLen) break; - } - pstr += snprintf(pstr , retLen + 1, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; - } else { - char **data_type = g_args.datatype; - int lenOfBinary = g_args.len_of_binary; + pstr += sprintf(pstr, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; - int64_t randTail = DEFAULT_TIMESTAMP_STEP * k; + verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n", + __func__, __LINE__, len, k, buffer); - if (g_args.disorderRatio != 0) { - int rand_num = taosRandom() % 100; - if (rand_num < g_args.disorderRatio) { - randTail = (randTail + (taosRandom() % g_args.disorderRange + 1)) * (-1); + startFrom ++; - debugPrint("rand data generated, back %"PRId64"\n", randTail); + if (startFrom >= insertRows) { + break; + } + } + + *dataLen = len; + return k; +} + +static int64_t generateStbDataTail( + SSuperTable* superTblInfo, + uint64_t batch, char* buffer, + int64_t remainderBufLen, int64_t insertRows, + uint64_t startFrom, int64_t startTime, + int64_t *pSamplePos, int64_t *dataLen) { + uint64_t len = 0; + + char *pstr = buffer; + + verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); + + int64_t k = 0; + for (k = 0; k < batch;) { + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + int64_t retLen = 0; + + if (0 == strncasecmp(superTblInfo->dataSource, + "sample", strlen("sample"))) { + retLen = getRowDataFromSample( + data, + remainderBufLen, + startTime + superTblInfo->timeStampStep * k, + superTblInfo, + pSamplePos); + } else if (0 == strncasecmp(superTblInfo->dataSource, + "rand", strlen("rand"))) { + int64_t randTail = superTblInfo->timeStampStep * k; + if (superTblInfo->disorderRatio > 0) { + int rand_num = taosRandom() % 100; + if(rand_num < superTblInfo->disorderRatio) { + randTail = (randTail + + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1); + debugPrint("rand data generated, back %"PRId64"\n", randTail); + } } - } else { - randTail = DEFAULT_TIMESTAMP_STEP * k; - } - retLen = generateData(data, data_type, - ncols_per_record, - startTime + randTail, - lenOfBinary); + int64_t d = startTime + randTail; + retLen = generateStbRowData(superTblInfo, data, d); + } - if (len > remainderBufLen) + if (retLen > remainderBufLen) { break; - - pstr += sprintf(pstr, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; } + pstr += snprintf(pstr , retLen + 1, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; + verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); @@ -4967,17 +5000,41 @@ static int64_t generateDataTail( return k; } -static int generateSQLHead(char *tableName, int32_t tableSeq, - threadInfo* pThreadInfo, SSuperTable* superTblInfo, + +static int generateSQLHeadWithoutStb(char *tableName, + char *dbName, char *buffer, int remainderBufLen) { int len; -#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. char headBuf[HEAD_BUFF_LEN]; - if (superTblInfo) { - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + len = snprintf( + headBuf, + HEAD_BUFF_LEN, + "%s.%s values", + dbName, + tableName); + + if (len > remainderBufLen) + return -1; + + tstrncpy(buffer, headBuf, len + 1); + + return len; +} + +static int generateStbSQLHead( + SSuperTable* superTblInfo, + char *tableName, int32_t tableSeq, + char *dbName, + char *buffer, int remainderBufLen) +{ + int len; + + char headBuf[HEAD_BUFF_LEN]; + + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo, tableSeq); @@ -4996,9 +5053,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, headBuf, HEAD_BUFF_LEN, "%s.%s using %s.%s tags %s values", - pThreadInfo->db_name, + dbName, tableName, - pThreadInfo->db_name, + dbName, superTblInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); @@ -5007,22 +5064,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, headBuf, HEAD_BUFF_LEN, "%s.%s values", - pThreadInfo->db_name, + dbName, tableName); } else { len = snprintf( headBuf, HEAD_BUFF_LEN, "%s.%s values", - pThreadInfo->db_name, - tableName); - } - } else { - len = snprintf( - headBuf, - HEAD_BUFF_LEN, - "%s.%s values", - pThreadInfo->db_name, + dbName, tableName); } @@ -5046,8 +5095,10 @@ static int64_t generateInterlaceDataBuffer( char *pstr = buffer; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, - superTblInfo, pstr, *pRemainderBufLen); + int headLen = generateStbSQLHead( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, + pstr, *pRemainderBufLen); if (headLen <= 0) { return 0; @@ -5065,20 +5116,25 @@ static int64_t generateInterlaceDataBuffer( pThreadInfo->threadID, __func__, __LINE__, i, batchPerTblTimes, batchPerTbl); + int64_t k; if (superTblInfo) { if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { startTime = taosGetTimestamp(pThreadInfo->time_precision); } + + k = generateStbDataTail( + superTblInfo, + batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, + startTime, + &(pThreadInfo->samplePos), &dataLen); } else { - startTime = 1500000000000; + startTime = 1500000000000; + k = generateDataTailWithoutStb( + batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, + startTime, + /* &(pThreadInfo->samplePos), */&dataLen); } - int64_t k = generateDataTail( - superTblInfo, - batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, - startTime, - &(pThreadInfo->samplePos), &dataLen); - if (k == batchPerTbl) { pstr += dataLen; *pRemainderBufLen -= dataLen; @@ -5103,25 +5159,23 @@ static int64_t generateProgressiveDataBuffer( { SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int ncols_per_record = 1; // count first col ts - - if (superTblInfo == NULL) { - int datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; - } - } - assert(buffer != NULL); char *pstr = buffer; - int64_t k = 0; - memset(buffer, 0, *pRemainderBufLen); - int64_t headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, + int64_t headLen; + + if (superTblInfo) { + headLen = generateStbSQLHead( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, + buffer, *pRemainderBufLen); + } else { + headLen = generateSQLHeadWithoutStb( + tableName, pThreadInfo->db_name, buffer, *pRemainderBufLen); + } if (headLen <= 0) { return 0; @@ -5130,10 +5184,20 @@ static int64_t generateProgressiveDataBuffer( *pRemainderBufLen -= headLen; int64_t dataLen; - k = generateDataTail(superTblInfo, - g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, + int64_t k; + + if (superTblInfo) { + k = generateStbDataTail(superTblInfo, + g_args.num_of_RPR, pstr, *pRemainderBufLen, + insertRows, startFrom, startTime, pSamplePos, &dataLen); + } else { + k = generateDataTailWithoutStb( + g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, + startTime, + /*pSamplePos, */&dataLen); + } return k; } @@ -5147,6 +5211,7 @@ static void printStatPerThread(threadInfo *pThreadInfo) (double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0))); } +// sync write interlace data static void* syncWriteInterlace(threadInfo *pThreadInfo) { debugPrint("[%d] %s() LN%d: ### interlace write\n", pThreadInfo->threadID, __func__, __LINE__); @@ -5197,7 +5262,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; - int64_t nTimeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; + int64_t nTimeStampStep = + superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; uint64_t insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; @@ -5211,7 +5277,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { uint64_t tableSeq = pThreadInfo->start_table_from; debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from, + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, pThreadInfo->ntables, insertRows); int64_t startTime = pThreadInfo->start_time; @@ -5384,19 +5451,18 @@ free_of_interlace: return NULL; } -// sync insertion -/* - 1 thread: 100 tables * 2000 rows/s - 1 thread: 10 tables * 20000 rows/s - 6 thread: 300 tables * 2000 rows/s - - 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s -*/ +// sync insertion progressive data static void* syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); SSuperTable* superTblInfo = pThreadInfo->superTblInfo; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; + int64_t timeStampStep = + superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; + int64_t insertRows = + (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", + __func__, __LINE__, insertRows); pThreadInfo->buffer = calloc(maxSqlLen, 1); if (NULL == pThreadInfo->buffer) { @@ -5410,8 +5476,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { uint64_t startTs = taosGetTimestampMs(); uint64_t endTs; - int64_t timeStampStep = - superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; /* int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; uint64_t st = 0; @@ -5423,21 +5487,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->samplePos = 0; - for (uint64_t tableSeq = - pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to; - tableSeq ++) { + for (uint64_t tableSeq = pThreadInfo->start_table_from; + tableSeq <= pThreadInfo->end_table_to; + tableSeq ++) { int64_t start_time = pThreadInfo->start_time; - int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; - verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); - for (uint64_t i = 0; i < insertRows;) { - /* - if (insert_interval) { - st = taosGetTimestampMs(); - } - */ - char tableName[TSDB_TABLE_NAME_LEN]; getTableName(tableName, pThreadInfo, tableSeq); verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n", @@ -5502,27 +5557,14 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { if (i >= insertRows) break; -/* - if (insert_interval) { - et = taosGetTimestampMs(); - - if (insert_interval > ((et - st)) ) { - int sleep_time = insert_interval - (et -st); - performancePrint("%s() LN%d sleep: %d ms for insert interval\n", - __func__, __LINE__, sleep_time); - taosMsleep(sleep_time); // ms - } - } - */ } // num_of_DPT - if (g_args.verbose_print) { - if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo && + if ((g_args.verbose_print) && + (tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { verbosePrint("%s() LN%d samplePos=%"PRId64"\n", __func__, __LINE__, pThreadInfo->samplePos); - } } } // tableSeq @@ -5557,7 +5599,6 @@ static void* syncWrite(void *sarg) { // progressive mode return syncWriteProgressive(pThreadInfo); } - } static void callBack(void *param, TAOS_RES *res, int code) { @@ -5595,10 +5636,12 @@ static void callBack(void *param, TAOS_RES *res, int code) { int rand_num = taosRandom() % 100; if (0 != pThreadInfo->superTblInfo->disorderRatio && rand_num < pThreadInfo->superTblInfo->disorderRatio) { - int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); - generateRowData(data, d, pThreadInfo->superTblInfo); + int64_t d = pThreadInfo->lastTs + - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); + generateStbRowData(pThreadInfo->superTblInfo, data, d); } else { - generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo); + generateStbRowData(pThreadInfo->superTblInfo, + data, pThreadInfo->lastTs += 1000); } pstr += sprintf(pstr, "%s", data); pThreadInfo->counter++;