未验证 提交 82d58725 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

cherry pick from develop branch. (#7940)

上级 99eb4f8e
......@@ -223,7 +223,7 @@ typedef struct SArguments_S {
char * sqlFile;
bool use_metric;
bool drop_database;
bool insert_only;
bool aggr_func;
bool answer_yes;
bool debug_print;
bool verbose_print;
......@@ -367,8 +367,7 @@ typedef struct SDbs_S {
char password[MAX_PASSWORD_SIZE];
char resultFile[MAX_FILE_NAME_LEN];
bool use_metric;
bool insert_only;
bool do_aggreFunc;
bool aggr_func;
bool asyncMode;
uint32_t threadCount;
......@@ -586,10 +585,12 @@ char *g_rand_current_buff = NULL;
char *g_rand_phase_buff = NULL;
char *g_randdouble_buff = NULL;
char *g_aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)",
"max(col0)", "min(col0)", "first(col0)", "last(col0)"};
#define DEFAULT_DATATYPE_NUM 3
char *g_aggreFuncDemo[] = {"*", "count(*)", "avg(current)", "sum(current)",
"max(current)", "min(current)", "first(current)", "last(current)"};
char *g_aggreFunc[] = {"*", "count(*)", "avg(C0)", "sum(C0)",
"max(C0)", "min(C0)", "first(C0)", "last(C0)"};
SArguments g_args = {
NULL, // metaFile
......@@ -611,7 +612,7 @@ SArguments g_args = {
NULL, // sqlFile
true, // use_metric
true, // drop_database
true, // insert_only
false, // aggr_func
false, // debug_print
false, // verbose_print
false, // performance statistic print
......@@ -774,13 +775,13 @@ static void printHelp() {
"The number of records per table. Default is 10000.");
printf("%s%s%s%s\n", indent, "-M", indent,
"The value of records generated are totally random.");
printf("%s%s%s%s\n", indent, indent, indent,
" The default is to simulate power equipment senario.");
printf("%s%s%s%s\n", indent, "-x", indent, "Not insert only flag.");
printf("%s%s%s%s\n", indent, "-y", indent, "Default input yes for prompt.");
printf("%s%s%s%s\n", indent, "-O", indent,
"Insert mode--0: In order, 1 ~ 50: disorder ratio. Default is in order.");
printf("%s%s%s%s\n", indent, "-R", indent,
printf("%s\n", "\t\t\t\tThe default is to simulate power equipment scenario.");
printf("%s%s%s%s\n", indent, "-x, --aggr-func", "\t\t",
"Test aggregation funtions after insertion.");
printf("%s%s%s%s\n", indent, "-y, --answer-yes", "\t\t", "Default input yes for prompt.");
printf("%s%s%s%s\n", indent, "-O, --disorder=NUMBER", "\t\t",
"Insert order mode--0: In order, 1 ~ 50: disorder ratio. Default is in order.");
printf("%s%s%s%s\n", indent, "-R, --disorder-range=NUMBER", "\t",
"Out of order data's range, ms, default is 1000.");
printf("%s%s%s%s\n", indent, "-g", indent,
"Print debug info.");
......@@ -1053,14 +1054,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrint("%s", "\n\t-m need a letter-initial string following!\n");
exit(EXIT_FAILURE);
}
arguments->tb_prefix = argv[++i];
} else if (strcmp(argv[i], "-N") == 0) {
} else if ((strcmp(argv[i], "-N") == 0)
|| (0 == strcmp(argv[i], "--normal-table"))) {
arguments->demo_mode = false;
arguments->use_metric = false;
} else if (strcmp(argv[i], "-M") == 0) {
arguments->demo_mode = false;
} else if (strcmp(argv[i], "-x") == 0) {
arguments->insert_only = false;
} else if (strcmp(argv[i], "-y") == 0) {
} else if ((strcmp(argv[i], "-x") == 0)
|| (0 == strcmp(argv[i], "--aggr-func"))) {
arguments->aggr_func = true;
} else if ((strcmp(argv[i], "-y") == 0)
|| (0 == strcmp(argv[i], "--answer-yes"))) {
arguments->answer_yes = true;
} else if (strcmp(argv[i], "-g") == 0) {
arguments->debug_print = true;
......@@ -1601,10 +1605,11 @@ static void init_rand_data() {
static int printfInsertMeta() {
SHOW_PARSE_RESULT_START();
if (g_args.demo_mode)
if (g_args.demo_mode) {
printf("\ntaosdemo is simulating data generated by power equipments monitoring...\n\n");
else
} else {
printf("\ntaosdemo is simulating random data as you request..\n\n");
}
if (g_args.iface != INTERFACE_BUT) {
// first time if no iface specified
......@@ -7291,13 +7296,14 @@ static void startMultiThreadInsertData(int threads, char* db_name,
free(infos);
}
static void *readTable(void *sarg) {
#if 1
static void *queryNtableAggrFunc(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
setThreadName("readTable");
char command[BUFFER_SIZE] = "\0";
uint64_t sTime = pThreadInfo->start_time;
setThreadName("queryNtableAggrFunc");
char *command = calloc(1, BUFFER_SIZE);
assert(command);
uint64_t startTime = pThreadInfo->start_time;
char *tb_prefix = pThreadInfo->tb_prefix;
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
......@@ -7315,10 +7321,20 @@ static void *readTable(void *sarg) {
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
bool aggr_func = g_Dbs.aggr_func;
int n = do_aggreFunc ? (sizeof(g_aggreFunc) / sizeof(g_aggreFunc[0])) : 2;
if (!do_aggreFunc) {
char **aggreFunc;
int n;
if (g_args.demo_mode) {
aggreFunc = g_aggreFuncDemo;
n = aggr_func?(sizeof(g_aggreFuncDemo) / sizeof(g_aggreFuncDemo[0])) : 2;
} else {
aggreFunc = g_aggreFunc;
n = aggr_func?(sizeof(g_aggreFunc) / sizeof(g_aggreFunc[0])) : 2;
}
if (!aggr_func) {
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
}
printf("%"PRId64" records:\n", totalData);
......@@ -7329,9 +7345,11 @@ static void *readTable(void *sarg) {
uint64_t count = 0;
for (int64_t i = 0; i < num_of_tables; i++) {
sprintf(command, "select %s from %s%"PRId64" where ts>= %" PRIu64,
g_aggreFunc[j], tb_prefix, i, sTime);
aggreFunc[j], tb_prefix, i, startTime);
double t = taosGetTimestampMs();
double t = taosGetTimestampUs();
debugPrint("%s() LN%d, sql command: %s\n",
__func__, __LINE__, command);
TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql);
......@@ -7347,29 +7365,30 @@ static void *readTable(void *sarg) {
count++;
}
t = taosGetTimestampMs() - t;
t = taosGetTimestampUs() - t;
totalT += t;
taos_free_result(pSql);
}
fprintf(fp, "|%10s | %"PRId64" | %12.2f | %10.2f |\n",
g_aggreFunc[j][0] == '*' ? " * " : g_aggreFunc[j], totalData,
(double)(num_of_tables * num_of_DPT) / totalT, totalT * 1000);
printf("select %10s took %.6f second(s)\n", g_aggreFunc[j], totalT * 1000);
aggreFunc[j][0] == '*' ? " * " : aggreFunc[j], totalData,
(double)(num_of_tables * num_of_DPT) / totalT, totalT / 1000000);
printf("select %10s took %.6f second(s)\n", aggreFunc[j], totalT / 1000000);
}
fprintf(fp, "\n");
fclose(fp);
#endif
free(command);
return NULL;
}
static void *readMetric(void *sarg) {
#if 1
static void *queryStableAggrFunc(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
setThreadName("readMetric");
char command[BUFFER_SIZE] = "\0";
setThreadName("queryStableAggrFunc");
char *command = calloc(1, BUFFER_SIZE);
assert(command);
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
......@@ -7379,12 +7398,23 @@ static void *readMetric(void *sarg) {
int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
bool aggr_func = g_Dbs.aggr_func;
int n = do_aggreFunc ? (sizeof(g_aggreFunc) / sizeof(g_aggreFunc[0])) : 2;
if (!do_aggreFunc) {
char **aggreFunc;
int n;
if (g_args.demo_mode) {
aggreFunc = g_aggreFuncDemo;
n = aggr_func?(sizeof(g_aggreFuncDemo) / sizeof(g_aggreFuncDemo[0])) : 2;
} else {
aggreFunc = g_aggreFunc;
n = aggr_func?(sizeof(g_aggreFunc) / sizeof(g_aggreFunc[0])) : 2;
}
if (!aggr_func) {
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
}
printf("%"PRId64" records:\n", totalData);
fprintf(fp, "Querying On %"PRId64" records:\n", totalData);
......@@ -7396,18 +7426,29 @@ static void *readMetric(void *sarg) {
for (int64_t i = 1; i <= m; i++) {
if (i == 1) {
sprintf(tempS, "t1 = %"PRId64"", i);
if (g_args.demo_mode) {
sprintf(tempS, "groupid = %"PRId64"", i);
} else {
sprintf(tempS, "t0 = %"PRId64"", i);
}
} else {
sprintf(tempS, " or t1 = %"PRId64" ", i);
if (g_args.demo_mode) {
sprintf(tempS, " or groupid = %"PRId64" ", i);
} else {
sprintf(tempS, " or t0 = %"PRId64" ", i);
}
}
strncat(condition, tempS, COND_BUF_LEN - 1);
sprintf(command, "select %s from meters where %s", g_aggreFunc[j], condition);
sprintf(command, "SELECT %s FROM meters WHERE %s", aggreFunc[j], condition);
printf("Where condition: %s\n", condition);
debugPrint("%s() LN%d, sql command: %s\n",
__func__, __LINE__, command);
fprintf(fp, "%s\n", command);
double t = taosGetTimestampMs();
double t = taosGetTimestampUs();
TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql);
......@@ -7423,18 +7464,19 @@ static void *readMetric(void *sarg) {
while(taos_fetch_row(pSql) != NULL) {
count++;
}
t = taosGetTimestampMs() - t;
t = taosGetTimestampUs() - t;
fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n",
num_of_tables * num_of_DPT / (t * 1000.0), t);
printf("select %10s took %.6f second(s)\n\n", g_aggreFunc[j], t * 1000.0);
num_of_tables * num_of_DPT / (t / 1000), t);
printf("select %10s took %.6f second(s)\n\n", aggreFunc[j], t / 1000000);
taos_free_result(pSql);
}
fprintf(fp, "\n");
}
fclose(fp);
#endif
free(command);
return NULL;
}
......@@ -8414,9 +8456,8 @@ static void setParaFromArg() {
tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN);
g_Dbs.use_metric = g_args.use_metric;
g_Dbs.insert_only = g_args.insert_only;
g_Dbs.do_aggreFunc = true;
g_Dbs.aggr_func = g_args.aggr_func;
char dataString[TSDB_MAX_BYTES_PER_ROW];
char **data_type = g_args.datatype;
......@@ -8426,7 +8467,7 @@ static void setParaFromArg() {
if (strcasecmp(data_type[0], "BINARY") == 0
|| strcasecmp(data_type[0], "BOOL") == 0
|| strcasecmp(data_type[0], "NCHAR") == 0 ) {
g_Dbs.do_aggreFunc = false;
g_Dbs.aggr_func = false;
}
if (g_args.use_metric) {
......@@ -8607,7 +8648,7 @@ static void testMetaFile() {
}
}
static void queryResult() {
static void queryAggrFunc() {
// query data
pthread_t read_id;
......@@ -8616,7 +8657,6 @@ static void queryResult() {
pThreadInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
pThreadInfo->start_table_from = 0;
//pThreadInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
if (g_args.use_metric) {
pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
......@@ -8645,9 +8685,9 @@ static void queryResult() {
tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
if (!g_Dbs.use_metric) {
pthread_create(&read_id, NULL, readTable, pThreadInfo);
pthread_create(&read_id, NULL, queryNtableAggrFunc, pThreadInfo);
} else {
pthread_create(&read_id, NULL, readMetric, pThreadInfo);
pthread_create(&read_id, NULL, queryStableAggrFunc, pThreadInfo);
}
pthread_join(read_id, NULL);
taos_close(pThreadInfo->taos);
......@@ -8669,8 +8709,9 @@ static void testCmdLine() {
g_args.test_mode = INSERT_TEST;
insertTestProcess();
if (false == g_Dbs.insert_only)
queryResult();
if (g_Dbs.aggr_func) {
queryAggrFunc();
}
}
int main(int argc, char *argv[]) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册