diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 014acb5fbf7ed5425b8086d2a9a07ea9724dec49..4e5a59550b23ea3d4213876eee20d459ae6f0f6e 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -262,7 +262,7 @@ typedef struct SSuperTable_S { int tagUsePos; // statistics - int64_t totalRowsInserted; + int64_t totalInsertRows; int64_t totalAffectedRows; } SSuperTable; @@ -332,7 +332,7 @@ typedef struct SDbs_S { SDataBase db[MAX_DB_COUNT]; // statistics - int64_t totalRowsInserted; + int64_t totalInsertRows; int64_t totalAffectedRows; } SDbs; @@ -403,7 +403,7 @@ typedef struct SThreadInfo_S { int64_t lastTs; // statistics - int64_t totalRowsInserted; + int64_t totalInsertRows; int64_t totalAffectedRows; // insert delay statistics @@ -3986,8 +3986,6 @@ static void syncWriteForNumberOfTblInOneSql( int samplePos = 0; //printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id); - int64_t totalRowsInserted = 0; - int64_t totalAffectedRows = 0; int64_t lastPrintTime = taosGetTimestampMs(); char* buffer = calloc(superTblInfo->maxSqlLen+1, 1); @@ -4128,7 +4126,7 @@ static void syncWriteForNumberOfTblInOneSql( len += retLen; //inserted++; j++; - totalRowsInserted++; + winfo->totalInsertRows++; if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { @@ -4185,7 +4183,7 @@ send_to_server: if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - winfo->totalRowsInserted, + winfo->totalInsertRows, winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } @@ -4223,9 +4221,7 @@ send_to_server: free_and_statistics: tmfree(buffer); - winfo->totalRowsInserted = totalRowsInserted; - winfo->totalAffectedRows = totalAffectedRows; - printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", winfo->threadID, totalRowsInserted, totalAffectedRows); + printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows); return; } @@ -4315,6 +4311,39 @@ static int prepareSampleData(SSuperTable *superTblInfo) { return 0; } +static int execInsert(threadInfo *winfo, char *buffer, int k) +{ + int affectedRows; + SSuperTable* superTblInfo = winfo->superTblInfo; + + if (superTblInfo) { + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); + + } else { + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); + + if (0 != retCode) { + affectedRows = -1; + printf("========restful return fail, threadID[%d]\n", winfo->threadID); + } else { + affectedRows = k; + } + } + } else { + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + affectedRows = queryDbExec(winfo->taos, buffer, 1); + } + + if (0 > affectedRows){ + return affectedRows; + } + + return affectedRows; +} + // sync insertion /* 1 thread: 100 tables * 2000 rows/s @@ -4324,7 +4353,6 @@ static int prepareSampleData(SSuperTable *superTblInfo) { 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s */ static void* syncWrite(void *sarg) { - uint64_t lastPrintTime = taosGetTimestampMs(); threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; @@ -4360,11 +4388,15 @@ static void* syncWrite(void *sarg) { return NULL; } + int64_t lastPrintTime = taosGetTimestampMs(); + int64_t startTs = taosGetTimestampUs(); + int64_t endTs; + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; uint64_t st = 0; uint64_t et = 0xffffffff; - winfo->totalRowsInserted = 0; + winfo->totalInsertRows = 0; winfo->totalAffectedRows = 0; int sampleUsePos; @@ -4383,12 +4415,12 @@ static void* syncWrite(void *sarg) { for (int64_t i = 0; i < insertRows;) { int64_t prepared = i; - sampleUsePos = samplePos; - if (insert_interval) { - st = taosGetTimestampUs(); + st = taosGetTimestampUs(); } + sampleUsePos = samplePos; + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); char *pstr = buffer; @@ -4518,35 +4550,12 @@ static void* syncWrite(void *sarg) { break; } - winfo->totalRowsInserted += k; - - int64_t startTs = taosGetTimestampUs(); - int64_t endTs; - int affectedRows; - - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); - - if (0 > affectedRows){ - goto free_and_statistics_2; - } - } else { - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); + int affectedRows = execInsert(winfo, buffer, k); + if (affectedRows < 0) + goto free_and_statistics_2; - if (0 != retCode) { - printf("========restful return fail, threadID[%d]\n", winfo->threadID); - goto free_and_statistics_2; - } - - affectedRows = k; - } - } else { - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - affectedRows = queryDbExec(winfo->taos, buffer, 1); - } + winfo->totalInsertRows += k; + winfo->totalAffectedRows += affectedRows; endTs = taosGetTimestampUs(); int64_t delay = endTs - startTs; @@ -4555,13 +4564,11 @@ static void* syncWrite(void *sarg) { winfo->cntDelay++; winfo->totalDelay += delay; - winfo->totalAffectedRows += affectedRows; - int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - winfo->totalRowsInserted, + winfo->totalInsertRows, winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } @@ -4572,14 +4579,12 @@ static void* syncWrite(void *sarg) { if (insert_interval) { et = taosGetTimestampUs(); - printf("et: %ld ms st: %ld\n", et, st); if (insert_interval > ((et - st)/1000) ) { int sleep_time = insert_interval - (et -st)/1000; printf("sleep: %d ms for insert interval\n", sleep_time); taosMsleep(sleep_time); // ms } } - } // num_of_DPT if ((tID == winfo->end_table_id) && superTblInfo && @@ -4596,7 +4601,7 @@ free_and_statistics_2: printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, - winfo->totalRowsInserted, + winfo->totalInsertRows, winfo->totalAffectedRows); return NULL; } @@ -4806,7 +4811,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, if (superTblInfo) { superTblInfo->totalAffectedRows += t_info->totalAffectedRows; - superTblInfo->totalRowsInserted += t_info->totalRowsInserted; + superTblInfo->totalInsertRows += t_info->totalInsertRows; } totalDelay += t_info->totalDelay; @@ -4824,16 +4829,16 @@ static void startMultiThreadInsertData(int threads, char* db_name, if (superTblInfo) { printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", - t, superTblInfo->totalRowsInserted, + t, superTblInfo->totalInsertRows, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, - superTblInfo->totalRowsInserted / t); + superTblInfo->totalInsertRows / t); fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", - t, superTblInfo->totalRowsInserted, + t, superTblInfo->totalInsertRows, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, - superTblInfo->totalRowsInserted / t); + superTblInfo->totalInsertRows/ t); } printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n", @@ -5067,14 +5072,14 @@ static int insertTestProcess() { } //end = getCurrentTime(); - //int64_t totalRowsInserted = 0; + //int64_t totalInsertRows = 0; //int64_t totalAffectedRows = 0; //for (int i = 0; i < g_Dbs.dbCount; i++) { // for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - // totalRowsInserted += g_Dbs.db[i].superTbls[j].totalRowsInserted; + // totalInsertRows+= g_Dbs.db[i].superTbls[j].totalInsertRows; // totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows; //} - //printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalRowsInserted, totalAffectedRows, g_Dbs.threadCount); + //printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalInsertRows, totalAffectedRows, g_Dbs.threadCount); postFreeResource(); return 0;