diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4f3b17c37f77b115ba23d7b3e30f7f2f98b2a9d8..05bfb25026aa9fd4a0f537587cb7c4749864532e 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -5977,12 +5977,85 @@ static int64_t generateData(char *recBuf, char **data_type, return (int32_t)strlen(recBuf); } +static int generateSampleMemoryFromRand(SSuperTable *stbInfo) +{ + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + char *buff = malloc(stbInfo->lenOfOneRow); + if (NULL == buff) { + errorPrint2("%s() LN%d, memory allocation %"PRId64" bytes failed\n", + __func__, __LINE__, stbInfo->lenOfOneRow); + exit(EXIT_FAILURE); + } + + for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) { + uint64_t pos = 0; + memset(buff, 0, stbInfo->lenOfOneRow); + + for (int c = 0; c < stbInfo->columnCount; c++) { + char *tmp; + if (0 == strncasecmp(stbInfo->columns[c].dataType, + "BINARY", strlen("BINARY"))) { + rand_string(data, stbInfo->columns[c].dataLen); + pos += sprintf(buff + pos, "%s,", data); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "NCHAR", strlen("NCHAR"))) { + rand_string(data, stbInfo->columns[c].dataLen); + pos += sprintf(buff + pos, "%s,", data); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "INT", strlen("INT"))) { + if ((g_args.demo_mode) && (c == 1)) { + tmp = demo_voltage_int_str(); + } else { + tmp = rand_int_str(); + } + pos += sprintf(buff + pos, "%s,", tmp); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "BIGINT", strlen("BIGINT"))) { + pos += sprintf(buff + pos, "%s,", rand_bigint_str()); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "FLOAT", strlen("FLOAT"))) { + if (g_args.demo_mode) { + if (c == 0) { + tmp = demo_current_float_str(); + } else { + tmp = demo_phase_float_str(); + } + } else { + tmp = rand_float_str(); + } + pos += sprintf(buff + pos, "%s,", tmp); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "DOUBLE", strlen("DOUBLE"))) { + pos += sprintf(buff + pos, "%s,", rand_double_str()); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "SMALLINT", strlen("SMALLINT"))) { + pos += sprintf(buff + pos, "%s,", rand_smallint_str()); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "TINYINT", strlen("TINYINT"))) { + pos += sprintf(buff + pos, "%s,", rand_tinyint_str()); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "BOOL", strlen("BOOL"))) { + pos += sprintf(buff + pos, "%s,", rand_bool_str()); + } else if (0 == strncasecmp(stbInfo->columns[c].dataType, + "TIMESTAMP", strlen("TIMESTAMP"))) { + pos += sprintf(buff + pos, "%s,", rand_bigint_str()); + } + } + *(buff + pos - 1) = 0; + memcpy(stbInfo->sampleDataBuf + i * stbInfo->lenOfOneRow, buff, pos); + } + + free(buff); + return 0; +} + static int prepareSampleDataForSTable(SSuperTable *stbInfo) { - char* sampleDataBuf = NULL; - sampleDataBuf = calloc( + stbInfo->sampleDataBuf = calloc( stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); - if (sampleDataBuf == NULL) { + if (NULL == stbInfo->sampleDataBuf) { errorPrint2("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n", __func__, __LINE__, stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, @@ -5990,13 +6063,16 @@ static int prepareSampleDataForSTable(SSuperTable *stbInfo) { return -1; } - stbInfo->sampleDataBuf = sampleDataBuf; - int ret = readSampleFromCsvFileToMem(stbInfo); + int ret; + if (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample"))) + ret = readSampleFromCsvFileToMem(stbInfo); + else + ret = generateSampleMemoryFromRand(stbInfo); if (0 != ret) { errorPrint2("%s() LN%d, read sample from csv file failed.\n", __func__, __LINE__); - tmfree(sampleDataBuf); + tmfree(stbInfo->sampleDataBuf); stbInfo->sampleDataBuf = NULL; return -1; } @@ -7714,11 +7790,14 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pstr += len; remainderBufLen -= len; + // measure prepare + insert + startTs = taosGetTimestampUs(); + int32_t generated; if (stbInfo) { if (stbInfo->iface == STMT_IFACE) { if (sourceRand) { - generated = prepareStbStmtRand( +/* generated = prepareStbStmtRand( pThreadInfo, tableName, tableSeq, @@ -7726,6 +7805,14 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { insertRows, i, start_time ); + */ + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + g_args.num_of_RPR, + insertRows, i, start_time, + &(pThreadInfo->samplePos)); } else { generated = prepareStbStmtWithSample( pThreadInfo, @@ -7770,7 +7857,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { start_time += generated * timeStampStep; pThreadInfo->totalInsertRows += generated; - startTs = taosGetTimestampUs(); + // only measure insert + // startTs = taosGetTimestampUs(); int32_t affectedRows = execInsert(pThreadInfo, generated); @@ -7980,7 +8068,6 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) return -1; } - for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) { char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); if (bindArray == NULL) { @@ -7989,7 +8076,6 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) return -1; } - TAOS_BIND *bind; int cursor = 0; @@ -8078,11 +8164,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, debugPrint("%s() LN%d, start_time= %"PRId64"\n", __func__, __LINE__, start_time); - int64_t start = taosGetTimestampMs(); - // read sample data from file first - if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource, - "sample", strlen("sample")))) { + if (stbInfo) { if (0 != prepareSampleDataForSTable(stbInfo)) { errorPrint2("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__); @@ -8230,8 +8313,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer); - if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource, - "sample", strlen("sample")))) { + if (stbInfo) { parseSampleFileToStmt(stbInfo, timePrec); } } @@ -8316,6 +8398,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, free(stmtBuffer); + int64_t start = taosGetTimestampMs(); + for (int i = 0; i < threads; i++) { pthread_join(pids[i], NULL); } @@ -8358,22 +8442,22 @@ static void startMultiThreadInsertData(int threads, char* db_name, if (cntDelay == 0) cntDelay = 1; avgDelay = (double)totalDelay / cntDelay; - int64_t end = taosGetTimestampMs(); + int64_t end = taosGetTimestampMs(); int64_t t = end - start; - double tInMs = t/1000.0; + double tInMs = (double) t / 1000.0; if (stbInfo) { - fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", + fprintf(stderr, "Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", tInMs, stbInfo->totalInsertRows, stbInfo->totalAffectedRows, threads, db_name, stbInfo->sTblName, - (tInMs)? + (double) tInMs? (double)(stbInfo->totalInsertRows/tInMs):FLT_MAX); if (g_fpOfInsertResult) { fprintf(g_fpOfInsertResult, - "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", + "Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", tInMs, stbInfo->totalInsertRows, stbInfo->totalAffectedRows, threads, db_name, stbInfo->sTblName, @@ -8381,7 +8465,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, (double)(stbInfo->totalInsertRows/tInMs):FLT_MAX); } } else { - fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n", + fprintf(stderr, "Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n", tInMs, g_args.totalInsertRows, g_args.totalAffectedRows, threads, db_name, @@ -8389,7 +8473,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, (double)(g_args.totalInsertRows/tInMs):FLT_MAX); if (g_fpOfInsertResult) { fprintf(g_fpOfInsertResult, - "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n", + "Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n", tInMs, g_args.totalInsertRows, g_args.totalAffectedRows, threads, db_name,