Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
15418181
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
15418181
编写于
3月 04, 2021
作者:
S
Shuduo Sang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-3147] <fix>: support insert interval instead of insert rate. cleanup
上级
76a6b2ff
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
228 addition
and
403 deletion
+228
-403
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+228
-403
未找到文件。
src/kit/taosdemo/taosdemo.c
浏览文件 @
15418181
...
...
@@ -239,6 +239,7 @@ typedef struct SArguments_S {
int
len_of_binary
;
int
num_of_CPR
;
int
num_of_threads
;
int
insert_interval
;
int
num_of_RPR
;
int
num_of_tables
;
int
num_of_DPT
;
...
...
@@ -458,286 +459,67 @@ typedef struct SThreadInfo_S {
}
threadInfo
;
#if 0
#ifdef LINUX
/* The options we understand. */
static struct argp_option options[] = {
{0, 'f', "meta file", 0, "The meta data to the execution procedure, if use -f, all others options invalid. Default is NULL.", 0},
#ifdef _TD_POWER_
{0, 'c', "config_directory", 0, "Configuration directory. Default is '/etc/power/'.", 1},
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'powerdb'.", 2},
#else
{0, 'c', "config_directory", 0, "Configuration directory. Default is '/etc/taos/'.", 1},
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 2},
#endif
{0, 'h', "host", 0, "The host to connect to TDengine. Default is localhost.", 2},
{0, 'p', "port", 0, "The TCP/IP port number to use for the connection. Default is 0.", 2},
{0, 'u', "user", 0, "The TDengine user name to use when connecting to the server. Default is 'root'.", 2},
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3},
{0, 'a', "replica", 0, "Set the replica parameters of the database, Default 1, min: 1, max: 3.", 4},
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 4},
{0, 's', "sql file", 0, "The select sql file.", 6},
{0, 'M', 0, 0, "Use metric flag.", 4},
{0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 6},
{0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 4},
{0, 'b', "type_of_cols", 0, "The data_type of columns, default: TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,BINARY,NCHAR,BOOL,TIMESTAMP.", 4},
{0, 'w', "length_of_chartype", 0, "The length of data_type 'BINARY' or 'NCHAR'. Default is 16", 4},
{0, 'l', "num_of_cols_per_record", 0, "The number of columns per record. Default is 10.", 4},
{0, 'T', "num_of_threads", 0, "The number of threads. Default is 10.", 4},
// {0, 'r', "num_of_records_per_req", 0, "The number of records per request. Default is 100.", 4},
{0, 't', "num_of_tables", 0, "The number of tables. Default is 10000.", 4},
{0, 'n', "num_of_records_per_table", 0, "The number of records per table. Default is 10000.", 4},
{0, 'x', 0, 0, "Not insert only flag.", 4},
{0, 'y', 0, 0, "Default input yes for prompt.", 4},
{0, 'O', "disorderRatio", 0, "Insert mode--0: In order, > 0: disorder ratio. Default is in order.", 4},
{0, 'R', "disorderRang", 0, "Out of order data's range, ms, default is 1000.", 4},
//{0, 'D', "delete database", 0, "if elete database if exists. 0: no, 1: yes, default is 1", 5},
{0}};
/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
// Get the input argument from argp_parse, which we know is a pointer to our arguments structure.
SArguments *arguments = state->input;
wordexp_t full_path;
char **sptr;
switch (key) {
case 'f':
arguments->metaFile = arg;
break;
case 'h':
arguments->host = arg;
break;
case 'p':
arguments->port = atoi(arg);
break;
case 'u':
arguments->user = arg;
break;
case 'P':
arguments->password = arg;
break;
case 'o':
arguments->output_file = arg;
break;
case 's':
arguments->sqlFile = arg;
break;
case 'q':
arguments->mode = atoi(arg);
break;
case 'T':
arguments->num_of_threads = atoi(arg);
break;
//case 'r':
// arguments->num_of_RPR = atoi(arg);
// break;
case 't':
arguments->num_of_tables = atoi(arg);
break;
case 'n':
arguments->num_of_DPT = atoi(arg);
break;
case 'd':
arguments->database = arg;
break;
case 'l':
arguments->num_of_CPR = atoi(arg);
break;
case 'b':
sptr = arguments->datatype;
if (strstr(arg, ",") == NULL) {
if (strcasecmp(arg, "INT") != 0 && strcasecmp(arg, "FLOAT") != 0 &&
strcasecmp(arg, "TINYINT") != 0 && strcasecmp(arg, "BOOL") != 0 &&
strcasecmp(arg, "SMALLINT") != 0 && strcasecmp(arg, "TIMESTAMP") != 0 &&
strcasecmp(arg, "BIGINT") != 0 && strcasecmp(arg, "DOUBLE") != 0 &&
strcasecmp(arg, "BINARY") != 0 && strcasecmp(arg, "NCHAR") != 0) {
argp_error(state, "Invalid data_type!");
}
sptr[0] = arg;
} else {
int index = 0;
char *dupstr = strdup(arg);
char *running = dupstr;
char *token = strsep(&running, ",");
while (token != NULL) {
if (strcasecmp(token, "INT") != 0 && strcasecmp(token, "FLOAT") != 0 &&
strcasecmp(token, "TINYINT") != 0 && strcasecmp(token, "BOOL") != 0 &&
strcasecmp(token, "SMALLINT") != 0 && strcasecmp(token, "TIMESTAMP") != 0 &&
strcasecmp(token, "BIGINT") != 0 && strcasecmp(token, "DOUBLE") != 0 &&
strcasecmp(token, "BINARY") != 0 && strcasecmp(token, "NCHAR") != 0) {
argp_error(state, "Invalid data_type!");
}
sptr[index++] = token;
token = strsep(&running, ",");
if (index >= MAX_NUM_DATATYPE) break;
}
}
break;
case 'w':
arguments->len_of_binary = atoi(arg);
break;
case 'm':
arguments->tb_prefix = arg;
break;
case 'M':
arguments->use_metric = true;
break;
case 'x':
arguments->insert_only = true;
break;
case 'y':
arguments->answer_yes = true;
break;
case 'c':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]);
wordfree(&full_path);
break;
case 'O':
arguments->disorderRatio = atoi(arg);
if (arguments->disorderRatio < 0 || arguments->disorderRatio > 100)
{
argp_error(state, "Invalid disorder ratio, should 1 ~ 100!");
}
break;
case 'R':
arguments->disorderRange = atoi(arg);
break;
case 'a':
arguments->replica = atoi(arg);
if (arguments->replica > 3 || arguments->replica < 1)
{
arguments->replica = 1;
}
break;
//case 'D':
// arguments->method_of_delete = atoi(arg);
// break;
case OPT_ABORT:
arguments->abort = 1;
break;
case ARGP_KEY_ARG:
/*arguments->arg_list = &state->argv[state->next-1];
state->next = state->argc;*/
argp_usage(state);
break;
default:
return ARGP_ERR_UNKNOWN;
}
return 0;
}
static struct argp argp = {options, parse_opt, 0, 0};
void parse_args(int argc, char *argv[], SArguments *arguments) {
argp_parse(&argp, argc, argv, 0, 0, arguments);
if (arguments->abort) {
#ifndef _ALPINE
error(10, 0, "ABORTED");
#else
abort();
#endif
}
}
#else
#endif
#endif
void
printHelp
()
{
void
printHelp
()
{
char
indent
[
10
]
=
" "
;
printf
(
"%s%s
\n
"
,
indent
,
"-f"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-f"
,
indent
,
"The meta file to the execution procedure. Default is './meta.json'."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-u"
,
indent
,
"The TDengine user name to use when connecting to the server. Default is 'root'."
);
#ifdef _TD_POWER_
printf
(
"%s%s
\n
"
,
indent
,
"-c"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"Configuration directory. Default is '/etc/power/'."
);
printf
(
"%s%s
\n
"
,
indent
,
"-P"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-P"
,
indent
,
"The password to use when connecting to the server. Default is 'powerdb'."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-c"
,
indent
,
"Configuration directory. Default is '/etc/power/'."
);
#else
printf
(
"%s%s
\n
"
,
indent
,
"-c"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"Configuration directory. Default is '/etc/taos/'."
);
printf
(
"%s%s
\n
"
,
indent
,
"-P"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-P"
,
indent
,
"The password to use when connecting to the server. Default is 'taosdata'."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-c"
,
indent
,
"Configuration directory. Default is '/etc/taos/'."
);
#endif
printf
(
"%s%s
\n
"
,
indent
,
"-h"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-h"
,
indent
,
"The host to connect to TDengine. Default is localhost."
);
printf
(
"%s%s
\n
"
,
indent
,
"-p"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-p"
,
indent
,
"The TCP/IP port number to use for the connection. Default is 0."
);
printf
(
"%s%s
\n
"
,
indent
,
"-u"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The TDengine user name to use when connecting to the server. Default is 'root'."
);
printf
(
"%s%s
\n
"
,
indent
,
"-d"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-d"
,
indent
,
"Destination database. Default is 'test'."
);
printf
(
"%s%s
\n
"
,
indent
,
"-a"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-a"
,
indent
,
"Set the replica parameters of the database, Default 1, min: 1, max: 3."
);
printf
(
"%s%s
\n
"
,
indent
,
"-m"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-m"
,
indent
,
"Table prefix name. Default is 't'."
);
printf
(
"%s%s
\n
"
,
indent
,
"-s"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The select sql file."
);
printf
(
"%s%s
\n
"
,
indent
,
"-M"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"Use metric flag."
);
printf
(
"%s%s
\n
"
,
indent
,
"-o"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-s"
,
indent
,
"The select sql file."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-M"
,
indent
,
"Use metric flag."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-o"
,
indent
,
"Direct output to the named file. Default is './output.txt'."
);
printf
(
"%s%s
\n
"
,
indent
,
"-q"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-q"
,
indent
,
"Query mode--0: SYNC, 1: ASYNC. Default is SYNC."
);
printf
(
"%s%s
\n
"
,
indent
,
"-b"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-b"
,
indent
,
"The data_type of columns, default: TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,BINARY,NCHAR,BOOL,TIMESTAMP."
);
printf
(
"%s%s
\n
"
,
indent
,
"-w"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-w"
,
indent
,
"The length of data_type 'BINARY' or 'NCHAR'. Default is 16"
);
printf
(
"%s%s
\n
"
,
indent
,
"-l"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-l"
,
indent
,
"The number of columns per record. Default is 10."
);
printf
(
"%s%s
\n
"
,
indent
,
"-T"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-T"
,
indent
,
"The number of threads. Default is 10."
);
printf
(
"%s%s
\n
"
,
indent
,
"-r"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-i"
,
indent
,
"The sleep time (ms) between insertion. Default is 0."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-r"
,
indent
,
"The number of records per request. Default is 100."
);
printf
(
"%s%s
\n
"
,
indent
,
"-t"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-t"
,
indent
,
"The number of tables. Default is 10000."
);
printf
(
"%s%s
\n
"
,
indent
,
"-n"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-n"
,
indent
,
"The number of records per table. Default is 10000."
);
printf
(
"%s%s
\n
"
,
indent
,
"-x"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"Not insert only flag."
);
printf
(
"%s%s
\n
"
,
indent
,
"-y"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"Default input yes for prompt."
);
printf
(
"%s%s
\n
"
,
indent
,
"-O"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-x"
,
indent
,
"Not insert only flag."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-y"
,
indent
,
"Default input yes for prompt."
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-O"
,
indent
,
"Insert mode--0: In order, > 0: disorder ratio. Default is in order."
);
printf
(
"%s%s
\n
"
,
indent
,
"-R"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
printf
(
"%s%s%s%s
\n
"
,
indent
,
"-R"
,
indent
,
"Out of order data's range, ms, default is 1000."
);
/* printf("%s%s\n", indent, "-D");
printf("%s%s%s\n", indent, indent,
/* printf("%s%s%s%s\n", indent, "-D", indent,
"if elete database if exists. 0: no, 1: yes, default is 1");
*/
}
}
void
parse_args
(
int
argc
,
char
*
argv
[],
SArguments
*
arguments
)
{
void
parse_args
(
int
argc
,
char
*
argv
[],
SArguments
*
arguments
)
{
char
**
sptr
;
wordexp_t
full_path
;
...
...
@@ -768,6 +550,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments
->
mode
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-T"
)
==
0
)
{
arguments
->
num_of_threads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
)
{
arguments
->
insert_interval
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
)
{
arguments
->
num_of_RPR
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
...
...
@@ -782,11 +566,15 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
sptr
=
arguments
->
datatype
;
++
i
;
if
(
strstr
(
argv
[
i
],
","
)
==
NULL
)
{
if
(
strcasecmp
(
argv
[
i
],
"INT"
)
!=
0
&&
strcasecmp
(
argv
[
i
],
"FLOAT"
)
!=
0
&&
strcasecmp
(
argv
[
i
],
"TINYINT"
)
!=
0
&&
strcasecmp
(
argv
[
i
],
"BOOL"
)
!=
0
&&
strcasecmp
(
argv
[
i
],
"SMALLINT"
)
!=
0
&&
strcasecmp
(
argv
[
i
],
"BIGINT"
)
!=
0
&&
strcasecmp
(
argv
[
i
],
"DOUBLE"
)
!=
0
&&
strcasecmp
(
argv
[
i
],
"BINARY"
)
&&
strcasecmp
(
argv
[
i
],
"NCHAR"
))
{
if
(
strcasecmp
(
argv
[
i
],
"INT"
)
&&
strcasecmp
(
argv
[
i
],
"FLOAT"
)
&&
strcasecmp
(
argv
[
i
],
"TINYINT"
)
&&
strcasecmp
(
argv
[
i
],
"BOOL"
)
&&
strcasecmp
(
argv
[
i
],
"SMALLINT"
)
&&
strcasecmp
(
argv
[
i
],
"BIGINT"
)
&&
strcasecmp
(
argv
[
i
],
"DOUBLE"
)
&&
strcasecmp
(
argv
[
i
],
"BINARY"
)
&&
strcasecmp
(
argv
[
i
],
"NCHAR"
))
{
fprintf
(
stderr
,
"Invalid data_type!
\n
"
);
printHelp
();
exit
(
EXIT_FAILURE
);
...
...
@@ -798,13 +586,15 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
char
*
running
=
dupstr
;
char
*
token
=
strsep
(
&
running
,
","
);
while
(
token
!=
NULL
)
{
if
(
strcasecmp
(
token
,
"INT"
)
!=
0
&&
strcasecmp
(
token
,
"FLOAT"
)
!=
0
&&
strcasecmp
(
token
,
"TINYINT"
)
!=
0
&&
strcasecmp
(
token
,
"BOOL"
)
!=
0
&&
strcasecmp
(
token
,
"SMALLINT"
)
!=
0
&&
strcasecmp
(
token
,
"BIGINT"
)
!=
0
&&
strcasecmp
(
token
,
"DOUBLE"
)
!=
0
&&
strcasecmp
(
token
,
"BINARY"
)
&&
strcasecmp
(
token
,
"NCHAR"
))
{
if
(
strcasecmp
(
token
,
"INT"
)
&&
strcasecmp
(
token
,
"FLOAT"
)
&&
strcasecmp
(
token
,
"TINYINT"
)
&&
strcasecmp
(
token
,
"BOOL"
)
&&
strcasecmp
(
token
,
"SMALLINT"
)
&&
strcasecmp
(
token
,
"BIGINT"
)
&&
strcasecmp
(
token
,
"DOUBLE"
)
&&
strcasecmp
(
token
,
"BINARY"
)
&&
strcasecmp
(
token
,
"NCHAR"
))
{
fprintf
(
stderr
,
"Invalid data_type!
\n
"
);
printHelp
();
exit
(
EXIT_FAILURE
);
...
...
@@ -828,7 +618,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
strcpy
(
configDir
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-O"
)
==
0
)
{
arguments
->
disorderRatio
=
atoi
(
argv
[
++
i
]);
if
(
arguments
->
disorderRatio
>
1
||
arguments
->
disorderRatio
<
0
)
{
if
(
arguments
->
disorderRatio
>
1
||
arguments
->
disorderRatio
<
0
)
{
arguments
->
disorderRatio
=
0
;
}
else
if
(
arguments
->
disorderRatio
==
1
)
{
arguments
->
disorderRange
=
10
;
...
...
@@ -847,7 +638,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
}
}
else
if
(
strcmp
(
argv
[
i
],
"-D"
)
==
0
)
{
arguments
->
method_of_delete
=
atoi
(
argv
[
++
i
]);
if
(
arguments
->
method_of_delete
<
0
||
arguments
->
method_of_delete
>
3
)
{
if
(
arguments
->
method_of_delete
<
0
||
arguments
->
method_of_delete
>
3
)
{
arguments
->
method_of_delete
=
0
;
}
}
else
if
(
strcmp
(
argv
[
i
],
"--help"
)
==
0
)
{
...
...
@@ -859,8 +651,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
exit
(
EXIT_FAILURE
);
}
}
}
//#endif
}
static
bool
getInfoFromJsonFile
(
char
*
file
);
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
...
...
@@ -876,7 +667,8 @@ int32_t randint[MAX_PREPARED_RAND];
int64_t
randbigint
[
MAX_PREPARED_RAND
];
float
randfloat
[
MAX_PREPARED_RAND
];
double
randdouble
[
MAX_PREPARED_RAND
];
char
*
aggreFunc
[]
=
{
"*"
,
"count(*)"
,
"avg(col0)"
,
"sum(col0)"
,
"max(col0)"
,
"min(col0)"
,
"first(col0)"
,
"last(col0)"
};
char
*
aggreFunc
[]
=
{
"*"
,
"count(*)"
,
"avg(col0)"
,
"sum(col0)"
,
"max(col0)"
,
"min(col0)"
,
"first(col0)"
,
"last(col0)"
};
SArguments
g_args
=
{
NULL
,
"127.0.0.1"
,
// host
...
...
@@ -911,6 +703,7 @@ SArguments g_args = {NULL,
16
,
// len_of_binary
10
,
// num_of_CPR
10
,
// num_of_connections/thread
0
,
// insert_interval
100
,
// num_of_RPR
10000
,
// num_of_tables
10000
,
// num_of_DPT
...
...
@@ -4952,7 +4745,9 @@ void *subSubscribeProcess(void *sarg) {
if
(
res
)
{
char
tmpFile
[
MAX_FILE_NAME_LEN
*
2
]
=
{
0
};
if
(
g_queryInfo
.
subQueryInfo
.
result
[
i
][
0
]
!=
0
)
{
sprintf
(
tmpFile
,
"%s-%d"
,
g_queryInfo
.
subQueryInfo
.
result
[
i
],
winfo
->
threadID
);
sprintf
(
tmpFile
,
"%s-%d"
,
g_queryInfo
.
subQueryInfo
.
result
[
i
],
winfo
->
threadID
);
}
getResult
(
res
,
tmpFile
);
}
...
...
@@ -4961,7 +4756,8 @@ void *subSubscribeProcess(void *sarg) {
taos_free_result
(
res
);
for
(
int
i
=
0
;
i
<
g_queryInfo
.
subQueryInfo
.
sqlCount
;
i
++
)
{
taos_unsubscribe
(
g_queryInfo
.
subQueryInfo
.
tsub
[
i
],
g_queryInfo
.
subQueryInfo
.
subscribeKeepProgress
);
taos_unsubscribe
(
g_queryInfo
.
subQueryInfo
.
tsub
[
i
],
g_queryInfo
.
subQueryInfo
.
subscribeKeepProgress
);
}
return
NULL
;
}
...
...
@@ -4989,9 +4785,13 @@ void *superSubscribeProcess(void *sarg) {
sprintf
(
topic
,
"taosdemo-subscribe-%d"
,
i
);
char
tmpFile
[
MAX_FILE_NAME_LEN
*
2
]
=
{
0
};
if
(
g_queryInfo
.
subQueryInfo
.
result
[
i
][
0
]
!=
0
)
{
sprintf
(
tmpFile
,
"%s-%d"
,
g_queryInfo
.
superQueryInfo
.
result
[
i
],
winfo
->
threadID
);
sprintf
(
tmpFile
,
"%s-%d"
,
g_queryInfo
.
superQueryInfo
.
result
[
i
],
winfo
->
threadID
);
}
g_queryInfo
.
superQueryInfo
.
tsub
[
i
]
=
subscribeImpl
(
winfo
->
taos
,
g_queryInfo
.
superQueryInfo
.
sql
[
i
],
topic
,
tmpFile
);
g_queryInfo
.
superQueryInfo
.
tsub
[
i
]
=
subscribeImpl
(
winfo
->
taos
,
g_queryInfo
.
superQueryInfo
.
sql
[
i
],
topic
,
tmpFile
);
if
(
NULL
==
g_queryInfo
.
superQueryInfo
.
tsub
[
i
])
{
return
NULL
;
}
...
...
@@ -5012,7 +4812,8 @@ void *superSubscribeProcess(void *sarg) {
if
(
res
)
{
char
tmpFile
[
MAX_FILE_NAME_LEN
*
2
]
=
{
0
};
if
(
g_queryInfo
.
superQueryInfo
.
result
[
i
][
0
]
!=
0
)
{
sprintf
(
tmpFile
,
"%s-%d"
,
g_queryInfo
.
superQueryInfo
.
result
[
i
],
winfo
->
threadID
);
sprintf
(
tmpFile
,
"%s-%d"
,
g_queryInfo
.
superQueryInfo
.
result
[
i
],
winfo
->
threadID
);
}
getResult
(
res
,
tmpFile
);
}
...
...
@@ -5021,7 +4822,8 @@ void *superSubscribeProcess(void *sarg) {
taos_free_result
(
res
);
for
(
int
i
=
0
;
i
<
g_queryInfo
.
superQueryInfo
.
sqlCount
;
i
++
)
{
taos_unsubscribe
(
g_queryInfo
.
superQueryInfo
.
tsub
[
i
],
g_queryInfo
.
superQueryInfo
.
subscribeKeepProgress
);
taos_unsubscribe
(
g_queryInfo
.
superQueryInfo
.
tsub
[
i
],
g_queryInfo
.
superQueryInfo
.
subscribeKeepProgress
);
}
return
NULL
;
}
...
...
@@ -5042,14 +4844,19 @@ int subscribeTestProcess() {
}
if
(
0
!=
g_queryInfo
.
subQueryInfo
.
sqlCount
)
{
(
void
)
getAllChildNameOfSuperTable
(
taos
,
g_queryInfo
.
dbName
,
g_queryInfo
.
subQueryInfo
.
sTblName
,
&
g_queryInfo
.
subQueryInfo
.
childTblName
,
&
g_queryInfo
.
subQueryInfo
.
childTblCount
);
(
void
)
getAllChildNameOfSuperTable
(
taos
,
g_queryInfo
.
dbName
,
g_queryInfo
.
subQueryInfo
.
sTblName
,
&
g_queryInfo
.
subQueryInfo
.
childTblName
,
&
g_queryInfo
.
subQueryInfo
.
childTblCount
);
}
pthread_t
*
pids
=
NULL
;
threadInfo
*
infos
=
NULL
;
//==== create sub threads for query from super table
if
(
g_queryInfo
.
superQueryInfo
.
sqlCount
>
0
&&
g_queryInfo
.
superQueryInfo
.
concurrent
>
0
)
{
if
(
g_queryInfo
.
superQueryInfo
.
sqlCount
>
0
&&
g_queryInfo
.
superQueryInfo
.
concurrent
>
0
)
{
pids
=
malloc
(
g_queryInfo
.
superQueryInfo
.
concurrent
*
sizeof
(
pthread_t
));
infos
=
malloc
(
g_queryInfo
.
superQueryInfo
.
concurrent
*
sizeof
(
threadInfo
));
if
((
NULL
==
pids
)
||
(
NULL
==
infos
))
{
...
...
@@ -5069,9 +4876,12 @@ int subscribeTestProcess() {
//==== create sub threads for query from sub table
pthread_t
*
pidsOfSub
=
NULL
;
threadInfo
*
infosOfSub
=
NULL
;
if
((
g_queryInfo
.
subQueryInfo
.
sqlCount
>
0
)
&&
(
g_queryInfo
.
subQueryInfo
.
threadCnt
>
0
))
{
pidsOfSub
=
malloc
(
g_queryInfo
.
subQueryInfo
.
threadCnt
*
sizeof
(
pthread_t
));
infosOfSub
=
malloc
(
g_queryInfo
.
subQueryInfo
.
threadCnt
*
sizeof
(
threadInfo
));
if
((
g_queryInfo
.
subQueryInfo
.
sqlCount
>
0
)
&&
(
g_queryInfo
.
subQueryInfo
.
threadCnt
>
0
))
{
pidsOfSub
=
malloc
(
g_queryInfo
.
subQueryInfo
.
threadCnt
*
sizeof
(
pthread_t
));
infosOfSub
=
malloc
(
g_queryInfo
.
subQueryInfo
.
threadCnt
*
sizeof
(
threadInfo
));
if
((
NULL
==
pidsOfSub
)
||
(
NULL
==
infosOfSub
))
{
printf
(
"malloc failed for create threads
\n
"
);
taos_close
(
taos
);
...
...
@@ -5170,7 +4980,6 @@ void setParaFromArg(){
g_Dbs
.
db
[
0
].
dbCfg
.
replica
=
g_args
.
replica
;
tstrncpy
(
g_Dbs
.
db
[
0
].
dbCfg
.
precision
,
"ms"
,
MAX_DB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
resultFile
,
g_args
.
output_file
,
MAX_FILE_NAME_LEN
);
g_Dbs
.
use_metric
=
g_args
.
use_metric
;
...
...
@@ -5190,10 +4999,12 @@ void setParaFromArg(){
g_Dbs
.
db
[
0
].
superTbls
[
0
].
numRecPerReq
=
0
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
disorderRange
=
g_args
.
disorderRange
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
disorderRatio
=
g_args
.
disorderRatio
;
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
childTblPrefix
,
g_args
.
tb_prefix
,
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
childTblPrefix
,
g_args
.
tb_prefix
,
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
dataSource
,
"rand"
,
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
insertMode
,
"taosc"
,
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
startTimestamp
,
"2017-07-14 10:40:00.000"
,
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
startTimestamp
,
"2017-07-14 10:40:00.000"
,
MAX_TB_NAME_SIZE
);
g_Dbs
.
db
[
0
].
superTbls
[
0
].
timeStampStep
=
10
;
// g_args.num_of_RPR;
...
...
@@ -5207,7 +5018,9 @@ void setParaFromArg(){
memset
(
dataString
,
0
,
STRING_LEN
);
if
(
strcasecmp
(
data_type
[
0
],
"BINARY"
)
==
0
||
strcasecmp
(
data_type
[
0
],
"BOOL"
)
==
0
||
strcasecmp
(
data_type
[
0
],
"NCHAR"
)
==
0
)
{
if
(
strcasecmp
(
data_type
[
0
],
"BINARY"
)
==
0
||
strcasecmp
(
data_type
[
0
],
"BOOL"
)
==
0
||
strcasecmp
(
data_type
[
0
],
"NCHAR"
)
==
0
)
{
g_Dbs
.
do_aggreFunc
=
false
;
}
...
...
@@ -5217,7 +5030,8 @@ void setParaFromArg(){
break
;
}
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
columns
[
i
].
dataType
,
data_type
[
i
],
MAX_TB_NAME_SIZE
);
tstrncpy
(
g_Dbs
.
db
[
0
].
superTbls
[
0
].
columns
[
i
].
dataType
,
data_type
[
i
],
MAX_TB_NAME_SIZE
);
g_Dbs
.
db
[
0
].
superTbls
[
0
].
columns
[
i
].
dataLen
=
g_args
.
len_of_binary
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
columnCount
++
;
}
...
...
@@ -5339,23 +5153,28 @@ int main(int argc, char *argv[]) {
if
(
g_Dbs
.
cfgDir
[
0
])
taos_options
(
TSDB_OPTION_CONFIGDIR
,
g_Dbs
.
cfgDir
);
(
void
)
insertTestProcess
();
}
else
if
(
QUERY_MODE
==
g_jsonType
)
{
if
(
g_queryInfo
.
cfgDir
[
0
])
taos_options
(
TSDB_OPTION_CONFIGDIR
,
g_queryInfo
.
cfgDir
);
if
(
g_queryInfo
.
cfgDir
[
0
])
taos_options
(
TSDB_OPTION_CONFIGDIR
,
g_queryInfo
.
cfgDir
);
(
void
)
queryTestProcess
();
}
else
if
(
SUBSCRIBE_MODE
==
g_jsonType
)
{
if
(
g_queryInfo
.
cfgDir
[
0
])
taos_options
(
TSDB_OPTION_CONFIGDIR
,
g_queryInfo
.
cfgDir
);
if
(
g_queryInfo
.
cfgDir
[
0
])
taos_options
(
TSDB_OPTION_CONFIGDIR
,
g_queryInfo
.
cfgDir
);
(
void
)
subscribeTestProcess
();
}
else
{
;
}
}
else
{
memset
(
&
g_Dbs
,
0
,
sizeof
(
SDbs
));
g_jsonType
=
INSERT_MODE
;
setParaFromArg
();
if
(
NULL
!=
g_args
.
sqlFile
)
{
TAOS
*
qtaos
=
taos_connect
(
g_Dbs
.
host
,
g_Dbs
.
user
,
g_Dbs
.
password
,
g_Dbs
.
db
[
0
].
dbName
,
g_Dbs
.
port
);
g_Dbs
.
host
,
g_Dbs
.
user
,
g_Dbs
.
password
,
g_Dbs
.
db
[
0
].
dbName
,
g_Dbs
.
port
);
querySqlFile
(
qtaos
,
g_args
.
sqlFile
);
taos_close
(
qtaos
);
return
0
;
...
...
@@ -5376,8 +5195,14 @@ int main(int argc, char *argv[]) {
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
//rInfo->nrecords_per_table = g_Dbs.db[0].superTbls[0].insertRows;
rInfo
->
superTblInfo
=
&
g_Dbs
.
db
[
0
].
superTbls
[
0
];
rInfo
->
taos
=
taos_connect
(
g_Dbs
.
host
,
g_Dbs
.
user
,
g_Dbs
.
password
,
g_Dbs
.
db
[
0
].
dbName
,
g_Dbs
.
port
);
strcpy
(
rInfo
->
tb_prefix
,
g_Dbs
.
db
[
0
].
superTbls
[
0
].
childTblPrefix
);
rInfo
->
taos
=
taos_connect
(
g_Dbs
.
host
,
g_Dbs
.
user
,
g_Dbs
.
password
,
g_Dbs
.
db
[
0
].
dbName
,
g_Dbs
.
port
);
strcpy
(
rInfo
->
tb_prefix
,
g_Dbs
.
db
[
0
].
superTbls
[
0
].
childTblPrefix
);
strcpy
(
rInfo
->
fp
,
g_Dbs
.
resultFile
);
if
(
!
g_Dbs
.
use_metric
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录