未验证 提交 f8500b04 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 3976 taosdemo print insert perf per batch (#6013)

* [TD-3976]<fix>: taosdemo print each insert performance data.

* [TD-3976]<fix>: taosdemo print insert performance per thread.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 cef35140
......@@ -4672,7 +4672,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return 0;
}
static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, int k)
static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
{
int affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
......@@ -4744,7 +4744,7 @@ static int64_t generateDataTail(
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
uint64_t k = 0;
int64_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
......@@ -4959,7 +4959,7 @@ static int64_t generateInterlaceDataBuffer(
return k;
}
static int generateProgressiveDataBuffer(
static int64_t generateProgressiveDataBuffer(
char *tableName,
int64_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
......@@ -5004,12 +5004,21 @@ static int generateProgressiveDataBuffer(
return k;
}
static void printStatPerThread(threadInfo *pThreadInfo)
{
fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows,
(double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0)));
}
static void* syncWriteInterlace(threadInfo *pThreadInfo) {
debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows;
int64_t interlaceRows;
uint64_t insertRows;
uint64_t interlaceRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
......@@ -5078,9 +5087,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
assert(pThreadInfo->ntables > 0);
int64_t batchPerTbl = interlaceRows;
uint64_t batchPerTbl = interlaceRows;
uint64_t batchPerTblTimes;
int64_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
g_args.num_of_RPR / interlaceRows;
......@@ -5088,9 +5097,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
batchPerTblTimes = 1;
}
int64_t generatedRecPerTbl = 0;
uint64_t generatedRecPerTbl = 0;
bool flagSleep = true;
int64_t sleepTimeTotal = 0;
uint64_t sleepTimeTotal = 0;
char *strInsertInto = "insert into ";
int nInsertBufLen = strlen(strInsertInto);
......@@ -5110,9 +5119,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pstr += len;
remainderBufLen -= len;
int64_t recOfBatch = 0;
uint64_t recOfBatch = 0;
for (int64_t i = 0; i < batchPerTblTimes; i ++) {
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
getTableName(tableName, pThreadInfo, tableSeq);
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
......@@ -5130,10 +5139,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTime,
&remainderBufLen);
debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
if (generated < 0) {
debugPrint("[%d] %s() LN%d, generated data is %"PRId64"\n",
errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_and_statistics_interlace;
goto free_of_interlace;
} else if (generated == 0) {
break;
}
......@@ -5177,7 +5188,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
break;
}
verbosePrint("[%d] %s() LN%d recOfBatch=%"PRId64" totalInsertRows=%"PRId64"\n",
verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
......@@ -5188,30 +5199,30 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
endTs = taosGetTimestampMs();
int64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRIu64"ms\n",
__func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if ((affectedRows < 0) || (recOfBatch != affectedRows)) {
errorPrint("[%d] %s() LN%d execInsert insert %"PRId64", affected rows: %"PRId64"\n%s\n",
if (recOfBatch != affectedRows) {
errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, buffer);
goto free_and_statistics_interlace;
goto free_of_interlace;
}
pThreadInfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
......@@ -5231,13 +5242,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
}
free_and_statistics_interlace:
free_of_interlace:
tmfree(buffer);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
printStatPerThread(pThreadInfo);
return NULL;
}
......@@ -5253,19 +5260,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) {
errorPrint( "Failed to alloc %d Bytes, reason:%s\n",
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen,
strerror(errno));
return NULL;
}
int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampMs();
int64_t endTs;
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
......@@ -5280,15 +5287,15 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->samplePos = 0;
for (int64_t tableSeq =
for (uint64_t tableSeq =
pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
uint64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (int64_t i = 0; i < insertRows;) {
for (uint64_t i = 0; i < insertRows;) {
/*
if (insert_interval) {
st = taosGetTimestampMs();
......@@ -5310,7 +5317,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pstr += len;
remainderBufLen -= len;
int generated = generateProgressiveDataBuffer(
int64_t generated = generateProgressiveDataBuffer(
tableName, tableSeq, pThreadInfo, pstr, insertRows,
i, start_time,
&(pThreadInfo->samplePos),
......@@ -5318,7 +5325,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (generated > 0)
i += generated;
else
goto free_and_statistics_2;
goto free_of_progressive;
start_time += generated * timeStampStep;
pThreadInfo->totalInsertRows += generated;
......@@ -5328,17 +5335,23 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t affectedRows = execInsert(pThreadInfo, buffer, generated);
endTs = taosGetTimestampMs();
int64_t delay = endTs - startTs;
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
__func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
if (affectedRows < 0)
goto free_and_statistics_2;
if (affectedRows < 0) {
errorPrint("%s() LN%d, affected rows: %"PRId64"\n",
__func__, __LINE__, affectedRows);
goto free_of_progressive;
}
pThreadInfo->totalAffectedRows += affectedRows;
......@@ -5377,13 +5390,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
}
} // tableSeq
free_and_statistics_2:
free_of_progressive:
tmfree(buffer);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
printStatPerThread(pThreadInfo);
return NULL;
}
......@@ -5412,6 +5421,7 @@ static void* syncWrite(void *sarg) {
// progressive mode
return syncWriteProgressive(pThreadInfo);
}
}
static void callBack(void *param, TAOS_RES *res, int code) {
......@@ -5737,10 +5747,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pthread_join(pids[i], NULL);
}
int64_t totalDelay = 0;
int64_t maxDelay = 0;
int64_t minDelay = UINT64_MAX;
int64_t cntDelay = 1;
uint64_t totalDelay = 0;
uint64_t maxDelay = 0;
uint64_t minDelay = UINT64_MAX;
uint64_t cntDelay = 1;
double avgDelay = 0;
for (int i = 0; i < threads; i++) {
......@@ -5749,7 +5759,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRId64" totalAffected=%"PRId64"\n",
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
__func__, __LINE__,
t_info->threadID, t_info->totalInsertRows,
t_info->totalAffectedRows);
......@@ -5775,35 +5785,42 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t t = end - start;
if (superTblInfo) {
printf("Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
t / 1000.0, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
(double)superTblInfo->totalInsertRows / (t / 1000.0));
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
t / 1000.0, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
(double)superTblInfo->totalInsertRows / (t / 1000.0));
}
} else {
printf("Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n",
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
t / 1000.0, g_args.totalInsertRows,
g_args.totalAffectedRows,
threads, db_name,
(double)g_args.totalInsertRows / (t / 1000.0));
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n",
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
t * 1000.0, g_args.totalInsertRows,
g_args.totalAffectedRows,
threads, db_name,
(double)g_args.totalInsertRows / (t / 1000.0));
}
}
printf("insert delay, avg: %10.2fms, max: %"PRId64"ms, min: %"PRId64"ms\n\n",
fprintf(stderr, "insert delay, avg: %10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
avgDelay, maxDelay, minDelay);
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.2fms, max: %"PRId64"ms, min: %"PRId64"ms\n\n",
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
avgDelay, maxDelay, minDelay);
}
//taos_close(taos);
......@@ -5973,7 +5990,8 @@ static int insertTestProcess() {
return -1;
}
printfInsertMetaToFile(g_fpOfInsertResult);
if (g_fpOfInsertResult)
printfInsertMetaToFile(g_fpOfInsertResult);
if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n");
......@@ -5984,7 +6002,8 @@ static int insertTestProcess() {
// create database and super tables
if(createDatabasesAndStables() != 0) {
fclose(g_fpOfInsertResult);
if (g_fpOfInsertResult)
fclose(g_fpOfInsertResult);
return -1;
}
......@@ -6000,11 +6019,13 @@ static int insertTestProcess() {
end = taosGetTimestampMs();
if (g_totalChildTables > 0) {
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
fprintf(stderr, "Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
fprintf(g_fpOfInsertResult,
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
}
}
taosMsleep(1000);
......@@ -6077,14 +6098,14 @@ static void *specifiedTableQuery(void *sarg) {
return NULL;
}
int64_t st = 0;
int64_t et = 0;
uint64_t st = 0;
uint64_t et = 0;
int queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
int totalQueried = 0;
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t totalQueried = 0;
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
while(queryTimes --) {
if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) <
......@@ -6135,7 +6156,7 @@ static void *specifiedTableQuery(void *sarg) {
if (currentPrintTime - lastPrintTime > 30*1000) {
debugPrint("%s() LN%d, endTs=%"PRIu64"ms, startTs=%"PRIu64"ms\n",
__func__, __LINE__, endTs, startTs);
printf("thread[%d] has currently completed queries: %d, QPS: %10.6f\n",
printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.6f\n",
pThreadInfo->threadID,
totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0)));
......@@ -6187,14 +6208,14 @@ static void *superTableQuery(void *sarg) {
}
}
int64_t st = 0;
int64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval;
uint64_t st = 0;
uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval;
int queryTimes = g_queryInfo.superQueryInfo.queryTimes;
int totalQueried = 0;
int64_t startTs = taosGetTimestampMs();
uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
uint64_t totalQueried = 0;
uint64_t startTs = taosGetTimestampMs();
int64_t lastPrintTime = taosGetTimestampMs();
uint64_t lastPrintTime = taosGetTimestampMs();
while(queryTimes --) {
if (g_queryInfo.superQueryInfo.queryInterval
&& (et - st) < (int64_t)g_queryInfo.superQueryInfo.queryInterval) {
......@@ -6221,7 +6242,7 @@ static void *superTableQuery(void *sarg) {
int64_t currentPrintTime = taosGetTimestampMs();
int64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently completed queries: %d, QPS: %10.3f\n",
printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.3f\n",
pThreadInfo->threadID,
totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0)));
......@@ -6285,7 +6306,7 @@ static int queryTestProcess() {
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
int64_t startTs = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
if ((nSqlCount > 0) && (nConcurrent > 0)) {
......@@ -6345,16 +6366,16 @@ static int queryTestProcess() {
ERROR_EXIT("memory allocation failed for create threads\n");
}
int ntables = g_queryInfo.superQueryInfo.childTblCount;
uint64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
int a = ntables / threads;
uint64_t a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int b = 0;
uint64_t b = 0;
if (threads != 0) {
b = ntables % threads;
}
......@@ -6396,12 +6417,12 @@ static int queryTestProcess() {
tmfree((char*)infosOfSub);
// taos_close(taos);// TODO: workaround to use separate taos connection;
int64_t endTs = taosGetTimestampMs();
uint64_t endTs = taosGetTimestampMs();
int totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
g_queryInfo.superQueryInfo.totalQueried;
printf("==== completed total queries: %d, the QPS of all threads: %10.3f====\n",
fprintf(stderr, "==== completed total queries: %"PRIu64", the QPS of all threads: %10.3f====\n",
totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0)));
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册