未验证 提交 31d5d302 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-3636] <fix>: taosdemo disorder rework. (#5671)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 763a14c8
...@@ -211,8 +211,8 @@ typedef struct SArguments_S { ...@@ -211,8 +211,8 @@ typedef struct SArguments_S {
int num_of_tables; int num_of_tables;
int num_of_DPT; int num_of_DPT;
int abort; int abort;
int disorderRatio; int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; int disorderRange; // ms or us by database precision
int method_of_delete; int method_of_delete;
char ** arg_list; char ** arg_list;
int64_t totalInsertRows; int64_t totalInsertRows;
...@@ -229,25 +229,25 @@ typedef struct SColumn_S { ...@@ -229,25 +229,25 @@ typedef struct SColumn_S {
typedef struct SSuperTable_S { typedef struct SSuperTable_S {
char sTblName[MAX_TB_NAME_SIZE+1]; char sTblName[MAX_TB_NAME_SIZE+1];
int childTblCount; int childTblCount;
bool childTblExists; // 0: no, 1: yes bool childTblExists; // 0: no, 1: yes
int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE]; char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful
int childTblLimit; int childTblLimit;
int childTblOffset; int childTblOffset;
int multiThreadWriteOneTbl; // 0: no, 1: yes int multiThreadWriteOneTbl; // 0: no, 1: yes
int interlaceRows; // int interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x% int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision int disorderRange; // ms or us by database precision
int maxSqlLen; // int maxSqlLen; //
int insertInterval; // insert interval, will override global insert interval int insertInterval; // insert interval, will override global insert interval
int64_t insertRows; // 0: no limit int64_t insertRows; // 0: no limit
int timeStampStep; int timeStampStep;
char startTimestamp[MAX_TB_NAME_SIZE]; // char startTimestamp[MAX_TB_NAME_SIZE];
char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json
char sampleFile[MAX_FILE_NAME_LEN+1]; char sampleFile[MAX_FILE_NAME_LEN+1];
char tagsFile[MAX_FILE_NAME_LEN+1]; char tagsFile[MAX_FILE_NAME_LEN+1];
...@@ -487,7 +487,7 @@ static int taosRandom() ...@@ -487,7 +487,7 @@ static int taosRandom()
return number; return number;
} }
#else #else // Not windows
static void setupForAnsiEscape(void) {} static void setupForAnsiEscape(void) {}
static void resetAfterAnsiEscape(void) { static void resetAfterAnsiEscape(void) {
...@@ -499,11 +499,15 @@ static void resetAfterAnsiEscape(void) { ...@@ -499,11 +499,15 @@ static void resetAfterAnsiEscape(void) {
static int taosRandom() static int taosRandom()
{ {
srand(time(NULL)); struct timeval tv;
gettimeofday(&tv, NULL);
srand(tv.tv_usec);
return rand(); return rand();
} }
#endif #endif // ifdef Windows
static int createDatabasesAndStables(); static int createDatabasesAndStables();
static void createChildTables(); static void createChildTables();
...@@ -676,7 +680,7 @@ static void printHelp() { ...@@ -676,7 +680,7 @@ static void printHelp() {
printf("%s%s%s%s\n", indent, "-x", indent, "Not insert only flag."); printf("%s%s%s%s\n", indent, "-x", indent, "Not insert only flag.");
printf("%s%s%s%s\n", indent, "-y", indent, "Default input yes for prompt."); printf("%s%s%s%s\n", indent, "-y", indent, "Default input yes for prompt.");
printf("%s%s%s%s\n", indent, "-O", indent, printf("%s%s%s%s\n", indent, "-O", indent,
"Insert mode--0: In order, > 0: disorder ratio. Default is in order."); "Insert mode--0: In order, 1 ~ 50: disorder ratio. Default is in order.");
printf("%s%s%s%s\n", indent, "-R", indent, printf("%s%s%s%s\n", indent, "-R", indent,
"Out of order data's range, ms, default is 1000."); "Out of order data's range, ms, default is 1000.");
printf("%s%s%s%s\n", indent, "-g", indent, printf("%s%s%s%s\n", indent, "-g", indent,
...@@ -800,20 +804,21 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -800,20 +804,21 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]); strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) { } else if (strcmp(argv[i], "-O") == 0) {
arguments->disorderRatio = atoi(argv[++i]); arguments->disorderRatio = atoi(argv[++i]);
if (arguments->disorderRatio > 1
|| arguments->disorderRatio < 0) { if (arguments->disorderRatio > 50)
arguments->disorderRatio = 50;
if (arguments->disorderRatio < 0)
arguments->disorderRatio = 0; arguments->disorderRatio = 0;
} else if (arguments->disorderRatio == 1) {
arguments->disorderRange = 10;
}
} else if (strcmp(argv[i], "-R") == 0) { } else if (strcmp(argv[i], "-R") == 0) {
arguments->disorderRange = atoi(argv[++i]); arguments->disorderRange = atoi(argv[++i]);
if (arguments->disorderRange == 1 if (arguments->disorderRange < 0)
&& (arguments->disorderRange > 50 arguments->disorderRange = 1000;
|| arguments->disorderRange <= 0)) {
arguments->disorderRange = 10;
}
} else if (strcmp(argv[i], "-a") == 0) { } else if (strcmp(argv[i], "-a") == 0) {
arguments->replica = atoi(argv[++i]); arguments->replica = atoi(argv[++i]);
if (arguments->replica > 3 || arguments->replica < 1) { if (arguments->replica > 3 || arguments->replica < 1) {
...@@ -997,8 +1002,9 @@ static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) ...@@ -997,8 +1002,9 @@ static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName)
taos_free_result(res); taos_free_result(res);
} }
static double getCurrentTime() { static double getCurrentTimeUs() {
struct timeval tv; struct timeval tv;
if (gettimeofday(&tv, NULL) != 0) { if (gettimeofday(&tv, NULL) != 0) {
perror("Failed to get current time in ms"); perror("Failed to get current time in ms");
return 0.0; return 0.0;
...@@ -3669,6 +3675,12 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3669,6 +3675,12 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON* disorderRatio = cJSON_GetObjectItem(stbInfo, "disorder_ratio"); cJSON* disorderRatio = cJSON_GetObjectItem(stbInfo, "disorder_ratio");
if (disorderRatio && disorderRatio->type == cJSON_Number) { if (disorderRatio && disorderRatio->type == cJSON_Number) {
if (disorderRatio->valueint > 50)
disorderRatio->valueint = 50;
if (disorderRatio->valueint < 0)
disorderRatio->valueint = 0;
g_Dbs.db[i].superTbls[j].disorderRatio = disorderRatio->valueint; g_Dbs.db[i].superTbls[j].disorderRatio = disorderRatio->valueint;
} else if (!disorderRatio) { } else if (!disorderRatio) {
g_Dbs.db[i].superTbls[j].disorderRatio = 0; g_Dbs.db[i].superTbls[j].disorderRatio = 0;
...@@ -4329,6 +4341,8 @@ static int32_t generateData(char *recBuf, char **data_type, ...@@ -4329,6 +4341,8 @@ static int32_t generateData(char *recBuf, char **data_type,
pstr += sprintf(pstr, ")"); pstr += sprintf(pstr, ")");
verbosePrint("%s() LN%d, recBuf:\n\t%s\n", __func__, __LINE__, recBuf);
return (int32_t)strlen(recBuf); return (int32_t)strlen(recBuf);
} }
...@@ -4440,7 +4454,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4440,7 +4454,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
pSamplePos); pSamplePos);
} else if (0 == strncasecmp(superTblInfo->dataSource, } else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) { "rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100; int rand_num = taosRandom() % 100;
if (0 != superTblInfo->disorderRatio if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) { && rand_num < superTblInfo->disorderRatio) {
int64_t d = startTime int64_t d = startTime
...@@ -4467,15 +4481,16 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4467,15 +4481,16 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
len += retLen; len += retLen;
remainderBufLen -= retLen; remainderBufLen -= retLen;
} else { } else {
int rand_num = taosRandom() % 100;
char **data_type = g_args.datatype; char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary; int lenOfBinary = g_args.len_of_binary;
int rand_num = taosRandom() % 100;
if ((g_args.disorderRatio != 0) if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRange)) { && (rand_num < g_args.disorderRatio)) {
int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k
- taosRandom() % 1000000 + rand_num; - taosRandom() % g_args.disorderRange;
retLen = generateData(data, data_type, retLen = generateData(data, data_type,
ncols_per_record, d, lenOfBinary); ncols_per_record, d, lenOfBinary);
} else { } else {
...@@ -5024,7 +5039,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { ...@@ -5024,7 +5039,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
if (0 != winfo->superTblInfo->disorderRatio if (0 != winfo->superTblInfo->disorderRatio
&& rand_num < winfo->superTblInfo->disorderRatio) { && rand_num < winfo->superTblInfo->disorderRatio) {
int64_t d = winfo->lastTs - taosRandom() % 1000000 + rand_num; int64_t d = winfo->lastTs - taosRandom() % winfo->superTblInfo->disorderRange;
generateRowData(data, d, winfo->superTblInfo); generateRowData(data, d, winfo->superTblInfo);
} else { } else {
generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo); generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo);
...@@ -5116,7 +5131,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5116,7 +5131,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
start_time = 1500000000000; start_time = 1500000000000;
} }
double start = getCurrentTime(); double start = getCurrentTimeUs();
// read sample data from file first // read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
...@@ -5311,7 +5326,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5311,7 +5326,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (cntDelay == 0) cntDelay = 1; if (cntDelay == 0) cntDelay = 1;
avgDelay = (double)totalDelay / cntDelay; avgDelay = (double)totalDelay / cntDelay;
double end = getCurrentTime(); double end = getCurrentTimeUs();
double t = end - start; double t = end - start;
if (superTblInfo) { if (superTblInfo) {
...@@ -5390,7 +5405,7 @@ static void *readTable(void *sarg) { ...@@ -5390,7 +5405,7 @@ static void *readTable(void *sarg) {
sprintf(command, "select %s from %s%d where ts>= %" PRId64, sprintf(command, "select %s from %s%d where ts>= %" PRId64,
aggreFunc[j], tb_prefix, i, sTime); aggreFunc[j], tb_prefix, i, sTime);
double t = getCurrentTime(); double t = getCurrentTimeUs();
TAOS_RES *pSql = taos_query(taos, command); TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
...@@ -5406,7 +5421,7 @@ static void *readTable(void *sarg) { ...@@ -5406,7 +5421,7 @@ static void *readTable(void *sarg) {
count++; count++;
} }
t = getCurrentTime() - t; t = getCurrentTimeUs() - t;
totalT += t; totalT += t;
taos_free_result(pSql); taos_free_result(pSql);
...@@ -5465,7 +5480,7 @@ static void *readMetric(void *sarg) { ...@@ -5465,7 +5480,7 @@ static void *readMetric(void *sarg) {
printf("Where condition: %s\n", condition); printf("Where condition: %s\n", condition);
fprintf(fp, "%s\n", command); fprintf(fp, "%s\n", command);
double t = getCurrentTime(); double t = getCurrentTimeUs();
TAOS_RES *pSql = taos_query(taos, command); TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
...@@ -5481,7 +5496,7 @@ static void *readMetric(void *sarg) { ...@@ -5481,7 +5496,7 @@ static void *readMetric(void *sarg) {
while (taos_fetch_row(pSql) != NULL) { while (taos_fetch_row(pSql) != NULL) {
count++; count++;
} }
t = getCurrentTime() - t; t = getCurrentTimeUs() - t;
fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n", fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n",
num_of_tables * num_of_DPT / t, t * 1000); num_of_tables * num_of_DPT / t, t * 1000);
...@@ -5535,9 +5550,9 @@ static int insertTestProcess() { ...@@ -5535,9 +5550,9 @@ static int insertTestProcess() {
double end; double end;
// create child tables // create child tables
start = getCurrentTime(); start = getCurrentTimeUs();
createChildTables(); createChildTables();
end = getCurrentTime(); end = getCurrentTimeUs();
if (g_totalChildTables > 0) { if (g_totalChildTables > 0) {
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
...@@ -5549,7 +5564,7 @@ static int insertTestProcess() { ...@@ -5549,7 +5564,7 @@ static int insertTestProcess() {
taosMsleep(1000); taosMsleep(1000);
// create sub threads for inserting data // create sub threads for inserting data
//start = getCurrentTime(); //start = getCurrentTimeUs();
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.use_metric) { if (g_Dbs.use_metric) {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
...@@ -5574,7 +5589,7 @@ static int insertTestProcess() { ...@@ -5574,7 +5589,7 @@ static int insertTestProcess() {
NULL); NULL);
} }
} }
//end = getCurrentTime(); //end = getCurrentTimeUs();
//int64_t totalInsertRows = 0; //int64_t totalInsertRows = 0;
//int64_t totalAffectedRows = 0; //int64_t totalAffectedRows = 0;
...@@ -6395,7 +6410,7 @@ static void querySqlFile(TAOS* taos, char* sqlFile) ...@@ -6395,7 +6410,7 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
char * line = NULL; char * line = NULL;
size_t line_len = 0; size_t line_len = 0;
double t = getCurrentTime(); double t = getCurrentTimeUs();
while ((read_len = tgetline(&line, &line_len, fp)) != -1) { while ((read_len = tgetline(&line, &line_len, fp)) != -1) {
if (read_len >= MAX_SQL_SIZE) continue; if (read_len >= MAX_SQL_SIZE) continue;
...@@ -6426,7 +6441,7 @@ static void querySqlFile(TAOS* taos, char* sqlFile) ...@@ -6426,7 +6441,7 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
cmd_len = 0; cmd_len = 0;
} }
t = getCurrentTime() - t; t = getCurrentTimeUs() - t;
printf("run %s took %.6f second(s)\n\n", sqlFile, t); printf("run %s took %.6f second(s)\n\n", sqlFile, t);
tmfree(cmd); tmfree(cmd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册