diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 3265285ccadcdd89e74a2e77eb6b1f1e3dd78ce6..9a5aedcdb79b705a9be72c13afa860fb4f5a8506 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -32,6 +32,7 @@ #include #include #include +#include #include "taos.h" #include "tutil.h" @@ -54,6 +55,7 @@ static struct argp_option options[] = { {0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3}, {0, 'd', "database", 0, "Destination database. Default is 'test'.", 3}, {0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3}, + {0, 's', "sql file", 0, "The select sql file.", 3}, {0, 'M', 0, 0, "Use metric flag.", 13}, {0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14}, {0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 6}, @@ -79,6 +81,7 @@ typedef struct DemoArguments { char *password; char *database; char *tb_prefix; + char *sqlFile; bool use_metric; bool insert_only; char *output_file; @@ -120,6 +123,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'o': arguments->output_file = arg; break; + case 's': + arguments->sqlFile = arg; + break; case 'q': arguments->mode = atoi(arg); break; @@ -179,10 +185,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { arguments->tb_prefix = arg; break; case 'M': - arguments->use_metric = true; + arguments->use_metric = false; break; case 'x': - arguments->insert_only = true; + arguments->insert_only = false; break; case 'c': if (wordexp(arg, &full_path, 0) != 0) { @@ -253,6 +259,9 @@ typedef struct { int data_of_rate; int64_t start_time; bool do_aggreFunc; + + char* cols; + bool use_metric; sem_t mutex_sem; int notFinished; @@ -305,6 +314,8 @@ void rand_string(char *str, int size); double getCurrentTime(); void callBack(void *param, TAOS_RES *res, int code); +void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass); +void querySqlFile(TAOS* taos, char* sqlFile); int main(int argc, char *argv[]) { SDemoArguments arguments = { NULL, // host @@ -313,6 +324,7 @@ int main(int argc, char *argv[]) { "taosdata", // password "test", // database "t", // tb_prefix + NULL, false, // use_metric false, // insert_only "./output.txt", // output_file @@ -361,7 +373,7 @@ int main(int argc, char *argv[]) { abort(); #endif } - + enum MODE query_mode = arguments.mode; char *ip_addr = arguments.host; uint16_t port = arguments.port; @@ -385,6 +397,13 @@ int main(int argc, char *argv[]) { char dataString[STRING_LEN]; bool do_aggreFunc = true; + if (NULL != arguments.sqlFile) { + TAOS* qtaos = taos_connect(ip_addr, user, pass, db_name, port); + querySqlFile(qtaos, arguments.sqlFile); + taos_close(qtaos); + return 0; + } + memset(dataString, 0, STRING_LEN); int len = 0; @@ -495,47 +514,19 @@ int main(int argc, char *argv[]) { len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary); } - if (!use_metric) { - /* Create all the tables; */ - printf("Creating %d table(s)......\n", ntables); - for (int i = 0; i < ntables; i++) { - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols); - queryDB(taos, command); - } - - printf("Table(s) created!\n"); - taos_close(taos); - - } else { + if (use_metric) { /* Create metric table */ printf("Creating meters super table...\n"); snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); queryDB(taos, command); printf("meters created!\n"); - /* Create all the tables; */ - printf("Creating %d table(s)......\n", ntables); - for (int i = 0; i < ntables; i++) { - int j; - if (i % 10 == 0) { - j = 10; - } else { - j = i % 10; - } - if (j % 2 == 0) { - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai"); - } else { - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing"); - } - queryDB(taos, command); - } - - printf("Table(s) created!\n"); taos_close(taos); } + /* Wait for table to create */ + multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass); - /* Insert data */ double ts = getCurrentTime(); printf("Inserting data......\n"); @@ -685,6 +676,198 @@ int main(int argc, char *argv[]) { return 0; } +#define MAX_SQL_SIZE 65536 +void selectSql(TAOS* taos, char* sqlcmd) +{ + TAOS_RES *pSql = taos_query(taos, sqlcmd); + int32_t code = taos_errno(pSql); + + if (code != 0) { + printf("Failed to sqlcmd:%s, reason:%s\n", sqlcmd, taos_errstr(pSql)); + taos_free_result(pSql); + exit(1); + } + + int count = 0; + while (taos_fetch_row(pSql) != NULL) { + count++; + } + + taos_free_result(pSql); + return; +} + + +/* Function to do regular expression check */ +static int regexMatch(const char *s, const char *reg, int cflags) { + regex_t regex; + char msgbuf[100] = {0}; + + /* Compile regular expression */ + if (regcomp(®ex, reg, cflags) != 0) { + printf("Fail to compile regex\n"); + exit(-1); + } + + /* Execute regular expression */ + int reti = regexec(®ex, s, 0, NULL, 0); + if (!reti) { + regfree(®ex); + return 1; + } else if (reti == REG_NOMATCH) { + regfree(®ex); + return 0; + } else { + regerror(reti, ®ex, msgbuf, sizeof(msgbuf)); + printf("Regex match failed: %s\n", msgbuf); + regfree(®ex); + exit(-1); + } + + return 0; +} + +static int isCommentLine(char *line) { + if (line == NULL) return 1; + + return regexMatch(line, "^\\s*#.*", REG_EXTENDED); +} + +void querySqlFile(TAOS* taos, char* sqlFile) +{ + FILE *fp = fopen(sqlFile, "r"); + if (fp == NULL) { + printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno)); + exit(-1); + } + + int read_len = 0; + char * cmd = calloc(1, MAX_SQL_SIZE); + size_t cmd_len = 0; + char * line = NULL; + size_t line_len = 0; + + double t = getCurrentTime(); + + while ((read_len = getline(&line, &line_len, fp)) != -1) { + if (read_len >= MAX_SQL_SIZE) continue; + line[--read_len] = '\0'; + + if (read_len == 0 || isCommentLine(line)) { // line starts with # + continue; + } + + if (line[read_len - 1] == '\\') { + line[read_len - 1] = ' '; + memcpy(cmd + cmd_len, line, read_len); + cmd_len += read_len; + continue; + } + + memcpy(cmd + cmd_len, line, read_len); + selectSql(taos, cmd); + memset(cmd, 0, MAX_SQL_SIZE); + cmd_len = 0; + } + + t = getCurrentTime() - t; + printf("run %s took %.6f second(s)\n\n", sqlFile, t); + + free(cmd); + if (line) free(line); + fclose(fp); + return; +} + +void * createTable(void *sarg) +{ + char command[BUFFER_SIZE] = "\0"; + + info *winfo = (info *)sarg; + + if (!winfo->use_metric) { + /* Create all the tables; */ + printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); + for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols); + queryDB(winfo->taos, command); + } + + taos_close(winfo->taos); + + } else { + /* Create all the tables; */ + printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); + for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { + int j; + if (i % 10 == 0) { + j = 10; + } else { + j = i % 10; + } + if (j % 2 == 0) { + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "shanghai"); + } else { + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "beijing"); + } + queryDB(winfo->taos, command); + } + taos_close(winfo->taos); + } + + return NULL; +} + +void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass) { + double ts = getCurrentTime(); + printf("create table......\n"); + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + info *infos = malloc(threads * sizeof(info)); + + int a = ntables / threads; + if (a < 1) { + threads = ntables; + a = 1; + } + + int b = 0; + if (threads != 0) + b = ntables % threads; + int last = 0; + for (int i = 0; i < threads; i++) { + info *t_info = infos + i; + t_info->threadID = i; + tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); + tstrncpy(t_info->tb_prefix, tb_prefix, MAX_TB_NAME_SIZE); + t_info->taos = taos_connect(ip_addr, user, pass, db_name, port); + t_info->start_table_id = last; + t_info->end_table_id = i < b ? last + a : last + a - 1; + last = t_info->end_table_id + 1; + t_info->use_metric = use_metric; + t_info->cols = cols; + pthread_create(pids + i, NULL, createTable, t_info); + } + + for (int i = 0; i < threads; i++) { + pthread_join(pids[i], NULL); + } + + double t = getCurrentTime() - ts; + printf("Spent %.4f seconds to create %d tables with %d connections\n", t, ntables, threads); + + for (int i = 0; i < threads; i++) { + info *t_info = infos + i; + taos_close(t_info->taos); + sem_destroy(&(t_info->mutex_sem)); + sem_destroy(&(t_info->lock_sem)); + } + + free(pids); + free(infos); + + return ; +} + void *readTable(void *sarg) { info *rinfo = (info *)sarg; TAOS *taos = rinfo->taos;