未验证 提交 c3bbb10a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2075 from taosdata/hotfix/taos-tools

Hotfix/taos tools
......@@ -18,7 +18,7 @@ import (
_ "github.com/taosdata/TDengine/src/connector/go/src/taosSql"
_ "github.com/taosdata/TDengine/src/connector/go/taosSql"
const (
......@@ -634,6 +634,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
if appendRows == batch {
// executebatch
insertSql := buffers.String()
connection.Exec("use " + db)
affectedRows := executeBatchInsert(insertSql, connection)
successRows[threadIndex] += affectedRows
......@@ -658,6 +659,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
if appendRows > 0 {
// executebatch
insertSql := buffers.String()
connection.Exec("use " + db)
affectedRows := executeBatchInsert(insertSql, connection)
successRows[threadIndex] += affectedRows
......@@ -49,7 +49,7 @@ static struct argp_option options[] = {
{0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0},
{0, 'p', "port", 0, "The TCP/IP port number to use for the connection. Default is 0.", 1},
{0, 'u', "user", 0, "The TDEngine user name to use when connecting to the server. Default is 'root'.", 2},
{0, 'a', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3},
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3},
{0, 'M', 0, 0, "Use metric flag.", 13},
......@@ -58,12 +58,15 @@ static struct argp_option options[] = {
{0, 'b', "type_of_cols", 0, "The data_type of columns: 'INT', 'TINYINT', 'SMALLINT', 'BIGINT', 'FLOAT', 'DOUBLE', 'BINARY'. Default is 'INT'.", 7},
{0, 'w', "length_of_binary", 0, "The length of data_type 'BINARY'. Only applicable when type of cols is 'BINARY'. Default is 8", 8},
{0, 'l', "num_of_cols_per_record", 0, "The number of columns per record. Default is 3.", 8},
{0, 'c', "num_of_conns", 0, "The number of connections. Default is 10.", 9},
{0, 'T', "num_of_threads", 0, "The number of threads. Default is 10.", 9},
{0, 'r', "num_of_records_per_req", 0, "The number of records per request. Default is 1000.", 10},
{0, 't', "num_of_tables", 0, "The number of tables. Default is 10000.", 11},
{0, 'n', "num_of_records_per_table", 0, "The number of records per table. Default is 100000.", 12},
{0, 'f', "config_directory", 0, "Configuration directory. Default is '/etc/taos/'.", 14},
{0, 'c', "config_directory", 0, "Configuration directory. Default is '/etc/taos/'.", 14},
{0, 'x', 0, 0, "Insert only flag.", 13},
{0, 'O', "order", 0, "Insert mode--0: In order, 1: Out of order. Default is in order.", 14},
{0, 'R', "rate", 0, "Out of order data's rate--if order=1 Default 10, min: 0, max: 50.", 14},
{0, 'D', "delete table", 0, "Delete data methods——0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database", 14},
/* Used by main to communicate with parse_opt. */
......@@ -81,11 +84,14 @@ typedef struct DemoArguments {
char *datatype[MAX_NUM_DATATYPE];
int len_of_binary;
int num_of_CPR;
int num_of_connections;
int num_of_threads;
int num_of_RPR;
int num_of_tables;
int num_of_DPT;
int abort;
int order;
int rate;
int method_of_delete;
char **arg_list;
} SDemoArguments;
......@@ -106,7 +112,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'u':
arguments->user = arg;
case 'a':
case 'P':
arguments->password = arg;
case 'o':
......@@ -115,8 +121,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'q':
arguments->mode = atoi(arg);
case 'c':
arguments->num_of_connections = atoi(arg);
case 'T':
arguments->num_of_threads = atoi(arg);
case 'r':
arguments->num_of_RPR = atoi(arg);
......@@ -176,7 +182,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'x':
arguments->insert_only = true;
case 'f':
case 'c':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
......@@ -184,6 +190,30 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]);
case 'O':
arguments->order = atoi(arg);
if (arguments->order > 1 || arguments->order < 0)
arguments->order = 0;
} else if (arguments->order == 1)
arguments->rate = 10;
case 'R':
arguments->rate = atoi(arg);
if (arguments->order == 1 && (arguments->rate > 50 || arguments->rate <= 0))
arguments->rate = 10;
case 'D':
arguments->method_of_delete = atoi(arg);
if (arguments->method_of_delete < 0 || arguments->method_of_delete > 3)
arguments->method_of_delete = 0;
arguments->abort = 1;
......@@ -217,6 +247,8 @@ typedef struct {
int ncols_per_record;
int nrecords_per_table;
int nrecords_per_request;
int data_of_order;
int data_of_rate;
int64_t start_time;
bool do_aggreFunc;
......@@ -236,6 +268,8 @@ typedef struct {
int ncols_per_record;
char **data_type;
int len_of_binary;
int data_of_order;
int data_of_rate;
sem_t *mutex_sem;
int *notFinished;
......@@ -258,6 +292,8 @@ void *readMetric(void *sarg);
void *syncWrite(void *sarg);
void *deleteTable();
void *asyncWrite(void *sarg);
void generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary);
......@@ -291,11 +327,14 @@ int main(int argc, char *argv[]) {
8, // len_of_binary
1, // num_of_CPR
1, // num_of_connections
1, // num_of_connections/thread
1, // num_of_RPR
1, // num_of_tables
50000, // num_of_DPT
0, // abort
0, // order
0, // rate
0, // method_of_delete
NULL // arg_list
......@@ -304,7 +343,7 @@ int main(int argc, char *argv[]) {
// For demo use, change default values for some parameters;
arguments.num_of_tables = 10000;
arguments.num_of_CPR = 3;
arguments.num_of_connections = 10;
arguments.num_of_threads = 10;
arguments.num_of_DPT = 100000;
arguments.num_of_RPR = 1000;
arguments.use_metric = true;
......@@ -330,8 +369,11 @@ int main(int argc, char *argv[]) {
char *tb_prefix = arguments.tb_prefix;
int len_of_binary = arguments.len_of_binary;
int ncols_per_record = arguments.num_of_CPR;
int order = arguments.order;
int rate = arguments.rate;
int method_of_delete = arguments.method_of_delete;
int ntables = arguments.num_of_tables;
int nconnections = arguments.num_of_connections;
int threads = arguments.num_of_threads;
int nrecords_per_table = arguments.num_of_DPT;
int nrecords_per_request = arguments.num_of_RPR;
bool use_metric = arguments.use_metric;
......@@ -371,12 +413,19 @@ int main(int argc, char *argv[]) {
printf("# Binary Length(If applicable): %d\n",
(strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1);
printf("# Number of Columns per record: %d\n", ncols_per_record);
printf("# Number of Connections: %d\n", nconnections);
printf("# Number of Threads: %d\n", threads);
printf("# Number of Tables: %d\n", ntables);
printf("# Number of Data per Table: %d\n", nrecords_per_table);
printf("# Records/Request: %d\n", nrecords_per_request);
printf("# Database name: %s\n", db_name);
printf("# Table prefix: %s\n", tb_prefix);
if (order == 1)
printf("# Data order: %d\n", order);
printf("# Data out of order rate: %d\n", rate);
printf("# Delete method: %d\n", method_of_delete);
printf("# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
......@@ -392,12 +441,18 @@ int main(int argc, char *argv[]) {
fprintf(fp, "# Binary Length(If applicable): %d\n",
(strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1);
fprintf(fp, "# Number of Columns per record: %d\n", ncols_per_record);
fprintf(fp, "# Number of Connections: %d\n", nconnections);
fprintf(fp, "# Number of Threads: %d\n", threads);
fprintf(fp, "# Number of Tables: %d\n", ntables);
fprintf(fp, "# Number of Data per Table: %d\n", nrecords_per_table);
fprintf(fp, "# Records/Request: %d\n", nrecords_per_request);
fprintf(fp, "# Database name: %s\n", db_name);
fprintf(fp, "# Table prefix: %s\n", tb_prefix);
if (order == 1)
printf("# Data order: %d\n", order);
printf("# Data out of order rate: %d\n", rate);
fprintf(fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
fprintf(fp, "###################################################################\n\n");
......@@ -414,7 +469,7 @@ int main(int argc, char *argv[]) {
sprintf(command, "drop database %s;", db_name);
taos_query(taos, command);
sprintf(command, "create database %s;", db_name);
taos_query(taos, command);
......@@ -479,22 +534,22 @@ int main(int argc, char *argv[]) {
/* Wait for table to create */
/* Insert data */
double ts = getCurrentTime();
printf("Inserting data......\n");
pthread_t *pids = malloc(nconnections * sizeof(pthread_t));
info *infos = malloc(nconnections * sizeof(info));
pthread_t *pids = malloc(threads * sizeof(pthread_t));
info *infos = malloc(threads * sizeof(info));
int a = ntables / nconnections;
int a = ntables / threads;
if (a < 1) {
nconnections = ntables;
threads = ntables;
a = 1;
int b = ntables % nconnections;
int b = ntables % threads;
int last = 0;
for (int i = 0; i < nconnections; i++) {
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
t_info->threadID = i;
strcpy(t_info->db_name, db_name);
......@@ -507,6 +562,8 @@ int main(int argc, char *argv[]) {
t_info->len_of_binary = len_of_binary;
t_info->nrecords_per_request = nrecords_per_request;
t_info->start_table_id = last;
t_info->data_of_order = order;
t_info->data_of_rate = rate;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
......@@ -520,15 +577,15 @@ int main(int argc, char *argv[]) {
pthread_create(pids + i, NULL, asyncWrite, t_info);
for (int i = 0; i < nconnections; i++) {
for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
double t = getCurrentTime() - ts;
if (query_mode == SYNC) {
printf("SYNC Insert with %d connections:\n", nconnections);
printf("SYNC Insert with %d connections:\n", threads);
} else {
printf("ASYNC Insert with %d connections:\n", nconnections);
printf("ASYNC Insert with %d connections:\n", threads);
fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n",
......@@ -540,7 +597,7 @@ int main(int argc, char *argv[]) {
t, ntables * nrecords_per_table, nrecords_per_request,
ntables * nrecords_per_table / t);
for (int i = 0; i < nconnections; i++) {
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
......@@ -551,6 +608,55 @@ int main(int argc, char *argv[]) {
if (method_of_delete != 0)
TAOS *dtaos = taos_connect(ip_addr, user, pass, db_name, port);
double dts = getCurrentTime();
printf("Deleteing %d table(s)......\n", ntables);
switch (method_of_delete)
case 1:
// delete by table
/* Create all the tables; */
for (int i = 0; i < ntables; i++) {
sprintf(command, "drop table %s.%s%d;", db_name, tb_prefix, i);
queryDB(dtaos, command);
case 2:
// delete by stable
if (!use_metric) {
sprintf(command, "drop table %s.meters;", db_name);
queryDB(dtaos, command);
case 3:
// delete by database
sprintf(command, "drop database %s;", db_name);
queryDB(dtaos, command);
printf("Table(s) droped!\n");
double dt = getCurrentTime() - dts;
printf("Spent %.4f seconds to drop %d tables\n", dt, ntables);
FILE *fp = fopen(arguments.output_file, "a");
fprintf(fp, "Spent %.4f seconds to drop %d tables\n", dt, ntables);
if (!insert_only) {
// query data
pthread_t read_id;
......@@ -735,7 +841,15 @@ void *syncWrite(void *sarg) {
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, tID);
int k;
for (k = 0; k < winfo->nrecords_per_request;) {
generateData(data, data_type, ncols_per_record, tmp_time++, len_of_binary);
int rand_num = rand() % 100;
if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate)
long d = tmp_time - rand() % 1000000 + rand_num;
generateData(data, data_type, ncols_per_record, d, len_of_binary);
} else
generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary);
pstr += sprintf(pstr, " %s", data);
......@@ -774,6 +888,8 @@ void *asyncWrite(void *sarg) {
tb_info->mutex_sem = &(winfo->mutex_sem);
tb_info->notFinished = &(winfo->notFinished);
tb_info->lock_sem = &(winfo->lock_sem);
tb_info->data_of_order = winfo->data_of_order;
tb_info->data_of_rate = winfo->data_of_rate;
/* char buff[BUFFER_SIZE] = "\0"; */
/* sprintf(buff, "insert into %s values (0, 0)", tb_info->tb_name); */
......@@ -815,7 +931,15 @@ void callBack(void *param, TAOS_RES *res, int code) {
pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name);
for (int i = 0; i < tb_info->nrecords_per_request; i++) {
generateData(data, datatype, ncols_per_record, tmp_time++, len_of_binary);
int rand_num = rand() % 100;
if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate)
long d = tmp_time - rand() % 1000000 + rand_num;
generateData(data, datatype, ncols_per_record, d, len_of_binary);
} else
generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary);
pstr += sprintf(pstr, "%s", data);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册