未验证 提交 6a618dd2 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 3976 taosdemo print insert perf per thread for develop (#6014)

* [TD-3976]<fix>: taosdemo print each insert performance data.

* [TD-3976]<fix>: taosdemo print insert performance per thread.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 6583d1d6
...@@ -4672,7 +4672,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { ...@@ -4672,7 +4672,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return 0; return 0;
} }
static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, int k) static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
{ {
int affectedRows; int affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
...@@ -4744,7 +4744,7 @@ static int64_t generateDataTail( ...@@ -4744,7 +4744,7 @@ static int64_t generateDataTail(
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
uint64_t k = 0; int64_t k = 0;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE); memset(data, 0, MAX_DATA_SIZE);
...@@ -4959,7 +4959,7 @@ static int64_t generateInterlaceDataBuffer( ...@@ -4959,7 +4959,7 @@ static int64_t generateInterlaceDataBuffer(
return k; return k;
} }
static int generateProgressiveDataBuffer( static int64_t generateProgressiveDataBuffer(
char *tableName, char *tableName,
int64_t tableSeq, int64_t tableSeq,
threadInfo *pThreadInfo, char *buffer, threadInfo *pThreadInfo, char *buffer,
...@@ -5004,12 +5004,21 @@ static int generateProgressiveDataBuffer( ...@@ -5004,12 +5004,21 @@ static int generateProgressiveDataBuffer(
return k; return k;
} }
static void printStatPerThread(threadInfo *pThreadInfo)
{
fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows,
(double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0)));
}
static void* syncWriteInterlace(threadInfo *pThreadInfo) { static void* syncWriteInterlace(threadInfo *pThreadInfo) {
debugPrint("[%d] %s() LN%d: ### interlace write\n", debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows; uint64_t insertRows;
int64_t interlaceRows; uint64_t interlaceRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
...@@ -5078,9 +5087,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5078,9 +5087,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
assert(pThreadInfo->ntables > 0); assert(pThreadInfo->ntables > 0);
int64_t batchPerTbl = interlaceRows; uint64_t batchPerTbl = interlaceRows;
uint64_t batchPerTblTimes;
int64_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes = batchPerTblTimes =
g_args.num_of_RPR / interlaceRows; g_args.num_of_RPR / interlaceRows;
...@@ -5088,9 +5097,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5088,9 +5097,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
batchPerTblTimes = 1; batchPerTblTimes = 1;
} }
int64_t generatedRecPerTbl = 0; uint64_t generatedRecPerTbl = 0;
bool flagSleep = true; bool flagSleep = true;
int64_t sleepTimeTotal = 0; uint64_t sleepTimeTotal = 0;
char *strInsertInto = "insert into "; char *strInsertInto = "insert into ";
int nInsertBufLen = strlen(strInsertInto); int nInsertBufLen = strlen(strInsertInto);
...@@ -5110,9 +5119,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5110,9 +5119,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pstr += len; pstr += len;
remainderBufLen -= len; remainderBufLen -= len;
int64_t recOfBatch = 0; uint64_t recOfBatch = 0;
for (int64_t i = 0; i < batchPerTblTimes; i ++) { for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
getTableName(tableName, pThreadInfo, tableSeq); getTableName(tableName, pThreadInfo, tableSeq);
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
...@@ -5130,10 +5139,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5130,10 +5139,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTime, startTime,
&remainderBufLen); &remainderBufLen);
debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
if (generated < 0) { if (generated < 0) {
debugPrint("[%d] %s() LN%d, generated data is %"PRId64"\n", errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, generated); pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_and_statistics_interlace; goto free_of_interlace;
} else if (generated == 0) { } else if (generated == 0) {
break; break;
} }
...@@ -5177,7 +5188,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5177,7 +5188,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
break; break;
} }
verbosePrint("[%d] %s() LN%d recOfBatch=%"PRId64" totalInsertRows=%"PRId64"\n", verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, recOfBatch, pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows); pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n", verbosePrint("[%d] %s() LN%d, buffer=%s\n",
...@@ -5188,30 +5199,30 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5188,30 +5199,30 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch); int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
int64_t delay = endTs - startTs; uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", performancePrint("%s() LN%d, insert execution time is %"PRIu64"ms\n",
__func__, __LINE__, delay); __func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++; pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay; pThreadInfo->totalDelay += delay;
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", if (recOfBatch != affectedRows) {
pThreadInfo->threadID, errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
__func__, __LINE__, affectedRows);
if ((affectedRows < 0) || (recOfBatch != affectedRows)) {
errorPrint("[%d] %s() LN%d execInsert insert %"PRId64", affected rows: %"PRId64"\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, buffer); recOfBatch, affectedRows, buffer);
goto free_and_statistics_interlace; goto free_of_interlace;
} }
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
pThreadInfo->threadID, pThreadInfo->threadID,
pThreadInfo->totalInsertRows, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows); pThreadInfo->totalAffectedRows);
...@@ -5231,13 +5242,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5231,13 +5242,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} }
} }
free_and_statistics_interlace: free_of_interlace:
tmfree(buffer); tmfree(buffer);
printStatPerThread(pThreadInfo);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
return NULL; return NULL;
} }
...@@ -5253,19 +5260,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5253,19 +5260,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(maxSqlLen, 1); char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) { if (NULL == buffer) {
errorPrint( "Failed to alloc %d Bytes, reason:%s\n", errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen, maxSqlLen,
strerror(errno)); strerror(errno));
return NULL; return NULL;
} }
int64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
int64_t endTs; uint64_t endTs;
int64_t timeStampStep = int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
...@@ -5280,15 +5287,15 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5280,15 +5287,15 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->samplePos = 0; pThreadInfo->samplePos = 0;
for (int64_t tableSeq = for (uint64_t tableSeq =
pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to; pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) { tableSeq ++) {
int64_t start_time = pThreadInfo->start_time; int64_t start_time = pThreadInfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; uint64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (int64_t i = 0; i < insertRows;) { for (uint64_t i = 0; i < insertRows;) {
/* /*
if (insert_interval) { if (insert_interval) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
...@@ -5310,7 +5317,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5310,7 +5317,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pstr += len; pstr += len;
remainderBufLen -= len; remainderBufLen -= len;
int generated = generateProgressiveDataBuffer( int64_t generated = generateProgressiveDataBuffer(
tableName, tableSeq, pThreadInfo, pstr, insertRows, tableName, tableSeq, pThreadInfo, pstr, insertRows,
i, start_time, i, start_time,
&(pThreadInfo->samplePos), &(pThreadInfo->samplePos),
...@@ -5318,7 +5325,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5318,7 +5325,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (generated > 0) if (generated > 0)
i += generated; i += generated;
else else
goto free_and_statistics_2; goto free_of_progressive;
start_time += generated * timeStampStep; start_time += generated * timeStampStep;
pThreadInfo->totalInsertRows += generated; pThreadInfo->totalInsertRows += generated;
...@@ -5328,17 +5335,23 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5328,17 +5335,23 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t affectedRows = execInsert(pThreadInfo, buffer, generated); int64_t affectedRows = execInsert(pThreadInfo, buffer, generated);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
int64_t delay = endTs - startTs; uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
__func__, __LINE__, delay); __func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++; pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay; pThreadInfo->totalDelay += delay;
if (affectedRows < 0) if (affectedRows < 0) {
goto free_and_statistics_2; errorPrint("%s() LN%d, affected rows: %"PRId64"\n",
__func__, __LINE__, affectedRows);
goto free_of_progressive;
}
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
...@@ -5377,13 +5390,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5377,13 +5390,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
} }
} // tableSeq } // tableSeq
free_and_statistics_2: free_of_progressive:
tmfree(buffer); tmfree(buffer);
printStatPerThread(pThreadInfo);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
return NULL; return NULL;
} }
...@@ -5412,6 +5421,7 @@ static void* syncWrite(void *sarg) { ...@@ -5412,6 +5421,7 @@ static void* syncWrite(void *sarg) {
// progressive mode // progressive mode
return syncWriteProgressive(pThreadInfo); return syncWriteProgressive(pThreadInfo);
} }
} }
static void callBack(void *param, TAOS_RES *res, int code) { static void callBack(void *param, TAOS_RES *res, int code) {
...@@ -5737,10 +5747,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5737,10 +5747,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
int64_t totalDelay = 0; uint64_t totalDelay = 0;
int64_t maxDelay = 0; uint64_t maxDelay = 0;
int64_t minDelay = UINT64_MAX; uint64_t minDelay = UINT64_MAX;
int64_t cntDelay = 1; uint64_t cntDelay = 1;
double avgDelay = 0; double avgDelay = 0;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
...@@ -5749,7 +5759,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5749,7 +5759,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tsem_destroy(&(t_info->lock_sem)); tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos); taos_close(t_info->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRId64" totalAffected=%"PRId64"\n", debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
__func__, __LINE__, __func__, __LINE__,
t_info->threadID, t_info->totalInsertRows, t_info->threadID, t_info->totalInsertRows,
t_info->totalAffectedRows); t_info->totalAffectedRows);
...@@ -5775,35 +5785,42 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5775,35 +5785,42 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t t = end - start; int64_t t = end - start;
if (superTblInfo) { if (superTblInfo) {
printf("Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
t / 1000.0, superTblInfo->totalInsertRows, t / 1000.0, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
(double)superTblInfo->totalInsertRows / (t / 1000.0)); (double)superTblInfo->totalInsertRows / (t / 1000.0));
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
t / 1000.0, superTblInfo->totalInsertRows, t / 1000.0, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
(double)superTblInfo->totalInsertRows / (t / 1000.0)); (double)superTblInfo->totalInsertRows / (t / 1000.0));
}
} else { } else {
printf("Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n", fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
t / 1000.0, g_args.totalInsertRows, t / 1000.0, g_args.totalInsertRows,
g_args.totalAffectedRows, g_args.totalAffectedRows,
threads, db_name, threads, db_name,
(double)g_args.totalInsertRows / (t / 1000.0)); (double)g_args.totalInsertRows / (t / 1000.0));
fprintf(g_fpOfInsertResult, if (g_fpOfInsertResult) {
"Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n", fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
t * 1000.0, g_args.totalInsertRows, t * 1000.0, g_args.totalInsertRows,
g_args.totalAffectedRows, g_args.totalAffectedRows,
threads, db_name, threads, db_name,
(double)g_args.totalInsertRows / (t / 1000.0)); (double)g_args.totalInsertRows / (t / 1000.0));
}
} }
printf("insert delay, avg: %10.2fms, max: %"PRId64"ms, min: %"PRId64"ms\n\n", fprintf(stderr, "insert delay, avg: %10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
avgDelay, maxDelay, minDelay); avgDelay, maxDelay, minDelay);
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.2fms, max: %"PRId64"ms, min: %"PRId64"ms\n\n", if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
avgDelay, maxDelay, minDelay); avgDelay, maxDelay, minDelay);
}
//taos_close(taos); //taos_close(taos);
...@@ -5973,7 +5990,8 @@ static int insertTestProcess() { ...@@ -5973,7 +5990,8 @@ static int insertTestProcess() {
return -1; return -1;
} }
printfInsertMetaToFile(g_fpOfInsertResult); if (g_fpOfInsertResult)
printfInsertMetaToFile(g_fpOfInsertResult);
if (!g_args.answer_yes) { if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n"); printf("Press enter key to continue\n\n");
...@@ -5984,7 +6002,8 @@ static int insertTestProcess() { ...@@ -5984,7 +6002,8 @@ static int insertTestProcess() {
// create database and super tables // create database and super tables
if(createDatabasesAndStables() != 0) { if(createDatabasesAndStables() != 0) {
fclose(g_fpOfInsertResult); if (g_fpOfInsertResult)
fclose(g_fpOfInsertResult);
return -1; return -1;
} }
...@@ -6000,11 +6019,13 @@ static int insertTestProcess() { ...@@ -6000,11 +6019,13 @@ static int insertTestProcess() {
end = taosGetTimestampMs(); end = taosGetTimestampMs();
if (g_totalChildTables > 0) { if (g_totalChildTables > 0) {
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", fprintf(stderr, "Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl); (end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
fprintf(g_fpOfInsertResult, if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n", "Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl); (end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
}
} }
taosMsleep(1000); taosMsleep(1000);
...@@ -6077,14 +6098,14 @@ static void *specifiedTableQuery(void *sarg) { ...@@ -6077,14 +6098,14 @@ static void *specifiedTableQuery(void *sarg) {
return NULL; return NULL;
} }
int64_t st = 0; uint64_t st = 0;
int64_t et = 0; uint64_t et = 0;
int queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes; uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
int totalQueried = 0; uint64_t totalQueried = 0;
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
while(queryTimes --) { while(queryTimes --) {
if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) <
...@@ -6135,7 +6156,7 @@ static void *specifiedTableQuery(void *sarg) { ...@@ -6135,7 +6156,7 @@ static void *specifiedTableQuery(void *sarg) {
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
debugPrint("%s() LN%d, endTs=%"PRIu64"ms, startTs=%"PRIu64"ms\n", debugPrint("%s() LN%d, endTs=%"PRIu64"ms, startTs=%"PRIu64"ms\n",
__func__, __LINE__, endTs, startTs); __func__, __LINE__, endTs, startTs);
printf("thread[%d] has currently completed queries: %d, QPS: %10.6f\n", printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.6f\n",
pThreadInfo->threadID, pThreadInfo->threadID,
totalQueried, totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0))); (double)(totalQueried/((endTs-startTs)/1000.0)));
...@@ -6187,14 +6208,14 @@ static void *superTableQuery(void *sarg) { ...@@ -6187,14 +6208,14 @@ static void *superTableQuery(void *sarg) {
} }
} }
int64_t st = 0; uint64_t st = 0;
int64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval; uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval;
int queryTimes = g_queryInfo.superQueryInfo.queryTimes; uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
int totalQueried = 0; uint64_t totalQueried = 0;
int64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
int64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
while(queryTimes --) { while(queryTimes --) {
if (g_queryInfo.superQueryInfo.queryInterval if (g_queryInfo.superQueryInfo.queryInterval
&& (et - st) < (int64_t)g_queryInfo.superQueryInfo.queryInterval) { && (et - st) < (int64_t)g_queryInfo.superQueryInfo.queryInterval) {
...@@ -6221,7 +6242,7 @@ static void *superTableQuery(void *sarg) { ...@@ -6221,7 +6242,7 @@ static void *superTableQuery(void *sarg) {
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
int64_t endTs = taosGetTimestampMs(); int64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently completed queries: %d, QPS: %10.3f\n", printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.3f\n",
pThreadInfo->threadID, pThreadInfo->threadID,
totalQueried, totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0))); (double)(totalQueried/((endTs-startTs)/1000.0)));
...@@ -6285,7 +6306,7 @@ static int queryTestProcess() { ...@@ -6285,7 +6306,7 @@ static int queryTestProcess() {
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
int64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
if ((nSqlCount > 0) && (nConcurrent > 0)) { if ((nSqlCount > 0) && (nConcurrent > 0)) {
...@@ -6345,16 +6366,16 @@ static int queryTestProcess() { ...@@ -6345,16 +6366,16 @@ static int queryTestProcess() {
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
int ntables = g_queryInfo.superQueryInfo.childTblCount; uint64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt; int threads = g_queryInfo.superQueryInfo.threadCnt;
int a = ntables / threads; uint64_t a = ntables / threads;
if (a < 1) { if (a < 1) {
threads = ntables; threads = ntables;
a = 1; a = 1;
} }
int b = 0; uint64_t b = 0;
if (threads != 0) { if (threads != 0) {
b = ntables % threads; b = ntables % threads;
} }
...@@ -6396,12 +6417,12 @@ static int queryTestProcess() { ...@@ -6396,12 +6417,12 @@ static int queryTestProcess() {
tmfree((char*)infosOfSub); tmfree((char*)infosOfSub);
// taos_close(taos);// TODO: workaround to use separate taos connection; // taos_close(taos);// TODO: workaround to use separate taos connection;
int64_t endTs = taosGetTimestampMs(); uint64_t endTs = taosGetTimestampMs();
int totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried + uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
g_queryInfo.superQueryInfo.totalQueried; g_queryInfo.superQueryInfo.totalQueried;
printf("==== completed total queries: %d, the QPS of all threads: %10.3f====\n", fprintf(stderr, "==== completed total queries: %"PRIu64", the QPS of all threads: %10.3f====\n",
totalQueried, totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0))); (double)(totalQueried/((endTs-startTs)/1000.0)));
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册