提交 87cdb1c5 编写于 作者: S Shuaiqiang Chang

fix: taosdemo params error

上级 dfbe9ace
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
"sync" "sync"
"time" "time"
_ "github.com/taosdata/TDengine/src/connector/go/src/taosSql" _ "github.com/taosdata/TDengine/src/connector/go/taosSql"
) )
const ( const (
...@@ -634,6 +634,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -634,6 +634,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
if appendRows == batch { if appendRows == batch {
// executebatch // executebatch
insertSql := buffers.String() insertSql := buffers.String()
connection.Exec("use " + db)
affectedRows := executeBatchInsert(insertSql, connection) affectedRows := executeBatchInsert(insertSql, connection)
successRows[threadIndex] += affectedRows successRows[threadIndex] += affectedRows
...@@ -658,6 +659,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] ...@@ -658,6 +659,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
if appendRows > 0 { if appendRows > 0 {
// executebatch // executebatch
insertSql := buffers.String() insertSql := buffers.String()
connection.Exec("use " + db)
affectedRows := executeBatchInsert(insertSql, connection) affectedRows := executeBatchInsert(insertSql, connection)
successRows[threadIndex] += affectedRows successRows[threadIndex] += affectedRows
......
...@@ -50,7 +50,7 @@ static struct argp_option options[] = { ...@@ -50,7 +50,7 @@ static struct argp_option options[] = {
{0, 'p', "port", 0, "The TCP/IP port number to use for the connection. Default is 0.", 1}, {0, 'p', "port", 0, "The TCP/IP port number to use for the connection. Default is 0.", 1},
{0, 'u', "user", 0, "The TDEngine user name to use when connecting to the server. Default is 'root'.", 2}, {0, 'u', "user", 0, "The TDEngine user name to use when connecting to the server. Default is 'root'.", 2},
{0, 'a', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3}, {0, 'a', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3}, {0, 'P', "database", 0, "Destination database. Default is 'test'.", 3},
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3}, {0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3},
{0, 'M', 0, 0, "Use metric flag.", 13}, {0, 'M', 0, 0, "Use metric flag.", 13},
{0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14}, {0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14},
...@@ -58,11 +58,11 @@ static struct argp_option options[] = { ...@@ -58,11 +58,11 @@ static struct argp_option options[] = {
{0, 'b', "type_of_cols", 0, "The data_type of columns: 'INT', 'TINYINT', 'SMALLINT', 'BIGINT', 'FLOAT', 'DOUBLE', 'BINARY'. Default is 'INT'.", 7}, {0, 'b', "type_of_cols", 0, "The data_type of columns: 'INT', 'TINYINT', 'SMALLINT', 'BIGINT', 'FLOAT', 'DOUBLE', 'BINARY'. Default is 'INT'.", 7},
{0, 'w', "length_of_binary", 0, "The length of data_type 'BINARY'. Only applicable when type of cols is 'BINARY'. Default is 8", 8}, {0, 'w', "length_of_binary", 0, "The length of data_type 'BINARY'. Only applicable when type of cols is 'BINARY'. Default is 8", 8},
{0, 'l', "num_of_cols_per_record", 0, "The number of columns per record. Default is 3.", 8}, {0, 'l', "num_of_cols_per_record", 0, "The number of columns per record. Default is 3.", 8},
{0, 'c', "num_of_conns", 0, "The number of connections. Default is 10.", 9}, {0, 'T', "num_of_threads", 0, "The number of threads. Default is 10.", 9},
{0, 'r', "num_of_records_per_req", 0, "The number of records per request. Default is 1000.", 10}, {0, 'r', "num_of_records_per_req", 0, "The number of records per request. Default is 1000.", 10},
{0, 't', "num_of_tables", 0, "The number of tables. Default is 10000.", 11}, {0, 't', "num_of_tables", 0, "The number of tables. Default is 10000.", 11},
{0, 'n', "num_of_records_per_table", 0, "The number of records per table. Default is 100000.", 12}, {0, 'n', "num_of_records_per_table", 0, "The number of records per table. Default is 100000.", 12},
{0, 'f', "config_directory", 0, "Configuration directory. Default is '/etc/taos/'.", 14}, {0, 'c', "config_directory", 0, "Configuration directory. Default is '/etc/taos/'.", 14},
{0, 'x', 0, 0, "Insert only flag.", 13}, {0, 'x', 0, 0, "Insert only flag.", 13},
{0}}; {0}};
...@@ -81,7 +81,7 @@ typedef struct DemoArguments { ...@@ -81,7 +81,7 @@ typedef struct DemoArguments {
char *datatype[MAX_NUM_DATATYPE]; char *datatype[MAX_NUM_DATATYPE];
int len_of_binary; int len_of_binary;
int num_of_CPR; int num_of_CPR;
int num_of_connections; int num_of_threads;
int num_of_RPR; int num_of_RPR;
int num_of_tables; int num_of_tables;
int num_of_DPT; int num_of_DPT;
...@@ -106,7 +106,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -106,7 +106,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'u': case 'u':
arguments->user = arg; arguments->user = arg;
break; break;
case 'a': case 'P':
arguments->password = arg; arguments->password = arg;
break; break;
case 'o': case 'o':
...@@ -115,8 +115,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -115,8 +115,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'q': case 'q':
arguments->mode = atoi(arg); arguments->mode = atoi(arg);
break; break;
case 'c': case 'T':
arguments->num_of_connections = atoi(arg); arguments->num_of_threads = atoi(arg);
break; break;
case 'r': case 'r':
arguments->num_of_RPR = atoi(arg); arguments->num_of_RPR = atoi(arg);
...@@ -176,7 +176,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -176,7 +176,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'x': case 'x':
arguments->insert_only = true; arguments->insert_only = true;
break; break;
case 'f': case 'c':
if (wordexp(arg, &full_path, 0) != 0) { if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg); fprintf(stderr, "Invalid path %s\n", arg);
return -1; return -1;
...@@ -291,7 +291,7 @@ int main(int argc, char *argv[]) { ...@@ -291,7 +291,7 @@ int main(int argc, char *argv[]) {
}, },
8, // len_of_binary 8, // len_of_binary
1, // num_of_CPR 1, // num_of_CPR
1, // num_of_connections 1, // num_of_connections/thread
1, // num_of_RPR 1, // num_of_RPR
1, // num_of_tables 1, // num_of_tables
50000, // num_of_DPT 50000, // num_of_DPT
...@@ -304,7 +304,7 @@ int main(int argc, char *argv[]) { ...@@ -304,7 +304,7 @@ int main(int argc, char *argv[]) {
// For demo use, change default values for some parameters; // For demo use, change default values for some parameters;
arguments.num_of_tables = 10000; arguments.num_of_tables = 10000;
arguments.num_of_CPR = 3; arguments.num_of_CPR = 3;
arguments.num_of_connections = 10; arguments.num_of_threads = 10;
arguments.num_of_DPT = 100000; arguments.num_of_DPT = 100000;
arguments.num_of_RPR = 1000; arguments.num_of_RPR = 1000;
arguments.use_metric = true; arguments.use_metric = true;
...@@ -331,7 +331,7 @@ int main(int argc, char *argv[]) { ...@@ -331,7 +331,7 @@ int main(int argc, char *argv[]) {
int len_of_binary = arguments.len_of_binary; int len_of_binary = arguments.len_of_binary;
int ncols_per_record = arguments.num_of_CPR; int ncols_per_record = arguments.num_of_CPR;
int ntables = arguments.num_of_tables; int ntables = arguments.num_of_tables;
int nconnections = arguments.num_of_connections; int threads = arguments.num_of_threads;
int nrecords_per_table = arguments.num_of_DPT; int nrecords_per_table = arguments.num_of_DPT;
int nrecords_per_request = arguments.num_of_RPR; int nrecords_per_request = arguments.num_of_RPR;
bool use_metric = arguments.use_metric; bool use_metric = arguments.use_metric;
...@@ -371,7 +371,7 @@ int main(int argc, char *argv[]) { ...@@ -371,7 +371,7 @@ int main(int argc, char *argv[]) {
printf("# Binary Length(If applicable): %d\n", printf("# Binary Length(If applicable): %d\n",
(strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1); (strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1);
printf("# Number of Columns per record: %d\n", ncols_per_record); printf("# Number of Columns per record: %d\n", ncols_per_record);
printf("# Number of Connections: %d\n", nconnections); printf("# Number of Threads: %d\n", threads);
printf("# Number of Tables: %d\n", ntables); printf("# Number of Tables: %d\n", ntables);
printf("# Number of Data per Table: %d\n", nrecords_per_table); printf("# Number of Data per Table: %d\n", nrecords_per_table);
printf("# Records/Request: %d\n", nrecords_per_request); printf("# Records/Request: %d\n", nrecords_per_request);
...@@ -392,7 +392,7 @@ int main(int argc, char *argv[]) { ...@@ -392,7 +392,7 @@ int main(int argc, char *argv[]) {
fprintf(fp, "# Binary Length(If applicable): %d\n", fprintf(fp, "# Binary Length(If applicable): %d\n",
(strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1); (strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1);
fprintf(fp, "# Number of Columns per record: %d\n", ncols_per_record); fprintf(fp, "# Number of Columns per record: %d\n", ncols_per_record);
fprintf(fp, "# Number of Connections: %d\n", nconnections); fprintf(fp, "# Number of Threads: %d\n", threads);
fprintf(fp, "# Number of Tables: %d\n", ntables); fprintf(fp, "# Number of Tables: %d\n", ntables);
fprintf(fp, "# Number of Data per Table: %d\n", nrecords_per_table); fprintf(fp, "# Number of Data per Table: %d\n", nrecords_per_table);
fprintf(fp, "# Records/Request: %d\n", nrecords_per_request); fprintf(fp, "# Records/Request: %d\n", nrecords_per_request);
...@@ -414,7 +414,7 @@ int main(int argc, char *argv[]) { ...@@ -414,7 +414,7 @@ int main(int argc, char *argv[]) {
sprintf(command, "drop database %s;", db_name); sprintf(command, "drop database %s;", db_name);
taos_query(taos, command); taos_query(taos, command);
sleep(3);
sprintf(command, "create database %s;", db_name); sprintf(command, "create database %s;", db_name);
taos_query(taos, command); taos_query(taos, command);
...@@ -479,22 +479,22 @@ int main(int argc, char *argv[]) { ...@@ -479,22 +479,22 @@ int main(int argc, char *argv[]) {
taos_close(taos); taos_close(taos);
} }
/* Wait for table to create */ /* Wait for table to create */
sleep(5);
/* Insert data */ /* Insert data */
double ts = getCurrentTime(); double ts = getCurrentTime();
printf("Inserting data......\n"); printf("Inserting data......\n");
pthread_t *pids = malloc(nconnections * sizeof(pthread_t)); pthread_t *pids = malloc(threads * sizeof(pthread_t));
info *infos = malloc(nconnections * sizeof(info)); info *infos = malloc(threads * sizeof(info));
int a = ntables / nconnections; int a = ntables / threads;
if (a < 1) { if (a < 1) {
nconnections = ntables; threads = ntables;
a = 1; a = 1;
} }
int b = ntables % nconnections; int b = ntables % threads;
int last = 0; int last = 0;
for (int i = 0; i < nconnections; i++) { for (int i = 0; i < threads; i++) {
info *t_info = infos + i; info *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
strcpy(t_info->db_name, db_name); strcpy(t_info->db_name, db_name);
...@@ -520,15 +520,15 @@ int main(int argc, char *argv[]) { ...@@ -520,15 +520,15 @@ int main(int argc, char *argv[]) {
pthread_create(pids + i, NULL, asyncWrite, t_info); pthread_create(pids + i, NULL, asyncWrite, t_info);
} }
} }
for (int i = 0; i < nconnections; i++) { for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
double t = getCurrentTime() - ts; double t = getCurrentTime() - ts;
if (query_mode == SYNC) { if (query_mode == SYNC) {
printf("SYNC Insert with %d connections:\n", nconnections); printf("SYNC Insert with %d connections:\n", threads);
} else { } else {
printf("ASYNC Insert with %d connections:\n", nconnections); printf("ASYNC Insert with %d connections:\n", threads);
} }
fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n", fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n",
...@@ -540,7 +540,7 @@ int main(int argc, char *argv[]) { ...@@ -540,7 +540,7 @@ int main(int argc, char *argv[]) {
t, ntables * nrecords_per_table, nrecords_per_request, t, ntables * nrecords_per_table, nrecords_per_request,
ntables * nrecords_per_table / t); ntables * nrecords_per_table / t);
for (int i = 0; i < nconnections; i++) { for (int i = 0; i < threads; i++) {
info *t_info = infos + i; info *t_info = infos + i;
taos_close(t_info->taos); taos_close(t_info->taos);
sem_destroy(&(t_info->mutex_sem)); sem_destroy(&(t_info->mutex_sem));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册