From e437f5c823b0202ac07711cd9be414305b8042db Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 4 Mar 2021 16:17:59 +0800 Subject: [PATCH] [TD-3147] : support insert interval. json works. --- src/kit/taosdemo/insert.json | 4 +- src/kit/taosdemo/taosdemo.c | 503 ++++++++++++++++++++--------------- 2 files changed, 295 insertions(+), 212 deletions(-) diff --git a/src/kit/taosdemo/insert.json b/src/kit/taosdemo/insert.json index ebc0cfd607..33fd587509 100644 --- a/src/kit/taosdemo/insert.json +++ b/src/kit/taosdemo/insert.json @@ -9,6 +9,8 @@ "thread_count_create_tbl": 1, "result_file": "./insert_res.txt", "confirm_parameter_prompt": "no", + "insert_interval": 0, + "num_of_records_per_req": 100, "databases": [{ "dbinfo": { "name": "db", @@ -35,8 +37,6 @@ "auto_create_table": "no", "data_source": "rand", "insert_mode": "taosc", - "insert_interval": 0, - "num_of_records_per_req": 100, "insert_rows": 100000, "multi_thread_write_one_tbl": "no", "number_of_tbl_in_one_sql": 1, diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 28a41b7925..449768bff5 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -61,56 +61,6 @@ #define REQ_EXTRA_BUF_LEN 1024 #define RESP_BUF_LEN 4096 -#ifdef WINDOWS -#include -// Some old MinGW/CYGWIN distributions don't define this: -#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING -#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004 -#endif - -static HANDLE g_stdoutHandle; -static DWORD g_consoleMode; - -void setupForAnsiEscape(void) { - DWORD mode = 0; - g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE); - - if(g_stdoutHandle == INVALID_HANDLE_VALUE) { - exit(GetLastError()); - } - - if(!GetConsoleMode(g_stdoutHandle, &mode)) { - exit(GetLastError()); - } - - g_consoleMode = mode; - - // Enable ANSI escape codes - mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING; - - if(!SetConsoleMode(g_stdoutHandle, mode)) { - exit(GetLastError()); - } -} - -void resetAfterAnsiEscape(void) { - // Reset colors - printf("\x1b[0m"); - - // Reset console mode - if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) { - exit(GetLastError()); - } -} -#else -void setupForAnsiEscape(void) {} - -void resetAfterAnsiEscape(void) { - // Reset colors - printf("\x1b[0m"); -} -#endif - extern char configDir[]; #define INSERT_JSON_NAME "insert.json" @@ -163,7 +113,7 @@ enum MODE { ASYNC, MODE_BUT }; - + enum QUERY_TYPE { NO_INSERT_TYPE, INSERT_TYPE, @@ -233,6 +183,7 @@ typedef struct SArguments_S { bool use_metric; bool insert_only; bool answer_yes; + bool debug_print; char * output_file; int mode; char * datatype[MAX_NUM_DATATYPE + 1]; @@ -267,8 +218,6 @@ typedef struct SSuperTable_S { char childTblPrefix[MAX_TB_NAME_SIZE]; char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful - uint32_t insertInterval; // interval time between insert twice - uint32_t numRecPerReq; int multiThreadWriteOneTbl; // 0: no, 1: yes int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl @@ -374,6 +323,9 @@ typedef struct SDbs_S { int dbCount; SDataBase db[MAX_DB_COUNT]; + int insert_interval; + int num_of_RPR; + // statistics int64_t totalRowsInserted; int64_t totalAffectedRows; @@ -458,6 +410,125 @@ typedef struct SThreadInfo_S { } threadInfo; +#ifdef WINDOWS +#include +// Some old MinGW/CYGWIN distributions don't define this: +#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING +#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004 +#endif + +static HANDLE g_stdoutHandle; +static DWORD g_consoleMode; + +void setupForAnsiEscape(void) { + DWORD mode = 0; + g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE); + + if(g_stdoutHandle == INVALID_HANDLE_VALUE) { + exit(GetLastError()); + } + + if(!GetConsoleMode(g_stdoutHandle, &mode)) { + exit(GetLastError()); + } + + g_consoleMode = mode; + + // Enable ANSI escape codes + mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING; + + if(!SetConsoleMode(g_stdoutHandle, mode)) { + exit(GetLastError()); + } +} + +void resetAfterAnsiEscape(void) { + // Reset colors + printf("\x1b[0m"); + + // Reset console mode + if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) { + exit(GetLastError()); + } +} +#else +void setupForAnsiEscape(void) {} + +void resetAfterAnsiEscape(void) { + // Reset colors + printf("\x1b[0m"); +} +#endif + +static int createDatabases(); +static void createChildTables(); +static int queryDbExec(TAOS *taos, char *command, int type); + +/* ************ Global variables ************ */ + +int32_t randint[MAX_PREPARED_RAND]; +int64_t randbigint[MAX_PREPARED_RAND]; +float randfloat[MAX_PREPARED_RAND]; +double randdouble[MAX_PREPARED_RAND]; +char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", + "max(col0)", "min(col0)", "first(col0)", "last(col0)"}; + +SArguments g_args = {NULL, + "127.0.0.1", // host + 6030, // port + "root", // user + #ifdef _TD_POWER_ + "powerdb", // password + #else + "taosdata", // password + #endif + "test", // database + 1, // replica + "t", // tb_prefix + NULL, // sqlFile + false, // use_metric + false, // insert_only + false, // debug_print + false, // answer_yes; + "./output.txt", // output_file + 0, // mode : sync or async + { + "TINYINT", // datatype + "SMALLINT", + "INT", + "BIGINT", + "FLOAT", + "DOUBLE", + "BINARY", + "NCHAR", + "BOOL", + "TIMESTAMP" + }, + 16, // len_of_binary + 10, // num_of_CPR + 10, // num_of_connections/thread + 0, // insert_interval + 100, // num_of_RPR + 10000, // num_of_tables + 10000, // num_of_DPT + 0, // abort + 0, // disorderRatio + 1000, // disorderRange + 1, // method_of_delete + NULL // arg_list +}; + + +static int g_jsonType = 0; + +static SDbs g_Dbs; +static int g_totalChildTables = 0; +static SQueryMetaInfo g_queryInfo; +static FILE * g_fpOfInsertResult = NULL; + +#define debugPrint(fmt, ...) \ + do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0) +/////////////////////////////////////////////////// void printHelp() { char indent[10] = " "; @@ -514,6 +585,8 @@ void printHelp() { "Insert mode--0: In order, > 0: disorder ratio. Default is in order."); printf("%s%s%s%s\n", indent, "-R", indent, "Out of order data's range, ms, default is 1000."); + printf("%s%s%s%s\n", indent, "-g", indent, + "Print debug info."); /* printf("%s%s%s%s\n", indent, "-D", indent, "if elete database if exists. 0: no, 1: yes, default is 1"); */ @@ -614,6 +687,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->insert_only = true; } else if (strcmp(argv[i], "-y") == 0) { arguments->answer_yes = true; + } else if (strcmp(argv[i], "-g") == 0) { + arguments->debug_print = true; } else if (strcmp(argv[i], "-c") == 0) { strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-O") == 0) { @@ -651,77 +726,42 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { exit(EXIT_FAILURE); } } + + if (arguments->debug_print) { + printf("###################################################################\n"); + printf("# Server IP: %s:%hu\n", + arguments->host == NULL ? "localhost" : arguments->host, + arguments->port ); + printf("# User: %s\n", arguments->user); + printf("# Password: %s\n", arguments->password); + printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); + printf("# Insertion interval: %d\n", arguments->insert_interval); + printf("# Number of Columns per record: %d\n", arguments->num_of_RPR); + printf("# Number of Threads: %d\n", arguments->num_of_threads); + printf("# Number of Tables: %d\n", arguments->num_of_tables); + printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); + printf("# Database name: %s\n", arguments->database); + printf("# Table prefix: %s\n", arguments->tb_prefix); + if (arguments->disorderRatio) { + printf("# Data order: %d\n", arguments->disorderRatio); + printf("# Data out of order rate: %d\n", arguments->disorderRange); + + } + printf("# Delete method: %d\n", arguments->method_of_delete); + printf("# Answer yes when prompt: %d\n", arguments->answer_yes); + printf("# Print debug info: %d\n", arguments->debug_print); + printf("###################################################################\n"); + if (!arguments->answer_yes) { + printf("Press enter key to continue\n\n"); + (void) getchar(); + } + } } static bool getInfoFromJsonFile(char* file); //static int generateOneRowDataForStb(SSuperTable* stbInfo); //static int getDataIntoMemForStb(SSuperTable* stbInfo); static void init_rand_data(); -static int createDatabases(); -static void createChildTables(); -static int queryDbExec(TAOS *taos, char *command, int type); - -/* ************ Global variables ************ */ - -int32_t randint[MAX_PREPARED_RAND]; -int64_t randbigint[MAX_PREPARED_RAND]; -float randfloat[MAX_PREPARED_RAND]; -double randdouble[MAX_PREPARED_RAND]; -char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", - "max(col0)", "min(col0)", "first(col0)", "last(col0)"}; - -SArguments g_args = {NULL, - "127.0.0.1", // host - 6030, // port - "root", // user - #ifdef _TD_POWER_ - "powerdb", // password - #else - "taosdata", // password - #endif - "test", // database - 1, // replica - "t", // tb_prefix - NULL, // sqlFile - false, // use_metric - false, // insert_only - false, // answer_yes; - "./output.txt", // output_file - 0, // mode : sync or async - { - "TINYINT", // datatype - "SMALLINT", - "INT", - "BIGINT", - "FLOAT", - "DOUBLE", - "BINARY", - "NCHAR", - "BOOL", - "TIMESTAMP" - }, - 16, // len_of_binary - 10, // num_of_CPR - 10, // num_of_connections/thread - 0, // insert_interval - 100, // num_of_RPR - 10000, // num_of_tables - 10000, // num_of_DPT - 0, // abort - 0, // disorderRatio - 1000, // disorderRange - 1, // method_of_delete - NULL // arg_list -}; - - -static int g_jsonType = 0; -static SDbs g_Dbs; -static int g_totalChildTables = 0; -static SQueryMetaInfo g_queryInfo; -static FILE * g_fpOfInsertResult = NULL; - - void tmfclose(FILE *fp) { if (NULL != fp) { fclose(fp); @@ -917,6 +957,8 @@ static int printfInsertMeta() { printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount); printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); + printf("insert interval: \033[33m%d\033[0m\n", g_Dbs.insert_interval); + printf("number of records per req: \033[33m%d\033[0m\n", g_Dbs.num_of_RPR); printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -999,8 +1041,6 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); - printf(" insertInterval: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].insertInterval); - printf(" numRecPerReq: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].numRecPerReq); printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { @@ -1138,8 +1178,6 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); - fprintf(fp, " insertInterval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval); - fprintf(fp, " numRecPerReq: %d\n", g_Dbs.db[i].superTbls[j].numRecPerReq); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { @@ -2530,6 +2568,26 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + cJSON* insertInterval = cJSON_GetObjectItem(root, "insert_interval"); + if (insertInterval && insertInterval->type == cJSON_Number) { + g_Dbs.insert_interval = insertInterval->valueint; + } else if (!insertInterval) { + g_Dbs.insert_interval = 0; + } else { + printf("failed to read json, insert_interval not found"); + goto PARSE_OVER; + } + + cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req"); + if (numRecPerReq && numRecPerReq->type == cJSON_Number) { + g_Dbs.num_of_RPR = numRecPerReq->valueint; + } else if (!numRecPerReq) { + g_Dbs.num_of_RPR = 0; + } else { + printf("failed to read json, num_of_records_per_req not found"); + goto PARSE_OVER; + } + cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, if (answerPrompt && answerPrompt->type == cJSON_String && answerPrompt->valuestring != NULL) { if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) { @@ -2983,26 +3041,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval"); - if (insertInterval && insertInterval->type == cJSON_Number) { - g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint; - } else if (!insertInterval) { - g_Dbs.db[i].superTbls[j].insertInterval = 0; - } else { - printf("failed to read json, insert_interval not found"); - goto PARSE_OVER; - } - - cJSON* numRecPerReq = cJSON_GetObjectItem(stbInfo, "num_of_records_per_req"); - if (numRecPerReq && numRecPerReq->type == cJSON_Number) { - g_Dbs.db[i].superTbls[j].numRecPerReq = numRecPerReq->valueint; - } else if (!numRecPerReq) { - g_Dbs.db[i].superTbls[j].numRecPerReq = 0; - } else { - printf("failed to read json, num_of_records_per_req not found"); - goto PARSE_OVER; - } - cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); if (insertRows && insertRows->type == cJSON_Number) { @@ -3414,7 +3452,7 @@ static bool getInfoFromJsonFile(char* file) { } else { printf("input json file type error! please input correct file type: insert or query or subscribe\n"); goto PARSE_OVER; - } + } PARSE_OVER: free(content); @@ -3423,7 +3461,6 @@ PARSE_OVER: return ret; } - void prePareSampleData() { for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { @@ -3526,7 +3563,8 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* return dataLen; } -void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDataBuf) { +void syncWriteForNumberOfTblInOneSql( + threadInfo *winfo, FILE *fp, char* sampleDataBuf) { SSuperTable* superTblInfo = winfo->superTblInfo; int samplePos = 0; @@ -3555,11 +3593,11 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa int64_t st = 0; int64_t et = 0; for (int i = 0; i < superTblInfo->insertRows;) { - if (superTblInfo->insertInterval && (superTblInfo->insertInterval > (et - st))) { - taosMsleep(superTblInfo->insertInterval - (et - st)); // ms + if (g_Dbs.insert_interval && (g_Dbs.insert_interval > (et - st))) { + taosMsleep(g_Dbs.insert_interval - (et - st)); // ms } - if (superTblInfo->insertInterval) { + if (g_Dbs.insert_interval) { st = taosGetTimestampMs(); } @@ -3567,8 +3605,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { int inserted = i; - for (int k = 0; k < winfo->superTblInfo->numRecPerReq;) - { + for (int k = 0; k < g_Dbs.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; @@ -3649,7 +3686,8 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa tmp_time = time_counter; for (k = 0; k < superTblInfo->rowsPerTbl;) { int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { + if (0 == strncasecmp(superTblInfo->dataSource, + "sample", strlen("sample"))) { retLen = getRowDataFromSample(pstr + len, superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, @@ -3684,7 +3722,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa //inserted++; k++; totalRowsInserted++; - + if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; @@ -3693,13 +3731,12 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa goto send_to_server; } } - } tID = tbl_id; inserted += superTblInfo->rowsPerTbl; - send_to_server: +send_to_server: if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { @@ -3758,7 +3795,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa } } - if (superTblInfo->insertInterval) { + if (g_Dbs.insert_interval) { et = taosGetTimestampMs(); } //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); @@ -3843,12 +3880,12 @@ void *syncWrite(void *sarg) { uint64_t et = 0; for (int i = 0; i < superTblInfo->insertRows;) { - if (i > 0 && superTblInfo->insertInterval - && (superTblInfo->insertInterval > (et - st) )) { - taosMsleep(superTblInfo->insertInterval - (et - st)); // ms + if (i > 0 && g_Dbs.insert_interval + && (g_Dbs.insert_interval > (et - st) )) { + taosMsleep(g_Dbs.insert_interval - (et - st)); // ms } - if (superTblInfo->insertInterval) { + if (g_Dbs.insert_interval) { st = taosGetTimestampMs(); } @@ -3901,10 +3938,10 @@ void *syncWrite(void *sarg) { superTblInfo->childTblPrefix, tID); } - - for (k = 0; k < superTblInfo->numRecPerReq;) { + + for (k = 0; k < g_Dbs.num_of_RPR;) { int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", 6)) { + if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( pstr + len, superTblInfo->maxSqlLen - len, @@ -3916,9 +3953,10 @@ void *syncWrite(void *sarg) { if (retLen < 0) { goto free_and_statistics_2; } - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", 8)) { + } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { + if (0 != superTblInfo->disorderRatio + && rand_num < superTblInfo->disorderRatio) { int64_t d = tmp_time - rand() % superTblInfo->disorderRange; retLen = generateRowData( pstr + len, @@ -3946,7 +3984,7 @@ void *syncWrite(void *sarg) { break; } - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); //int64_t t1 = taosGetTimestampMs(); int64_t startTs; @@ -3975,7 +4013,7 @@ void *syncWrite(void *sarg) { totalAffectedRows); lastPrintTime = currentPrintTime; } - //int64_t t2 = taosGetTimestampMs(); + //int64_t t2 = taosGetTimestampMs(); //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); } else { //int64_t t1 = taosGetTimestampMs(); @@ -3992,15 +4030,15 @@ void *syncWrite(void *sarg) { if (tID == winfo->end_table_id) { if (0 == strncasecmp( - superTblInfo->dataSource, "sample", 6)) { + superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } i = inserted; time_counter = tmp_time; } - } - - if (superTblInfo->insertInterval) { + } + + if (g_Dbs.insert_interval) { et = taosGetTimestampMs(); } //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); @@ -4024,7 +4062,7 @@ free_and_statistics_2: void callBack(void *param, TAOS_RES *res, int code) { threadInfo* winfo = (threadInfo*)param; - if (winfo->superTblInfo->insertInterval) { + if (g_Dbs.insert_interval) { winfo->et = taosGetTimestampMs(); if (winfo->et - winfo->st < 1000) { taosMsleep(1000 - (winfo->et - winfo->st)); // ms @@ -4047,7 +4085,7 @@ void callBack(void *param, TAOS_RES *res, int code) { return; } - for (int i = 0; i < winfo->superTblInfo->numRecPerReq; i++) { + for (int i = 0; i < g_Dbs.num_of_RPR; i++) { int rand_num = rand() % 100; if (0 != winfo->superTblInfo->disorderRatio && rand_num < winfo->superTblInfo->disorderRatio) { @@ -4066,7 +4104,7 @@ void callBack(void *param, TAOS_RES *res, int code) { } } - if (winfo->superTblInfo->insertInterval) { + if (g_Dbs.insert_interval) { winfo->st = taosGetTimestampMs(); } taos_query_a(winfo->taos, buffer, callBack, winfo); @@ -4083,7 +4121,7 @@ void *asyncWrite(void *sarg) { winfo->et = 0; winfo->lastTs = winfo->start_time; - if (winfo->superTblInfo->insertInterval) { + if (g_Dbs.insert_interval) { winfo->st = taosGetTimestampMs(); } taos_query_a(winfo->taos, "show databases", callBack, winfo); @@ -4136,7 +4174,11 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { start_time = taosGetTimestamp(timePrec); } else { - (void)taosParseTime(superTblInfo->startTimestamp, &start_time, strlen(superTblInfo->startTimestamp), timePrec, 0); + (void)taosParseTime( + superTblInfo->startTimestamp, + &start_time, + strlen(superTblInfo->startTimestamp), + timePrec, 0); } double start = getCurrentTime(); @@ -4153,7 +4195,9 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { //t_info->taos = taos; - t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); + t_info->taos = taos_connect( + g_Dbs.host, g_Dbs.user, + g_Dbs.password, db_name, g_Dbs.port); if (NULL == t_info->taos) { printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL)); exit(-1); @@ -4173,7 +4217,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu } tsem_init(&(t_info->lock_sem), 0, 0); - if (SYNC == g_Dbs.queryMode) { pthread_create(pids + i, NULL, syncWrite, t_info); } else { @@ -4217,7 +4260,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t); - printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n", avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0); fprintf(g_fpOfInsertResult, "insert delay, avg:%10.6fms, max: %10.6fms, min: %10.6fms\n\n", @@ -4406,10 +4448,13 @@ int insertTestProcess() { createChildTables(); end = getCurrentTime(); if (g_totalChildTables > 0) { - printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", end - start, g_totalChildTables, g_Dbs.threadCount); - fprintf(g_fpOfInsertResult, "Spent %.4f seconds to create %d tables with %d thread(s)\n\n", end - start, g_totalChildTables, g_Dbs.threadCount); + printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", + end - start, g_totalChildTables, g_Dbs.threadCount); + fprintf(g_fpOfInsertResult, + "Spent %.4f seconds to create %d tables with %d thread(s)\n\n", + end - start, g_totalChildTables, g_Dbs.threadCount); } - + taosMsleep(1000); // create sub threads for inserting data @@ -4420,11 +4465,15 @@ int insertTestProcess() { if (0 == g_Dbs.db[i].superTbls[j].insertRows) { continue; } - startMultiThreadInsertData(g_Dbs.threadCount, g_Dbs.db[i].dbName, g_Dbs.db[i].dbCfg.precision, superTblInfo); + startMultiThreadInsertData( + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, + superTblInfo); } } //end = getCurrentTime(); - + //int64_t totalRowsInserted = 0; //int64_t totalAffectedRows = 0; //for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -4443,7 +4492,12 @@ int insertTestProcess() { //rInfo->do_aggreFunc = g_Dbs.do_aggreFunc; //rInfo->nrecords_per_table = g_Dbs.db[0].superTbls[0].insertRows; rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; - rInfo->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, g_Dbs.db[0].dbName, g_Dbs.port); + rInfo->taos = taos_connect( + g_Dbs.host, + g_Dbs.user, + g_Dbs.password, + g_Dbs.db[0].dbName, + g_Dbs.port); strcpy(rInfo->tb_prefix, g_Dbs.db[0].superTbls[0].childTblPrefix); strcpy(rInfo->fp, g_Dbs.resultFile); @@ -4486,12 +4540,15 @@ void *superQueryProcess(void *sarg) { } selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile); int64_t t2 = taosGetTimestampUs(); - printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); + printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", + taosGetSelfPthreadId(), (t2 - t1)/1000000.0); } else { int64_t t1 = taosGetTimestampUs(); - int retCode = postProceSql(g_queryInfo.host, g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); + int retCode = postProceSql(g_queryInfo.host, + g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); int64_t t2 = taosGetTimestampUs(); - printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", taosGetSelfPthreadId(), (t2 - t1)/1000000.0); + printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", + taosGetSelfPthreadId(), (t2 - t1)/1000000.0); if (0 != retCode) { printf("====restful return fail, threadID[%d]\n", winfo->threadID); @@ -4500,7 +4557,8 @@ void *superQueryProcess(void *sarg) { } } et = taosGetTimestampMs(); - printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); + printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", + taosGetSelfPthreadId(), (double)(et - st)/1000.0); } return NULL; } @@ -4508,7 +4566,9 @@ void *superQueryProcess(void *sarg) { void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { char sourceString[32] = "xxxx"; char subTblName[MAX_TB_NAME_SIZE*3]; - sprintf(subTblName, "%s.%s", g_queryInfo.dbName, g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); + sprintf(subTblName, "%s.%s", + g_queryInfo.dbName, + g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); //printf("inSql: %s\n", inSql); @@ -4543,27 +4603,41 @@ void *subQueryProcess(void *sarg) { replaceSubTblName(g_queryInfo.subQueryInfo.sql[i], sqlstr, i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.subQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", g_queryInfo.subQueryInfo.result[i], winfo->threadID); + sprintf(tmpFile, "%s-%d", + g_queryInfo.subQueryInfo.result[i], + winfo->threadID); } selectAndGetResult(winfo->taos, sqlstr, tmpFile); } } et = taosGetTimestampMs(); - printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", taosGetSelfPthreadId(), winfo->start_table_id, winfo->end_table_id, (double)(et - st)/1000.0); + printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", + taosGetSelfPthreadId(), + winfo->start_table_id, + winfo->end_table_id, + (double)(et - st)/1000.0); } return NULL; } int queryTestProcess() { TAOS * taos = NULL; - taos = taos_connect(g_queryInfo.host, g_queryInfo.user, g_queryInfo.password, NULL, g_queryInfo.port); + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + NULL, + g_queryInfo.port); if (taos == NULL) { fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); exit(-1); } if (0 != g_queryInfo.subQueryInfo.sqlCount) { - (void)getAllChildNameOfSuperTable(taos, g_queryInfo.dbName, g_queryInfo.subQueryInfo.sTblName, &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount); + (void)getAllChildNameOfSuperTable(taos, + g_queryInfo.dbName, + g_queryInfo.subQueryInfo.sTblName, + &g_queryInfo.subQueryInfo.childTblName, + &g_queryInfo.subQueryInfo.childTblCount); } printfQueryMeta(); @@ -4683,9 +4757,14 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF TAOS_SUB* tsub = NULL; if (g_queryInfo.superQueryInfo.subscribeMode) { - tsub = taos_subscribe(taos, g_queryInfo.superQueryInfo.subscribeRestart, topic, sql, subscribe_callback, (void*)resultFileName, g_queryInfo.superQueryInfo.subscribeInterval); + tsub = taos_subscribe(taos, + g_queryInfo.superQueryInfo.subscribeRestart, + topic, sql, subscribe_callback, (void*)resultFileName, + g_queryInfo.superQueryInfo.subscribeInterval); } else { - tsub = taos_subscribe(taos, g_queryInfo.superQueryInfo.subscribeRestart, topic, sql, NULL, NULL, 0); + tsub = taos_subscribe(taos, + g_queryInfo.superQueryInfo.subscribeRestart, + topic, sql, NULL, NULL, 0); } if (tsub == NULL) { @@ -4837,7 +4916,11 @@ int subscribeTestProcess() { } TAOS * taos = NULL; - taos = taos_connect(g_queryInfo.host, g_queryInfo.user, g_queryInfo.password, g_queryInfo.dbName, g_queryInfo.port); + taos = taos_connect(g_queryInfo.host, + g_queryInfo.user, + g_queryInfo.password, + g_queryInfo.dbName, + g_queryInfo.port); if (taos == NULL) { fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); exit(-1); @@ -4935,23 +5018,23 @@ int subscribeTestProcess() { void initOfInsertMeta() { memset(&g_Dbs, 0, sizeof(SDbs)); - // set default values - tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE); - g_Dbs.port = 6030; - tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE); - tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE); - g_Dbs.threadCount = 2; - g_Dbs.use_metric = true; + // set default values + tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE); + g_Dbs.port = 6030; + tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE); + tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE); + g_Dbs.threadCount = 2; + g_Dbs.use_metric = true; } void initOfQueryMeta() { memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo)); - // set default values - tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE); - g_queryInfo.port = 6030; - tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE); - tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE); + // set default values + tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE); + g_queryInfo.port = 6030; + tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_DB_NAME_SIZE); + tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE); } void setParaFromArg(){ @@ -4995,8 +5078,6 @@ void setParaFromArg(){ g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[0].superTbls[0].superTblExists = TBL_NO_EXISTS; g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS; - g_Dbs.db[0].superTbls[0].insertInterval = 0; - g_Dbs.db[0].superTbls[0].numRecPerReq = 0; g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange; g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio; tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, @@ -5145,10 +5226,12 @@ int main(int argc, char *argv[]) { if (g_args.metaFile) { initOfInsertMeta(); initOfQueryMeta(); + if (false == getInfoFromJsonFile(g_args.metaFile)) { printf("Failed to read %s\n", g_args.metaFile); return 1; } + if (INSERT_MODE == g_jsonType) { if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir); (void)insertTestProcess(); -- GitLab