diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4e5a59550b23ea3d4213876eee20d459ae6f0f6e..083752ec7eb4bc0d5298b5129004890656d553ad 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -98,6 +98,8 @@ extern char configDir[]; #define MAX_DATABASE_COUNT 256 #define INPUT_BUF_LEN 256 +#define DEFAULT_TIMESTAMP_STEP 10 + typedef enum CREATE_SUB_TALBE_MOD_EN { PRE_CREATE_SUBTBL, AUTO_CREATE_SUBTBL, @@ -3307,7 +3309,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { if (timestampStep && timestampStep->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint; } else if (!timestampStep) { - g_Dbs.db[i].superTbls[j].timeStampStep = 1000; + g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP; } else { printf("ERROR: failed to read json, timestamp_step not found\n"); goto PARSE_OVER; @@ -4344,217 +4346,233 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) return affectedRows; } -// sync insertion -/* - 1 thread: 100 tables * 2000 rows/s - 1 thread: 10 tables * 20000 rows/s - 6 thread: 300 tables * 2000 rows/s - - 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s -*/ -static void* syncWrite(void *sarg) { - - threadInfo *winfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = winfo->superTblInfo; +static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *buffer, + int64_t insertRows, + int64_t startFrom, int64_t startTime, int *pSampleUsePos) +{ + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; int ncols_per_record = 1; // count first col ts - int samplePos = 0; - - if (superTblInfo) { - if (0 != prepareSampleData(superTblInfo)) - return NULL; - - if (superTblInfo->numberOfTblInOneSql > 0) { - syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf); - tmfree(superTblInfo->sampleDataBuf); - return NULL; - } - } else { + if (superTblInfo == NULL) { int datatypeSeq = 0; while(g_args.datatype[datatypeSeq]) { datatypeSeq ++; ncols_per_record ++; } - - } - - char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); - if (NULL == buffer) { - fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", - superTblInfo->maxSqlLen, - strerror(errno)); - tmfree(superTblInfo->sampleDataBuf); - return NULL; - } - - int64_t lastPrintTime = taosGetTimestampMs(); - int64_t startTs = taosGetTimestampUs(); - int64_t endTs; - - int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; - uint64_t st = 0; - uint64_t et = 0xffffffff; - - winfo->totalInsertRows = 0; - winfo->totalAffectedRows = 0; - - int sampleUsePos; - - if (superTblInfo && superTblInfo->childTblLimit ) { - // TODO } - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; - tID++) { - int64_t start_time = winfo->start_time; - - int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; - verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); - - for (int64_t i = 0; i < insertRows;) { - int64_t prepared = i; - - if (insert_interval) { - st = taosGetTimestampUs(); - } - - sampleUsePos = samplePos; - - memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); - char *pstr = buffer; - - if (superTblInfo) { + char *pstr = buffer; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { - char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { + if (superTblInfo) { + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + char* tagsValBuf = NULL; + if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); - } else { + } else { tagsValBuf = getTagValueFromTagSample( superTblInfo, - tID % superTblInfo->tagSampleCount); - } - if (NULL == tagsValBuf) { - goto free_and_statistics_2; - } + threadID % superTblInfo->tagSampleCount); + } + if (NULL == tagsValBuf) { + fprintf(stderr, "tag buf failed to allocate memory\n"); + return -1; + } - pstr += snprintf(pstr, + pstr += snprintf(pstr, superTblInfo->maxSqlLen, "insert into %s.%s%d using %s.%s tags %s values", - winfo->db_name, + pThreadInfo->db_name, superTblInfo->childTblPrefix, - tID, - winfo->db_name, + threadID, + pThreadInfo->db_name, superTblInfo->sTblName, tagsValBuf); - tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - pstr += snprintf(pstr, + tmfree(tagsValBuf); + } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + pstr += snprintf(pstr, superTblInfo->maxSqlLen, "insert into %s.%s values", - winfo->db_name, - superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); - } else { - pstr += snprintf(pstr, + pThreadInfo->db_name, + superTblInfo->childTblName + threadID * TSDB_TABLE_NAME_LEN); + } else { + pstr += snprintf(pstr, (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), "insert into %s.%s%d values", - winfo->db_name, + pThreadInfo->db_name, superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, - tID); - } - } else { - - pstr += snprintf(pstr, + threadID); + } + } else { + pstr += snprintf(pstr, (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), "insert into %s.%s%d values", - winfo->db_name, + pThreadInfo->db_name, superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, - tID); - } - - int k; - int len = 0; + threadID); + } - verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (k = 0; k < g_args.num_of_RPR;) { + int k; + int len = 0; - if (superTblInfo) { + verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); + for (k = 0; k < g_args.num_of_RPR;) { + if (superTblInfo) { int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { + if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( pstr + len, superTblInfo->maxSqlLen - len, - start_time + superTblInfo->timeStampStep * i, + startTime + superTblInfo->timeStampStep * startFrom, superTblInfo, - &sampleUsePos); - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + pSampleUsePos); + } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { - int64_t d = start_time - rand() % superTblInfo->disorderRange; + int64_t d = startTime - rand() % superTblInfo->disorderRange; retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, d, superTblInfo); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); - } else { + } else { retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, - start_time + superTblInfo->timeStampStep * i, + startTime + superTblInfo->timeStampStep * startFrom, superTblInfo); - } + } - if (retLen < 0) { - goto free_and_statistics_2; - } + if (retLen < 0) { + return -1; + } - len += retLen; - } - } else { - int rand_num = rand() % 100; + len += retLen; + } + } else { + int rand_num = rand() % 100; char data[MAX_DATA_SIZE]; char **data_type = g_args.datatype; int lenOfBinary = g_args.len_of_binary; - if ((g_args.disorderRatio != 0) + if ((g_args.disorderRatio != 0) && (rand_num < g_args.disorderRange)) { - int64_t d = start_time - rand() % 1000000 + rand_num; - len = generateData(data, data_type, + int64_t d = startTime - rand() % 1000000 + rand_num; + len = generateData(data, data_type, ncols_per_record, d, lenOfBinary); - } else { + } else { len = generateData(data, data_type, - ncols_per_record, start_time += 1000, lenOfBinary); - } + ncols_per_record, + startTime + DEFAULT_TIMESTAMP_STEP * startFrom, + lenOfBinary); + } - //assert(len + pstr - buffer < BUFFER_SIZE); - if (len + pstr - buffer >= g_args.max_sql_len) { // too long + //assert(len + pstr - buffer < BUFFER_SIZE); + if (len + pstr - buffer >= g_args.max_sql_len) { // too long break; - } + } - pstr += sprintf(pstr, " %s", data); - } + pstr += sprintf(pstr, " %s", data); + } + + verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); + + k++; + startFrom ++; - verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); + debugPrint("%s() LN%d k=%d startFrom=%ld insertRows=%ld\n", __func__, __LINE__, k, startFrom, insertRows); + if (startFrom >= insertRows) + break; + } - prepared ++; - k++; - i++; + return k; +} - if (prepared >= insertRows) - break; +// sync insertion +/* + 1 thread: 100 tables * 2000 rows/s + 1 thread: 10 tables * 20000 rows/s + 6 thread: 300 tables * 2000 rows/s + + 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s +*/ +static void* syncWrite(void *sarg) { + + threadInfo *winfo = (threadInfo *)sarg; + SSuperTable* superTblInfo = winfo->superTblInfo; + + + if (superTblInfo) { + if (0 != prepareSampleData(superTblInfo)) + return NULL; + + if (superTblInfo->numberOfTblInOneSql > 0) { + syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf); + tmfree(superTblInfo->sampleDataBuf); + return NULL; + } + } + + int samplePos = 0; + + char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); + if (NULL == buffer) { + fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", + superTblInfo->maxSqlLen, + strerror(errno)); + tmfree(superTblInfo->sampleDataBuf); + return NULL; + } + + int64_t lastPrintTime = taosGetTimestampMs(); + int64_t startTs = taosGetTimestampUs(); + int64_t endTs; + + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + uint64_t st = 0; + uint64_t et = 0xffffffff; + + winfo->totalInsertRows = 0; + winfo->totalAffectedRows = 0; + + int sampleUsePos; + + if (superTblInfo && superTblInfo->childTblLimit ) { + // TODO + } + + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; + tID++) { + int64_t start_time = winfo->start_time; + + int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); + + for (int64_t i = 0; i < insertRows;) { + if (insert_interval) { + st = taosGetTimestampUs(); } - - int affectedRows = execInsert(winfo, buffer, k); + + sampleUsePos = samplePos; + + int generated = generateDataBuffer(tID, winfo, buffer, insertRows, + i, start_time, &sampleUsePos); + if (generated > 0) + i += generated; + else + goto free_and_statistics_2; + + int affectedRows = execInsert(winfo, buffer, generated); if (affectedRows < 0) goto free_and_statistics_2; - winfo->totalInsertRows += k; + winfo->totalInsertRows += generated; winfo->totalAffectedRows += affectedRows; endTs = taosGetTimestampUs(); @@ -4573,7 +4591,7 @@ static void* syncWrite(void *sarg) { lastPrintTime = currentPrintTime; } - if (prepared >= insertRows) + if (i >= insertRows) break; if (insert_interval) { @@ -4581,7 +4599,7 @@ static void* syncWrite(void *sarg) { if (insert_interval > ((et - st)/1000) ) { int sleep_time = insert_interval - (et -st)/1000; - printf("sleep: %d ms for insert interval\n", sleep_time); + verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", __func__, __LINE__, sleep_time); taosMsleep(sleep_time); // ms } } @@ -5675,7 +5693,7 @@ void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); - g_Dbs.db[0].superTbls[0].timeStampStep = 10; + g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;