diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index a94a4f73abf1df85093e309290193b0edb03570c..014acb5fbf7ed5425b8086d2a9a07ea9724dec49 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -232,7 +232,8 @@ typedef struct SSuperTable_S { int disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision int maxSqlLen; // - + + int insertInterval; // insert interval, will override global insert interval int64_t insertRows; // 0: no limit int timeStampStep; char startTimestamp[MAX_TB_NAME_SIZE]; // @@ -478,8 +479,8 @@ char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", "max(col0)", "min(col0)", "first(col0)", "last(col0)"}; SArguments g_args = { - NULL, // metaFile - 0, // test_mode + NULL, // metaFile + 0, // test_mode "127.0.0.1", // host 6030, // port "root", // user @@ -743,7 +744,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { } } - if (arguments->debug_print || arguments->verbose_print) { + if (((arguments->debug_print) && (arguments->metaFile == NULL)) + || arguments->verbose_print) { printf("###################################################################\n"); printf("# meta file: %s\n", arguments->metaFile); printf("# Server IP: %s:%hu\n", @@ -1263,6 +1265,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); + fprintf(fp, " insert interval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { fprintf(fp, " multiThreadWriteOneTbl: no\n"); @@ -2913,13 +2916,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON* insertInterval = cJSON_GetObjectItem(root, "insert_interval"); - if (insertInterval && insertInterval->type == cJSON_Number) { - g_args.insert_interval = insertInterval->valueint; - } else if (!insertInterval) { + cJSON* gInsertInterval = cJSON_GetObjectItem(root, "insert_interval"); + if (gInsertInterval && gInsertInterval->type == cJSON_Number) { + g_args.insert_interval = gInsertInterval->valueint; + } else if (!gInsertInterval) { g_args.insert_interval = 0; } else { - printf("ERROR: failed to read json, insert_interval not found\n"); + fprintf(stderr, "ERROR: failed to read json, insert_interval input mistake\n"); goto PARSE_OVER; } @@ -3438,13 +3441,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); if (insertRows && insertRows->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; - //if (0 == g_Dbs.db[i].superTbls[j].insertRows) { - // g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; - //} } else if (!insertRows) { g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; } else { - printf("ERROR: failed to read json, insert_rows not found\n"); + fprintf(stderr, "failed to read json, insert_rows input mistake"); + goto PARSE_OVER; + } + + cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval"); + if (insertInterval && insertInterval->type == cJSON_Number) { + g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint; + } else if (!insertInterval) { + debugPrint("%s() LN%d: stable insert interval be overrided by global %d.\n", + __func__, __LINE__, g_args.insert_interval); + g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval; + } else { + fprintf(stderr, "failed to read json, insert_interval input mistake"); goto PARSE_OVER; } @@ -3993,8 +4005,9 @@ static void syncWriteForNumberOfTblInOneSql( uint64_t time_counter = winfo->start_time; int sampleUsePos; + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; int64_t st = 0; - int64_t et = 0; + int64_t et = 0xffffffff; for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { @@ -4131,14 +4144,14 @@ static void syncWriteForNumberOfTblInOneSql( inserted += superTblInfo->rowsPerTbl; send_to_server: - if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { - int sleep_time = g_args.insert_interval - (et -st); - printf("sleep: %d ms specified by insert_interval\n", sleep_time); - taosMsleep(sleep_time); // ms - } + if (insert_interval) { + st = taosGetTimestampUs(); - if (g_args.insert_interval) { - st = taosGetTimestampMs(); + if (insert_interval > ((et - st)/1000)) { + int sleep_time = insert_interval - (et -st); + printf("sleep: %d ms insert interval\n", sleep_time); + taosMsleep(sleep_time); // ms + } } if (0 == strncasecmp(superTblInfo->insertMode, @@ -4189,8 +4202,8 @@ send_to_server: goto free_and_statistics; } } - if (g_args.insert_interval) { - et = taosGetTimestampMs(); + if (insert_interval) { + et = taosGetTimestampUs(); } break; @@ -4347,8 +4360,9 @@ static void* syncWrite(void *sarg) { return NULL; } + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; uint64_t st = 0; - uint64_t et = 0; + uint64_t et = 0xffffffff; winfo->totalRowsInserted = 0; winfo->totalAffectedRows = 0; @@ -4371,6 +4385,10 @@ static void* syncWrite(void *sarg) { sampleUsePos = samplePos; + if (insert_interval) { + st = taosGetTimestampUs(); + } + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); char *pstr = buffer; @@ -4502,17 +4520,6 @@ static void* syncWrite(void *sarg) { winfo->totalRowsInserted += k; - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - - if (i > 0 && g_args.insert_interval - && (g_args.insert_interval > (et - st) )) { - int sleep_time = g_args.insert_interval - (et -st); - printf("sleep: %d ms for insert interval\n", sleep_time); - taosMsleep(sleep_time); // ms - } - } - int64_t startTs = taosGetTimestampUs(); int64_t endTs; int affectedRows; @@ -4559,12 +4566,20 @@ static void* syncWrite(void *sarg) { lastPrintTime = currentPrintTime; } - if (g_args.insert_interval) { - et = taosGetTimestampMs(); - } - if (prepared >= insertRows) break; + + if (insert_interval) { + et = taosGetTimestampUs(); + + printf("et: %ld ms st: %ld\n", et, st); + if (insert_interval > ((et - st)/1000) ) { + int sleep_time = insert_interval - (et -st)/1000; + printf("sleep: %d ms for insert interval\n", sleep_time); + taosMsleep(sleep_time); // ms + } + } + } // num_of_DPT if ((tID == winfo->end_table_id) && superTblInfo && @@ -4588,11 +4603,13 @@ free_and_statistics_2: void callBack(void *param, TAOS_RES *res, int code) { threadInfo* winfo = (threadInfo*)param; + SSuperTable* superTblInfo = winfo->superTblInfo; - if (g_args.insert_interval) { - winfo->et = taosGetTimestampMs(); - if (winfo->et - winfo->st < 1000) { - taosMsleep(1000 - (winfo->et - winfo->st)); // ms + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + if (insert_interval) { + winfo->et = taosGetTimestampUs(); + if (((winfo->et - winfo->st)/1000) < insert_interval) { + taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms } } @@ -4632,8 +4649,8 @@ void callBack(void *param, TAOS_RES *res, int code) { } } - if (g_args.insert_interval) { - winfo->st = taosGetTimestampMs(); + if (insert_interval) { + winfo->st = taosGetTimestampUs(); } taos_query_a(winfo->taos, buffer, callBack, winfo); free(buffer); @@ -4644,13 +4661,15 @@ void callBack(void *param, TAOS_RES *res, int code) { void *asyncWrite(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; + SSuperTable* superTblInfo = winfo->superTblInfo; winfo->st = 0; winfo->et = 0; winfo->lastTs = winfo->start_time; - if (g_args.insert_interval) { - winfo->st = taosGetTimestampMs(); + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + if (insert_interval) { + winfo->st = taosGetTimestampUs(); } taos_query_a(winfo->taos, "show databases", callBack, winfo); @@ -5076,7 +5095,7 @@ void *superQueryProcess(void *sarg) { //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); } - st = taosGetTimestampMs(); + st = taosGetTimestampUs(); for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { int64_t t1 = taosGetTimestampUs(); @@ -5102,7 +5121,7 @@ void *superQueryProcess(void *sarg) { } } } - et = taosGetTimestampMs(); + et = taosGetTimestampUs(); printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); } @@ -5142,7 +5161,7 @@ static void *subQueryProcess(void *sarg) { //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); } - st = taosGetTimestampMs(); + st = taosGetTimestampUs(); for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) { memset(sqlstr,0,sizeof(sqlstr)); @@ -5156,12 +5175,12 @@ static void *subQueryProcess(void *sarg) { selectAndGetResult(winfo->taos, sqlstr, tmpFile); } } - et = taosGetTimestampMs(); + et = taosGetTimestampUs(); printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", taosGetSelfPthreadId(), winfo->start_table_id, winfo->end_table_id, - (double)(et - st)/1000.0); + (double)(et - st)/1000000.0); } return NULL; } diff --git a/tests/pytest/tools/taosdemoWithMetric.py b/tests/pytest/tools/taosdemoTestWithoutMetric.py similarity index 100% rename from tests/pytest/tools/taosdemoWithMetric.py rename to tests/pytest/tools/taosdemoTestWithoutMetric.py