Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
4494203f
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4494203f
编写于
3月 09, 2021
作者:
sangshuduo
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into hotfix/sangshuduo/TD-3215-bus-error-arm32
上级
ed134f3d
fd5fd885
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
302 addition
and
181 deletion
+302
-181
documentation20/cn/10.cluster/docs.md
documentation20/cn/10.cluster/docs.md
+8
-2
src/dnode/src/dnodeVnodes.c
src/dnode/src/dnodeVnodes.c
+8
-0
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+166
-154
tests/pytest/crash_gen/crash_gen_main.py
tests/pytest/crash_gen/crash_gen_main.py
+67
-21
tests/pytest/crash_gen/db.py
tests/pytest/crash_gen/db.py
+35
-2
tests/pytest/crash_gen/misc.py
tests/pytest/crash_gen/misc.py
+10
-2
tests/pytest/crash_gen/settings.py
tests/pytest/crash_gen/settings.py
+8
-0
未找到文件。
documentation20/cn/10.cluster/docs.md
浏览文件 @
4494203f
...
...
@@ -225,7 +225,13 @@ SHOW MNODES;
## <a class="anchor" id="arbitrator"></a>Arbitrator的使用
如果副本数为偶数,当一个
vnode group里一半vnode不工作时,是无法从中选出master的。同理,一半mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了Arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含Arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到Arbitrator,那么节点B
就能正常工作。
如果副本数为偶数,当一个
vnode group 里一半 vnode 不工作时,是无法从中选出 master 的。同理,一半 mnode 不工作时,是无法选出 mnode 的 master 的,因为存在“split brain”问题。为解决这个问题,TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator,那么节点 B
就能正常工作。
TDengine提供一个执行程序,名为 tarbitrator,找任何一台Linux服务器运行它即可。请点击
[
安装包下载
](
https://www.taosdata.com/cn/all-downloads/
)
,在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数
`-p`
可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为Arbitrator的End Point。如果该参数配置了,当副本数为偶数时,系统将自动连接配置的Arbitrator。如果副本数为奇数,即使配置了Arbitrator,系统也不会去建立连接。
总之,在目前版本下,TDengine 建议在双副本环境要配置 Arbitrator,以提升系统的可用性。
Arbitrator 的执行程序名为 tarbitrator。该程序对系统资源几乎没有要求,只需要保证有网络连接,找任何一台 Linux 服务器运行它即可。以下简要描述安装配置的步骤:
1.
请点击
[
安装包下载
](
https://www.taosdata.com/cn/all-downloads/
)
,在 TDengine Arbitrator Linux 一节中,选择合适的版本下载并安装。
2.
该应用的命令行参数
`-p`
可以指定其对外服务的端口号,缺省是 6042。
3.
修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。(如果该参数配置了,当副本数为偶数时,系统将自动连接配置的 Arbitrator。如果副本数为奇数,即使配置了 Arbitrator,系统也不会去建立连接。)
4.
在配置文件中配置了的 Arbitrator,会出现在
`SHOW DNODES;`
指令的返回结果中,对应的 role 列的值会是“arb”。
src/dnode/src/dnodeVnodes.c
浏览文件 @
4494203f
...
...
@@ -198,6 +198,14 @@ void dnodeCleanupVnodes() {
static
void
dnodeProcessStatusRsp
(
SRpcMsg
*
pMsg
)
{
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
dError
(
"status rsp is received, error:%s"
,
tstrerror
(
pMsg
->
code
));
if
(
pMsg
->
code
==
TSDB_CODE_MND_DNODE_NOT_EXIST
)
{
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
dnodeGetClusterId
(
clusterId
);
if
(
clusterId
[
0
]
!=
'\0'
)
{
dError
(
"exit zombie dropped dnode"
);
exit
(
EXIT_FAILURE
);
}
}
taosTmrReset
(
dnodeSendStatusMsg
,
tsStatusInterval
*
1000
,
NULL
,
tsDnodeTmr
,
&
tsStatusTimer
);
return
;
}
...
...
src/kit/taosdemo/taosdemo.c
浏览文件 @
4494203f
...
...
@@ -185,6 +185,7 @@ typedef struct SArguments_S {
bool
insert_only
;
bool
answer_yes
;
bool
debug_print
;
bool
verbose_print
;
char
*
output_file
;
int
mode
;
char
*
datatype
[
MAX_NUM_DATATYPE
+
1
];
...
...
@@ -489,6 +490,7 @@ SArguments g_args = {
false
,
// use_metric
false
,
// insert_only
false
,
// debug_print
false
,
// verbose_print
false
,
// answer_yes;
"./output.txt"
,
// output_file
0
,
// mode : sync or async
...
...
@@ -526,7 +528,11 @@ static SQueryMetaInfo g_queryInfo;
static
FILE
*
g_fpOfInsertResult
=
NULL
;
#define debugPrint(fmt, ...) \
do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0)
do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
///////////////////////////////////////////////////
void
printHelp
()
{
...
...
@@ -691,6 +697,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments
->
answer_yes
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
arguments
->
debug_print
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-gg"
)
==
0
)
{
arguments
->
verbose_print
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-O"
)
==
0
)
{
...
...
@@ -748,7 +756,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printf
(
"
\n
"
);
}
printf
(
"# Insertion interval: %d
\n
"
,
arguments
->
insert_interval
);
printf
(
"# Number of
Columns per record:
%d
\n
"
,
arguments
->
num_of_RPR
);
printf
(
"# Number of
records per req:
%d
\n
"
,
arguments
->
num_of_RPR
);
printf
(
"# Number of Threads: %d
\n
"
,
arguments
->
num_of_threads
);
printf
(
"# Number of Tables: %d
\n
"
,
arguments
->
num_of_tables
);
printf
(
"# Number of Data per Table: %d
\n
"
,
arguments
->
num_of_DPT
);
...
...
@@ -805,7 +813,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
}
if
(
code
!=
0
)
{
debugPrint
(
"
DEBUG
%s() LN%d - command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
debugPrint
(
"%s() LN%d - command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
fprintf
(
stderr
,
"Failed to run %s, reason: %s
\n
"
,
command
,
taos_errstr
(
res
));
taos_free_result
(
res
);
//taos_close(taos);
...
...
@@ -1986,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
exit
(
-
1
);
}
snprintf
(
superTbls
->
colsOfCreateChildTable
,
len
+
20
,
"(ts timestamp%s)"
,
cols
);
debugPrint
(
"DEBUG -
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
superTbls
->
colsOfCreateChildTable
);
verbosePrint
(
"
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
superTbls
->
colsOfCreateChildTable
);
if
(
use_metric
)
{
char
tags
[
STRING_LEN
]
=
"
\0
"
;
...
...
@@ -2039,13 +2047,13 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
snprintf
(
command
,
BUFFER_SIZE
,
"create table if not exists %s.%s (ts timestamp%s) tags %s"
,
dbName
,
superTbls
->
sTblName
,
cols
,
tags
);
debugPrint
(
"DEBUG -
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
command
);
verbosePrint
(
"
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
0
!=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
))
{
fprintf
(
stderr
,
"create supertable %s failed!
\n\n
"
,
superTbls
->
sTblName
);
return
-
1
;
}
debugPrint
(
"
DEBUG -
create supertable %s success!
\n\n
"
,
superTbls
->
sTblName
);
debugPrint
(
"create supertable %s success!
\n\n
"
,
superTbls
->
sTblName
);
}
return
0
;
}
...
...
@@ -2064,7 +2072,7 @@ static int createDatabases() {
for
(
int
i
=
0
;
i
<
g_Dbs
.
dbCount
;
i
++
)
{
if
(
g_Dbs
.
db
[
i
].
drop
)
{
sprintf
(
command
,
"drop database if exists %s;"
,
g_Dbs
.
db
[
i
].
dbName
);
debugPrint
(
"DEBUG
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
verbosePrint
(
"
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
0
!=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
))
{
taos_close
(
taos
);
return
-
1
;
...
...
@@ -2132,7 +2140,7 @@ static int createDatabases() {
"precision
\'
%s
\'
;"
,
g_Dbs
.
db
[
i
].
dbCfg
.
precision
);
}
debugPrint
(
"
DEBUG
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
debugPrint
(
"%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
0
!=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
))
{
taos_close
(
taos
);
printf
(
"
\n
create database %s failed!
\n\n
"
,
g_Dbs
.
db
[
i
].
dbName
);
...
...
@@ -2140,11 +2148,11 @@ static int createDatabases() {
}
printf
(
"
\n
create database %s success!
\n\n
"
,
g_Dbs
.
db
[
i
].
dbName
);
debugPrint
(
"
DEBUG
%s() %d supertbl count:%d
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTblCount
);
debugPrint
(
"%s() %d supertbl count:%d
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTblCount
);
for
(
int
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
// describe super table, if exists
sprintf
(
command
,
"describe %s.%s;"
,
g_Dbs
.
db
[
i
].
dbName
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
sTblName
);
debugPrint
(
"DEBUG
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
verbosePrint
(
"
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
0
!=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
superTblExists
=
TBL_NO_EXISTS
;
ret
=
createSuperTable
(
taos
,
g_Dbs
.
db
[
i
].
dbName
,
&
g_Dbs
.
db
[
i
].
superTbls
[
j
],
g_Dbs
.
use_metric
);
...
...
@@ -2232,7 +2240,7 @@ static void* createTable(void *sarg)
}
len
=
0
;
debugPrint
(
"DEBUG
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
verbosePrint
(
"
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
if
(
0
!=
queryDbExec
(
winfo
->
taos
,
buffer
,
NO_INSERT_TYPE
)){
free
(
buffer
);
return
NULL
;
...
...
@@ -2247,7 +2255,7 @@ static void* createTable(void *sarg)
}
if
(
0
!=
len
)
{
debugPrint
(
"DEBUG
%s() %d buffer: %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
verbosePrint
(
"
%s() %d buffer: %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
(
void
)
queryDbExec
(
winfo
->
taos
,
buffer
,
NO_INSERT_TYPE
);
}
...
...
@@ -2285,7 +2293,7 @@ int startMultiThreadCreateChildTable(
t_info
->
threadID
=
i
;
tstrncpy
(
t_info
->
db_name
,
db_name
,
MAX_DB_NAME_SIZE
);
t_info
->
superTblInfo
=
superTblInfo
;
debugPrint
(
"DEBUG
%s() %d db_name: %s
\n
"
,
__func__
,
__LINE__
,
db_name
);
verbosePrint
(
"
%s() %d db_name: %s
\n
"
,
__func__
,
__LINE__
,
db_name
);
t_info
->
taos
=
taos_connect
(
g_Dbs
.
host
,
g_Dbs
.
user
,
...
...
@@ -2336,7 +2344,7 @@ static void createChildTables() {
continue
;
}
debugPrint
(
"DEBUG -
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
verbosePrint
(
"
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
);
startMultiThreadCreateChildTable
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
,
...
...
@@ -2362,7 +2370,7 @@ static void createChildTables() {
len
=
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
")"
);
debugPrint
(
"DEBUG -
%s() LN%d: dbName: %s num of tb: %d schema: %s
\n
"
,
__func__
,
__LINE__
,
verbosePrint
(
"
%s() LN%d: dbName: %s num of tb: %d schema: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
dbName
,
g_args
.
num_of_tables
,
tblColsBuf
);
startMultiThreadCreateChildTable
(
tblColsBuf
,
...
...
@@ -3223,7 +3231,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf
(
"failed to read json, disorderRange not found"
);
goto
PARSE_OVER
;
}
cJSON
*
insertRows
=
cJSON_GetObjectItem
(
stbInfo
,
"insert_rows"
);
if
(
insertRows
&&
insertRows
->
type
==
cJSON_Number
)
{
...
...
@@ -3587,7 +3594,7 @@ PARSE_OVER:
}
static
bool
getInfoFromJsonFile
(
char
*
file
)
{
debugPrint
(
"
DEBUG -
%s %d %s
\n
"
,
__func__
,
__LINE__
,
file
);
debugPrint
(
"%s %d %s
\n
"
,
__func__
,
__LINE__
,
file
);
FILE
*
fp
=
fopen
(
file
,
"r"
);
if
(
!
fp
)
{
...
...
@@ -3746,6 +3753,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable*
return
(
-
1
);
}
}
dataLen
-=
2
;
dataLen
+=
snprintf
(
dataBuf
+
dataLen
,
maxLen
-
dataLen
,
")"
);
...
...
@@ -3776,7 +3784,6 @@ static void syncWriteForNumberOfTblInOneSql(
}
uint64_t
time_counter
=
winfo
->
start_time
;
int64_t
tmp_time
;
int
sampleUsePos
;
int64_t
st
=
0
;
...
...
@@ -3784,6 +3791,7 @@ static void syncWriteForNumberOfTblInOneSql(
for
(
int
i
=
0
;
i
<
superTblInfo
->
insertRows
;)
{
int32_t
tbl_id
=
0
;
for
(
int
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
)
{
int64_t
tmp_time
=
0
;
int
inserted
=
i
;
for
(
int
k
=
0
;
k
<
g_args
.
num_of_RPR
;)
{
...
...
@@ -3813,7 +3821,7 @@ static void syncWriteForNumberOfTblInOneSql(
if
(
0
==
len
)
{
len
+=
snprintf
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
"insert into %s.%s%d using %s.%s tags %s values "
,
"insert into %s.%s%d using %s.%s tags %s values "
,
winfo
->
db_name
,
superTblInfo
->
childTblPrefix
,
tbl_id
,
...
...
@@ -3823,7 +3831,7 @@ static void syncWriteForNumberOfTblInOneSql(
}
else
{
len
+=
snprintf
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
" %s.%s%d using %s.%s tags %s values "
,
" %s.%s%d using %s.%s tags %s values "
,
winfo
->
db_name
,
superTblInfo
->
childTblPrefix
,
tbl_id
,
...
...
@@ -3836,13 +3844,13 @@ static void syncWriteForNumberOfTblInOneSql(
if
(
0
==
len
)
{
len
+=
snprintf
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
"insert into %s.%s values "
,
"insert into %s.%s values "
,
winfo
->
db_name
,
superTblInfo
->
childTblName
+
tbl_id
*
TSDB_TABLE_NAME_LEN
);
}
else
{
len
+=
snprintf
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
" %s.%s values "
,
" %s.%s values "
,
winfo
->
db_name
,
superTblInfo
->
childTblName
+
tbl_id
*
TSDB_TABLE_NAME_LEN
);
}
...
...
@@ -3850,14 +3858,14 @@ static void syncWriteForNumberOfTblInOneSql(
if
(
0
==
len
)
{
len
+=
snprintf
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
"insert into %s.%s%d values "
,
"insert into %s.%s%d values "
,
winfo
->
db_name
,
superTblInfo
->
childTblPrefix
,
tbl_id
);
}
else
{
len
+=
snprintf
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
" %s.%s%d values "
,
" %s.%s%d values "
,
winfo
->
db_name
,
superTblInfo
->
childTblPrefix
,
tbl_id
);
...
...
@@ -3892,7 +3900,7 @@ static void syncWriteForNumberOfTblInOneSql(
}
else
{
retLen
=
generateRowData
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
tmp_time
+=
superTblInfo
->
timeStampStep
,
tmp_time
+=
superTblInfo
->
timeStampStep
,
superTblInfo
);
}
if
(
retLen
<
0
)
{
...
...
@@ -3920,7 +3928,7 @@ static void syncWriteForNumberOfTblInOneSql(
send_to_server:
if
(
g_args
.
insert_interval
&&
(
g_args
.
insert_interval
>
(
et
-
st
)))
{
int
sleep_time
=
g_args
.
insert_interval
-
(
et
-
st
);
debugPrint
(
"DEBUG sleep: %d ms
\n
"
,
sleep_time
);
printf
(
"sleep: %d ms specified by insert_interval
\n
"
,
sleep_time
);
taosMsleep
(
sleep_time
);
// ms
}
...
...
@@ -3937,7 +3945,7 @@ send_to_server:
int64_t
endTs
;
startTs
=
taosGetTimestampUs
();
debugPrint
(
"
DEBUG
%s() LN%d buff: %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
debugPrint
(
"%s() LN%d buff: %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
int
affectedRows
=
queryDbExec
(
winfo
->
taos
,
buffer
,
INSERT_TYPE
);
...
...
@@ -3950,16 +3958,16 @@ send_to_server:
if
(
delay
<
winfo
->
minDelay
)
winfo
->
minDelay
=
delay
;
winfo
->
cntDelay
++
;
winfo
->
totalDelay
+=
delay
;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
winfo
->
avgDelay
=
(
double
)
winfo
->
totalDelay
/
winfo
->
cntDelay
;
winfo
->
totalAffectedRows
+=
affectedRows
;
}
totalAffectedRows
+=
affectedRows
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] has currently inserted rows: %"
PRId64
", affected rows: %"
PRId64
"
\n
"
,
winfo
->
threadID
,
totalRowsInserted
,
totalAffectedRows
);
winfo
->
totalRowsInserted
,
winfo
->
totalAffectedRows
);
lastPrintTime
=
currentPrintTime
;
}
//int64_t t2 = taosGetTimestampMs();
...
...
@@ -4090,15 +4098,18 @@ static void* syncWrite(void *sarg) {
uint64_t
st
=
0
;
uint64_t
et
=
0
;
for
(
int
i
=
0
;
i
<
g_args
.
num_of_DPT
;)
{
winfo
->
totalRowsInserted
=
0
;
winfo
->
totalAffectedRows
=
0
;
for
(
int
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
tID
++
)
{
int
inserted
=
i
;
for
(
int
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
tID
++
)
{
for
(
int
i
=
0
;
i
<
g_args
.
num_of_DPT
;)
{
int
tblInserted
=
i
;
int64_t
tmp_time
=
time_counter
;
char
*
pstr
=
buffer
;
pstr
+=
sprintf
(
pstr
,
"insert into %s.%s%d values"
,
"insert into %s.%s%d values
"
,
winfo
->
db_name
,
g_args
.
tb_prefix
,
tID
);
int
k
;
for
(
k
=
0
;
k
<
g_args
.
num_of_RPR
;)
{
...
...
@@ -4122,31 +4133,34 @@ static void* syncWrite(void *sarg) {
}
pstr
+=
sprintf
(
pstr
,
" %s"
,
data
);
i
nserted
++
;
tblI
nserted
++
;
k
++
;
i
++
;
if
(
i
nserted
>=
g_args
.
num_of_DPT
)
if
(
tblI
nserted
>=
g_args
.
num_of_DPT
)
break
;
}
winfo
->
totalRowsInserted
+=
k
;
/* puts(buffer); */
int64_t
startTs
;
int64_t
endTs
;
startTs
=
taosGetTimestampUs
();
//queryDB(winfo->taos, buffer);
if
(
i
>
0
&&
g_args
.
insert_interval
if
(
i
>
0
&&
g_args
.
insert_interval
&&
(
g_args
.
insert_interval
>
(
et
-
st
)
))
{
int
sleep_time
=
g_args
.
insert_interval
-
(
et
-
st
);
debugPrint
(
"DEBUG sleep: %d ms
\n
"
,
sleep_time
);
printf
(
"sleep: %d ms specified by insert_interval
\n
"
,
sleep_time
);
taosMsleep
(
sleep_time
);
// ms
}
}
if
(
g_args
.
insert_interval
)
{
if
(
g_args
.
insert_interval
)
{
st
=
taosGetTimestampMs
();
}
debugPrint
(
"DEBUG -
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
}
verbosePrint
(
"
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
int
affectedRows
=
queryDbExec
(
winfo
->
taos
,
buffer
,
1
);
verbosePrint
(
"%s() LN%d: affectedRows:%d
\n
"
,
__func__
,
__LINE__
,
affectedRows
);
if
(
0
<=
affectedRows
){
endTs
=
taosGetTimestampUs
();
int64_t
delay
=
endTs
-
startTs
;
...
...
@@ -4156,27 +4170,31 @@ static void* syncWrite(void *sarg) {
winfo
->
minDelay
=
delay
;
winfo
->
cntDelay
++
;
winfo
->
totalDelay
+=
delay
;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
winfo
->
totalAffectedRows
+=
affectedRows
;
winfo
->
avgDelay
=
(
double
)
winfo
->
totalDelay
/
winfo
->
cntDelay
;
}
if
(
g_args
.
insert_interval
)
{
verbosePrint
(
"%s() LN%d: totalaffectedRows:%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
winfo
->
totalAffectedRows
);
if
(
g_args
.
insert_interval
)
{
et
=
taosGetTimestampMs
();
}
}
if
(
tID
==
winfo
->
end_table_id
)
{
i
=
inserted
;
time_counter
=
tmp_time
;
if
(
tblInserted
>=
g_args
.
num_of_DPT
)
{
break
;
}
}
}
// num_of_DPT
}
// tId
printf
(
"====thread[%d] completed total inserted rows: %"
PRId64
", total affected rows: %"
PRId64
"====
\n
"
,
winfo
->
threadID
,
winfo
->
totalRowsInserted
,
winfo
->
totalAffectedRows
);
}
return
NULL
;
}
static
void
*
syncWriteWithStb
(
void
*
sarg
)
{
uint64_t
totalRowsInserted
=
0
;
uint64_t
totalAffectedRows
=
0
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
threadInfo
*
winfo
=
(
threadInfo
*
)
sarg
;
...
...
@@ -4232,27 +4250,44 @@ static void* syncWriteWithStb(void *sarg) {
return
NULL
;
}
int64_t
time_counter
=
winfo
->
start_time
;
uint64_t
st
=
0
;
uint64_t
et
=
0
;
debugPrint
(
"DEBUG - %s() LN%d insertRows=%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
superTblInfo
->
insertRows
);
winfo
->
totalRowsInserted
=
0
;
winfo
->
totalAffectedRows
=
0
;
for
(
int
i
=
0
;
i
<
superTblInfo
->
insertRows
;)
{
int
sampleUsePos
;
for
(
uint32_t
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
tID
++
)
{
uint64_t
inserted
=
i
;
uint64_t
tmp_time
=
time_counter
;
debugPrint
(
"%s() LN%d insertRows=%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
superTblInfo
->
insertRows
);
int
sampleUsePos
=
samplePos
;
int
k
=
0
;
debugPrint
(
"DEBUG - %s() LN%d num_of_RPR=%d
\n
"
,
__func__
,
__LINE__
,
g_args
.
num_of_RPR
);
for
(
k
=
0
;
k
<
g_args
.
num_of_RPR
;)
{
int
len
=
0
;
memset
(
buffer
,
0
,
superTblInfo
->
maxSqlLen
);
char
*
pstr
=
buffer
;
for
(
uint32_t
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
tID
++
)
{
int64_t
start_time
=
winfo
->
start_time
;
if
(
AUTO_CREATE_SUBTBL
==
superTblInfo
->
autoCreateTable
)
{
for
(
int
i
=
0
;
i
<
superTblInfo
->
insertRows
;)
{
int64_t
tblInserted
=
i
;
if
(
i
>
0
&&
g_args
.
insert_interval
&&
(
g_args
.
insert_interval
>
(
et
-
st
)
))
{
int
sleep_time
=
g_args
.
insert_interval
-
(
et
-
st
);
printf
(
"sleep: %d ms specified by insert_interval
\n
"
,
sleep_time
);
taosMsleep
(
sleep_time
);
// ms
}
if
(
g_args
.
insert_interval
)
{
st
=
taosGetTimestampMs
();
}
sampleUsePos
=
samplePos
;
verbosePrint
(
"%s() LN%d num_of_RPR=%d
\n
"
,
__func__
,
__LINE__
,
g_args
.
num_of_RPR
);
memset
(
buffer
,
0
,
superTblInfo
->
maxSqlLen
);
int
len
=
0
;
char
*
pstr
=
buffer
;
if
(
AUTO_CREATE_SUBTBL
==
superTblInfo
->
autoCreateTable
)
{
char
*
tagsValBuf
=
NULL
;
if
(
0
==
superTblInfo
->
tagSource
)
{
tagsValBuf
=
generateTagVaulesForStb
(
superTblInfo
);
...
...
@@ -4275,27 +4310,29 @@ static void* syncWriteWithStb(void *sarg) {
superTblInfo
->
sTblName
,
tagsValBuf
);
tmfree
(
tagsValBuf
);
}
else
if
(
TBL_ALREADY_EXISTS
==
superTblInfo
->
childTblExists
)
{
}
else
if
(
TBL_ALREADY_EXISTS
==
superTblInfo
->
childTblExists
)
{
len
+=
snprintf
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
"insert into %s.%s values"
,
winfo
->
db_name
,
superTblInfo
->
childTblName
+
tID
*
TSDB_TABLE_NAME_LEN
);
}
else
{
}
else
{
len
+=
snprintf
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
"insert into %s.%s%d values"
,
winfo
->
db_name
,
superTblInfo
->
childTblPrefix
,
tID
);
}
}
int
k
;
for
(
k
=
0
;
k
<
g_args
.
num_of_RPR
;)
{
int
retLen
=
0
;
if
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
"sample"
,
strlen
(
"sample"
)))
{
retLen
=
getRowDataFromSample
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
tmp_time
+=
superTblInfo
->
timeStampStep
,
start_time
+
superTblInfo
->
timeStampStep
*
i
,
superTblInfo
,
&
sampleUsePos
,
fp
,
...
...
@@ -4307,54 +4344,44 @@ static void* syncWriteWithStb(void *sarg) {
int
rand_num
=
rand_tinyint
()
%
100
;
if
(
0
!=
superTblInfo
->
disorderRatio
&&
rand_num
<
superTblInfo
->
disorderRatio
)
{
int64_t
d
=
tmp
_time
-
rand
()
%
superTblInfo
->
disorderRange
;
int64_t
d
=
start
_time
-
rand
()
%
superTblInfo
->
disorderRange
;
retLen
=
generateRowData
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
d
,
superTblInfo
->
maxSqlLen
-
len
,
d
,
superTblInfo
);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d);
}
else
{
retLen
=
generateRowData
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
tmp_time
+=
superTblInfo
->
timeStampStep
,
start_time
+
superTblInfo
->
timeStampStep
*
i
,
superTblInfo
);
}
if
(
retLen
<
0
)
{
goto
free_and_statistics_2
;
}
}
/* len += retLen;
*/
inserted
++
;
len
+=
retLen
;
verbosePrint
(
"%s() LN%d retLen=%d len=%d k=%d buffer=%s
\n
"
,
__func__
,
__LINE__
,
retLen
,
len
,
k
,
buffer
);
tblInserted
++
;
k
++
;
totalRowsInserted
++
;
if
(
inserted
>
superTblInfo
->
insertRows
)
break
;
/* if (inserted >= superTblInfo->insertRows
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
break;
*/
if
(
i
>
0
&&
g_args
.
insert_interval
&&
(
g_args
.
insert_interval
>
(
et
-
st
)
))
{
int
sleep_time
=
g_args
.
insert_interval
-
(
et
-
st
);
debugPrint
(
"DEBUG sleep: %d ms
\n
"
,
sleep_time
);
taosMsleep
(
sleep_time
);
// ms
}
i
++
;
if
(
g_args
.
insert_interval
)
{
st
=
taosGetTimestampMs
();
}
if
(
tblInserted
>=
superTblInfo
->
insertRows
)
break
;
}
winfo
->
totalRowsInserted
+=
k
;
if
(
0
==
strncasecmp
(
superTblInfo
->
insertMode
,
"taosc"
,
strlen
(
"taosc"
)))
{
//printf("===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
if
(
0
==
strncasecmp
(
superTblInfo
->
insertMode
,
"taosc"
,
strlen
(
"taosc"
)))
{
int64_t
startTs
;
int64_t
endTs
;
startTs
=
taosGetTimestampUs
();
debugPrint
(
"DEBUG
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
verbosePrint
(
"
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
int
affectedRows
=
queryDbExec
(
winfo
->
taos
,
buffer
,
INSERT_TYPE
);
if
(
0
>
affectedRows
){
...
...
@@ -4366,76 +4393,52 @@ static void* syncWriteWithStb(void *sarg) {
if
(
delay
<
winfo
->
minDelay
)
winfo
->
minDelay
=
delay
;
winfo
->
cntDelay
++
;
winfo
->
totalDelay
+=
delay
;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
}
totalAffectedRows
+=
affectedRows
;
winfo
->
totalAffectedRows
+=
affectedRows
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] has currently inserted rows: %"
PRId64
", affected rows: %"
PRId64
"
\n
"
,
winfo
->
threadID
,
totalRowsInserted
,
totalAffectedRows
);
winfo
->
totalRowsInserted
,
winfo
->
totalAffectedRows
);
lastPrintTime
=
currentPrintTime
;
}
//int64_t t2 = taosGetTimestampMs();
//printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0);
}
else
{
//int64_t t1 = taosGetTimestampMs();
}
else
{
int
retCode
=
postProceSql
(
g_Dbs
.
host
,
g_Dbs
.
port
,
buffer
);
//int64_t t2 = taosGetTimestampMs();
//printf("http insert sql return, Spent %ld ms \n", t2 - t1);
if
(
0
!=
retCode
)
{
printf
(
"========restful return fail, threadID[%d]
\n
"
,
winfo
->
threadID
);
goto
free_and_statistics_2
;
}
}
if
(
g_args
.
insert_interval
)
{
et
=
taosGetTimestampMs
();
}
/*
if (loop_cnt) {
loop_cnt--;
if ((1 == loop_cnt) && (0 != nrecords_last_req)) {
nrecords_cur_req = nrecords_last_req;
} else if (0 == loop_cnt){
nrecords_cur_req = nrecords_no_last_req;
loop_cnt = loop_cnt_orig;
break;
}
} else {
break;
}
*/
}
if
(
g_args
.
insert_interval
)
{
et
=
taosGetTimestampMs
();
}
if
(
tblInserted
>=
superTblInfo
->
insertRows
)
break
;
}
// num_of_DPT
if
(
tID
==
winfo
->
end_table_id
)
{
if
(
tID
==
winfo
->
end_table_id
)
{
if
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
"sample"
,
strlen
(
"sample"
)))
{
samplePos
=
sampleUsePos
;
}
i
=
inserted
;
time_counter
=
tmp_time
;
}
}
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
}
}
// tID
free_and_statistics_2:
tmfree
(
buffer
);
tmfree
(
sampleDataBuf
);
tmfclose
(
fp
);
winfo
->
totalRowsInserted
=
totalRowsInserted
;
winfo
->
totalAffectedRows
=
totalAffectedRows
;
printf
(
"====thread[%d] completed total inserted rows: %"
PRId64
", total affected rows: %"
PRId64
"====
\n
"
,
winfo
->
threadID
,
totalRowsInserted
,
totalAffectedRows
);
winfo
->
totalRowsInserted
,
winfo
->
totalAffectedRows
);
return
NULL
;
}
...
...
@@ -4453,7 +4456,8 @@ void callBack(void *param, TAOS_RES *res, int code) {
char
*
data
=
calloc
(
1
,
MAX_DATA_SIZE
);
char
*
pstr
=
buffer
;
pstr
+=
sprintf
(
pstr
,
"insert into %s.%s%d values"
,
winfo
->
db_name
,
winfo
->
tb_prefix
,
winfo
->
start_table_id
);
if
(
winfo
->
counter
>=
winfo
->
superTblInfo
->
insertRows
)
{
// if (winfo->counter >= winfo->superTblInfo->insertRows) {
if
(
winfo
->
counter
>=
g_args
.
num_of_RPR
)
{
winfo
->
start_table_id
++
;
winfo
->
counter
=
0
;
}
...
...
@@ -4644,12 +4648,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
if
(
superTblInfo
)
{
superTblInfo
->
totalAffectedRows
+=
t_info
->
totalAffectedRows
;
superTblInfo
->
totalRowsInserted
+=
t_info
->
totalRowsInserted
;
totalDelay
+=
t_info
->
totalDelay
;
cntDelay
+=
t_info
->
cntDelay
;
if
(
t_info
->
maxDelay
>
maxDelay
)
maxDelay
=
t_info
->
maxDelay
;
if
(
t_info
->
minDelay
<
minDelay
)
minDelay
=
t_info
->
minDelay
;
}
totalDelay
+=
t_info
->
totalDelay
;
cntDelay
+=
t_info
->
cntDelay
;
if
(
t_info
->
maxDelay
>
maxDelay
)
maxDelay
=
t_info
->
maxDelay
;
if
(
t_info
->
minDelay
<
minDelay
)
minDelay
=
t_info
->
minDelay
;
}
cntDelay
-=
1
;
...
...
@@ -4698,11 +4702,12 @@ void *readTable(void *sarg) {
}
int
num_of_DPT
;
if
(
rinfo
->
superTblInfo
)
{
/*
if (rinfo->superTblInfo) {
num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT
=
g_args
.
num_of_DPT
;
}
//
}
int
num_of_tables
=
rinfo
->
end_table_id
-
rinfo
->
start_table_id
+
1
;
int
totalData
=
num_of_DPT
*
num_of_tables
;
...
...
@@ -4836,7 +4841,7 @@ int insertTestProcess() {
if
(
ret
==
-
1
)
exit
(
EXIT_FAILURE
);
debugPrint
(
"
DEBUG -
%d result file: %s
\n
"
,
__LINE__
,
g_Dbs
.
resultFile
);
debugPrint
(
"%d result file: %s
\n
"
,
__LINE__
,
g_Dbs
.
resultFile
);
g_fpOfInsertResult
=
fopen
(
g_Dbs
.
resultFile
,
"a"
);
if
(
NULL
==
g_fpOfInsertResult
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
g_Dbs
.
resultFile
);
...
...
@@ -4992,7 +4997,7 @@ void *subQueryProcess(void *sarg) {
int64_t
st
=
0
;
int64_t
et
=
(
int64_t
)
g_queryInfo
.
subQueryInfo
.
rate
*
1000
;
while
(
1
)
{
if
(
g_queryInfo
.
subQueryInfo
.
rate
&&
(
et
-
st
)
<
g_queryInfo
.
subQueryInfo
.
rate
*
1000
)
{
if
(
g_queryInfo
.
subQueryInfo
.
rate
&&
(
et
-
st
)
<
(
int64_t
)
g_queryInfo
.
subQueryInfo
.
rate
*
1000
)
{
taosMsleep
(
g_queryInfo
.
subQueryInfo
.
rate
*
1000
-
(
et
-
st
));
// ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
}
...
...
@@ -5072,7 +5077,7 @@ static int queryTestProcess() {
char
sqlStr
[
MAX_TB_NAME_SIZE
*
2
];
sprintf
(
sqlStr
,
"use %s"
,
g_queryInfo
.
dbName
);
debugPrint
(
"DEBUG
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
verbosePrint
(
"
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
(
void
)
queryDbExec
(
t_info
->
taos
,
sqlStr
,
NO_INSERT_TYPE
);
}
else
{
t_info
->
taos
=
NULL
;
...
...
@@ -5183,7 +5188,7 @@ void *subSubscribeProcess(void *sarg) {
char
sqlStr
[
MAX_TB_NAME_SIZE
*
2
];
sprintf
(
sqlStr
,
"use %s"
,
g_queryInfo
.
dbName
);
debugPrint
(
"DEBUG
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
debugPrint
(
"
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
if
(
0
!=
queryDbExec
(
winfo
->
taos
,
sqlStr
,
NO_INSERT_TYPE
)){
return
NULL
;
}
...
...
@@ -5249,7 +5254,7 @@ void *superSubscribeProcess(void *sarg) {
char
sqlStr
[
MAX_TB_NAME_SIZE
*
2
];
sprintf
(
sqlStr
,
"use %s"
,
g_queryInfo
.
dbName
);
debugPrint
(
"DEBUG
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
debugPrint
(
"
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
if
(
0
!=
queryDbExec
(
winfo
->
taos
,
sqlStr
,
NO_INSERT_TYPE
))
{
return
NULL
;
}
...
...
@@ -5509,7 +5514,7 @@ void setParaFromArg(){
"2017-07-14 10:40:00.000"
,
MAX_TB_NAME_SIZE
);
g_Dbs
.
db
[
0
].
superTbls
[
0
].
timeStampStep
=
10
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
insertRows
=
g_args
.
num_of_DPT
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
insertRows
=
g_args
.
num_of_DPT
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
maxSqlLen
=
TSDB_PAYLOAD_SIZE
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
columnCount
=
0
;
...
...
@@ -5614,8 +5619,15 @@ void querySqlFile(TAOS* taos, char* sqlFile)
}
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
debugPrint
(
"DEBUG
%s() LN%d cmd: %s
\n
"
,
__func__
,
__LINE__
,
cmd
);
verbosePrint
(
"
%s() LN%d cmd: %s
\n
"
,
__func__
,
__LINE__
,
cmd
);
queryDbExec
(
taos
,
cmd
,
NO_INSERT_TYPE
);
if
(
0
!=
queryDbExec
(
taos
,
cmd
,
NO_INSERT_TYPE
))
{
printf
(
"queryDbExec %s failed!
\n
"
,
cmd
);
tmfree
(
cmd
);
tmfree
(
line
);
tmfclose
(
fp
);
return
;
}
memset
(
cmd
,
0
,
MAX_SQL_SIZE
);
cmd_len
=
0
;
}
...
...
@@ -5702,7 +5714,7 @@ static void testCmdLine() {
int
main
(
int
argc
,
char
*
argv
[])
{
parse_args
(
argc
,
argv
,
&
g_args
);
debugPrint
(
"
DEBUG -
meta file: %s
\n
"
,
g_args
.
metaFile
);
debugPrint
(
"meta file: %s
\n
"
,
g_args
.
metaFile
);
if
(
g_args
.
metaFile
)
{
initOfInsertMeta
();
...
...
tests/pytest/crash_gen/crash_gen_main.py
浏览文件 @
4494203f
...
...
@@ -35,16 +35,19 @@ import os
import
signal
import
traceback
import
resource
from
guppy
import
hpy
#
from guppy import hpy
import
gc
from
crash_gen.service_manager
import
ServiceManager
,
TdeInstance
from
crash_gen.misc
import
Logging
,
Status
,
CrashGenError
,
Dice
,
Helper
,
Progress
from
crash_gen.db
import
DbConn
,
MyTDSql
,
DbConnNative
,
DbManager
import
crash_gen.settings
import
taos
import
requests
crash_gen
.
settings
.
init
()
# Require Python 3
if
sys
.
version_info
[
0
]
<
3
:
raise
Exception
(
"Must be using Python 3"
)
...
...
@@ -259,6 +262,7 @@ class ThreadCoordinator:
self
.
_execStats
=
ExecutionStats
()
self
.
_runStatus
=
Status
.
STATUS_RUNNING
self
.
_initDbs
()
self
.
_stepStartTime
=
None
# Track how long it takes to execute each step
def
getTaskExecutor
(
self
):
return
self
.
_te
...
...
@@ -394,6 +398,10 @@ class ThreadCoordinator:
try
:
self
.
_syncAtBarrier
()
# For now just cross the barrier
Progress
.
emit
(
Progress
.
END_THREAD_STEP
)
if
self
.
_stepStartTime
:
stepExecTime
=
time
.
time
()
-
self
.
_stepStartTime
Progress
.
emitStr
(
'{:.3f}s/{}'
.
format
(
stepExecTime
,
DbConnNative
.
totalRequests
))
DbConnNative
.
resetTotalRequests
()
# reset to zero
except
threading
.
BrokenBarrierError
as
err
:
self
.
_execStats
.
registerFailure
(
"Aborted due to worker thread timeout"
)
Logging
.
error
(
"
\n
"
)
...
...
@@ -433,6 +441,7 @@ class ThreadCoordinator:
# Then we move on to the next step
Progress
.
emit
(
Progress
.
BEGIN_THREAD_STEP
)
self
.
_stepStartTime
=
time
.
time
()
self
.
_releaseAllWorkerThreads
(
transitionFailed
)
if
hasAbortedTask
or
transitionFailed
:
# abnormal ending, workers waiting at "gate"
...
...
@@ -691,7 +700,7 @@ class AnyState:
def
canDropDb
(
self
):
# If user requests to run up to a number of DBs,
# we'd then not do drop_db operations any more
if
gConfig
.
max_dbs
>
0
:
if
gConfig
.
max_dbs
>
0
or
gConfig
.
use_shadow_db
:
return
False
return
self
.
_info
[
self
.
CAN_DROP_DB
]
...
...
@@ -699,6 +708,8 @@ class AnyState:
return
self
.
_info
[
self
.
CAN_CREATE_FIXED_SUPER_TABLE
]
def
canDropFixedSuperTable
(
self
):
if
gConfig
.
use_shadow_db
:
# duplicate writes to shaddow DB, in which case let's disable dropping s-table
return
False
return
self
.
_info
[
self
.
CAN_DROP_FIXED_SUPER_TABLE
]
def
canAddData
(
self
):
...
...
@@ -1037,7 +1048,7 @@ class Database:
_clsLock
=
threading
.
Lock
()
# class wide lock
_lastInt
=
101
# next one is initial integer
_lastTick
=
0
_lastLaggingTick
=
0
# lagging tick, for
unsequenced insers
ions
_lastLaggingTick
=
0
# lagging tick, for
out-of-sequence (oos) data insert
ions
def
__init__
(
self
,
dbNum
:
int
,
dbc
:
DbConn
):
# TODO: remove dbc
self
.
_dbNum
=
dbNum
# we assign a number to databases, for our testing purpose
...
...
@@ -1093,21 +1104,24 @@ class Database:
t3
=
datetime
.
datetime
(
2012
,
1
,
1
)
# default "keep" is 10 years
t4
=
datetime
.
datetime
.
fromtimestamp
(
t3
.
timestamp
()
+
elSec2
)
# see explanation above
Logging
.
debug
(
"Setting up TICKS to start from: {}"
.
format
(
t4
))
Logging
.
info
(
"Setting up TICKS to start from: {}"
.
format
(
t4
))
return
t4
@
classmethod
def
getNextTick
(
cls
):
def
getNextTick
(
cls
):
'''
Fetch a timestamp tick, with some random factor, may not be unique.
'''
with
cls
.
_clsLock
:
# prevent duplicate tick
if
cls
.
_lastLaggingTick
==
0
or
cls
.
_lastTick
==
0
:
# not initialized
# 10k at 1/20 chance, should be enough to avoid overlaps
tick
=
cls
.
setupLastTick
()
cls
.
_lastTick
=
tick
cls
.
_lastLaggingTick
=
tick
+
datetime
.
timedelta
(
0
,
-
10000
)
cls
.
_lastLaggingTick
=
tick
+
datetime
.
timedelta
(
0
,
-
60
*
2
)
# lagging behind 2 minutes, should catch up fast
# if : # should be quite a bit into the future
if
Dice
.
throw
(
20
)
==
0
:
#
1 in 20 chance, return lagging tick
cls
.
_lastLaggingTick
+=
datetime
.
timedelta
(
0
,
1
)
#
Go back in time 100 seconds
if
gConfig
.
mix_oos_data
and
Dice
.
throw
(
20
)
==
0
:
# if asked to do so, and
1 in 20 chance, return lagging tick
cls
.
_lastLaggingTick
+=
datetime
.
timedelta
(
0
,
1
)
#
pick the next sequence from the lagging tick sequence
return
cls
.
_lastLaggingTick
else
:
# regular
# add one second to it
...
...
@@ -1334,7 +1348,8 @@ class Task():
elif
self
.
_isErrAcceptable
(
errno2
,
err
.
__str__
()):
self
.
logDebug
(
"[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}"
.
format
(
errno2
,
err
,
wt
.
getDbConn
().
getLastSql
()))
print
(
"_"
,
end
=
""
,
flush
=
True
)
# print("_", end="", flush=True)
Progress
.
emit
(
Progress
.
ACCEPTABLE_ERROR
)
self
.
_err
=
err
else
:
# not an acceptable error
errMsg
=
"[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}"
.
format
(
...
...
@@ -1563,8 +1578,11 @@ class TaskCreateDb(StateTransitionTask):
# numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
numReplica
=
gConfig
.
max_replicas
# fixed, always
repStr
=
"replica {}"
.
format
(
numReplica
)
self
.
execWtSql
(
wt
,
"create database {} {}"
.
format
(
self
.
_db
.
getName
(),
repStr
)
)
updatePostfix
=
"update 1"
if
gConfig
.
verify_data
else
""
# allow update only when "verify data" is active
dbName
=
self
.
_db
.
getName
()
self
.
execWtSql
(
wt
,
"create database {} {} {} "
.
format
(
dbName
,
repStr
,
updatePostfix
)
)
if
dbName
==
"db_0"
and
gConfig
.
use_shadow_db
:
self
.
execWtSql
(
wt
,
"create database {} {} {} "
.
format
(
"db_s"
,
repStr
,
updatePostfix
)
)
class
TaskDropDb
(
StateTransitionTask
):
@
classmethod
...
...
@@ -1774,13 +1792,13 @@ class TdSuperTable:
])
# TODO: add more from 'top'
if
aggExpr
not
in
[
'stddev(speed)'
]:
#TODO: STDDEV not valid for super tables?!
sql
=
"select {} from {}.{}"
.
format
(
aggExpr
,
self
.
_dbName
,
self
.
getName
())
if
Dice
.
throw
(
3
)
==
0
:
# 1 in X chance
sql
=
sql
+
' GROUP BY color'
Progress
.
emit
(
Progress
.
QUERY_GROUP_BY
)
# Logging.info("Executing GROUP-BY query: " + sql)
ret
.
append
(
SqlQuery
(
sql
))
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
sql
=
"select {} from {}.{}"
.
format
(
aggExpr
,
self
.
_dbName
,
self
.
getName
())
if
Dice
.
throw
(
3
)
==
0
:
# 1 in X chance
sql
=
sql
+
' GROUP BY color'
Progress
.
emit
(
Progress
.
QUERY_GROUP_BY
)
# Logging.info("Executing GROUP-BY query: " + sql)
ret
.
append
(
SqlQuery
(
sql
))
return
ret
...
...
@@ -1988,7 +2006,7 @@ class TaskAddData(StateTransitionTask):
numRecords
=
self
.
LARGE_NUMBER_OF_RECORDS
if
gConfig
.
larger_data
else
self
.
SMALL_NUMBER_OF_RECORDS
fullTableName
=
db
.
getName
()
+
'.'
+
regTableName
sql
=
"
insert into {} values
"
.
format
(
fullTableName
)
sql
=
"
INSERT INTO {} VALUES
"
.
format
(
fullTableName
)
for
j
in
range
(
numRecords
):
# number of records per table
nextInt
=
db
.
getNextInt
()
nextTick
=
db
.
getNextTick
()
...
...
@@ -2016,12 +2034,24 @@ class TaskAddData(StateTransitionTask):
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
try
:
sql
=
"
insert into {} values
('{}', {}, '{}');"
.
format
(
# removed: tags ('{}', {})
sql
=
"
INSERT INTO {} VALUES
('{}', {}, '{}');"
.
format
(
# removed: tags ('{}', {})
fullTableName
,
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
nextTick
,
nextInt
,
nextColor
)
dbc
.
execute
(
sql
)
# Quick hack, attach an update statement here. TODO: create an "update" task
if
(
not
gConfig
.
use_shadow_db
)
and
Dice
.
throw
(
5
)
==
0
:
# 1 in N chance, plus not using shaddow DB
nextInt
=
db
.
getNextInt
()
nextColor
=
db
.
getNextColor
()
sql
=
"INSERt INTO {} VALUES ('{}', {}, '{}');"
.
format
(
# "INSERt" means "update" here
fullTableName
,
nextTick
,
nextInt
,
nextColor
)
# sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format(
# fullTableName, db.getNextInt(), db.getNextColor(), nextTick)
dbc
.
execute
(
sql
)
except
:
# Any exception at all
if
gConfig
.
verify_data
:
self
.
unlockTable
(
fullTableName
)
...
...
@@ -2070,7 +2100,8 @@ class TaskAddData(StateTransitionTask):
random
.
shuffle
(
tblSeq
)
# now we have random sequence
for
i
in
tblSeq
:
if
(
i
in
self
.
activeTable
):
# wow already active
print
(
"x"
,
end
=
""
,
flush
=
True
)
# concurrent insertion
# print("x", end="", flush=True) # concurrent insertion
Progress
.
emit
(
Progress
.
CONCURRENT_INSERTION
)
else
:
self
.
activeTable
.
add
(
i
)
# marking it active
...
...
@@ -2373,6 +2404,11 @@ class MainExec:
'--larger-data'
,
action
=
'store_true'
,
help
=
'Write larger amount of data during write operations (default: false)'
)
parser
.
add_argument
(
'-m'
,
'--mix-oos-data'
,
action
=
'store_false'
,
help
=
'Mix out-of-sequence data into the test data stream (default: true)'
)
parser
.
add_argument
(
'-n'
,
'--dynamic-db-table-names'
,
...
...
@@ -2414,6 +2450,11 @@ class MainExec:
'--verify-data'
,
action
=
'store_true'
,
help
=
'Verify data written in a number of places by reading back (default: false)'
)
parser
.
add_argument
(
'-w'
,
'--use-shadow-db'
,
action
=
'store_true'
,
help
=
'Use a shaddow database to verify data integrity (default: false)'
)
parser
.
add_argument
(
'-x'
,
'--continue-on-exception'
,
...
...
@@ -2422,6 +2463,11 @@ class MainExec:
global
gConfig
gConfig
=
parser
.
parse_args
()
crash_gen
.
settings
.
gConfig
=
gConfig
# TODO: fix this hack, consolidate this global var
# Sanity check for arguments
if
gConfig
.
use_shadow_db
and
gConfig
.
max_dbs
>
1
:
raise
CrashGenError
(
"Cannot combine use-shadow-db with max-dbs of more than 1"
)
Logging
.
clsInit
(
gConfig
)
...
...
tests/pytest/crash_gen/db.py
浏览文件 @
4494203f
...
...
@@ -18,6 +18,8 @@ import datetime
import
traceback
# from .service_manager import TdeInstance
import
crash_gen.settings
class
DbConn
:
TYPE_NATIVE
=
"native-c"
TYPE_REST
=
"rest-api"
...
...
@@ -244,7 +246,7 @@ class MyTDSql:
self
.
_conn
.
close
()
# TODO: very important, cursor close does NOT close DB connection!
self
.
_cursor
.
close
()
def
_execInternal
(
self
,
sql
):
def
_execInternal
(
self
,
sql
):
startTime
=
time
.
time
()
# Logging.debug("Executing SQL: " + sql)
ret
=
self
.
_cursor
.
execute
(
sql
)
...
...
@@ -257,6 +259,27 @@ class MyTDSql:
cls
.
longestQuery
=
sql
cls
.
longestQueryTime
=
queryTime
cls
.
lqStartTime
=
startTime
# Now write to the shadow database
if
crash_gen
.
settings
.
gConfig
.
use_shadow_db
:
if
sql
[:
11
]
==
"INSERT INTO"
:
if
sql
[:
16
]
==
"INSERT INTO db_0"
:
sql2
=
"INSERT INTO db_s"
+
sql
[
16
:]
self
.
_cursor
.
execute
(
sql2
)
else
:
raise
CrashGenError
(
"Did not find db_0 in INSERT statement: {}"
.
format
(
sql
))
else
:
# not an insert statement
pass
if
sql
[:
12
]
==
"CREATE TABLE"
:
if
sql
[:
17
]
==
"CREATE TABLE db_0"
:
sql2
=
sql
.
replace
(
'db_0'
,
'db_s'
)
self
.
_cursor
.
execute
(
sql2
)
else
:
raise
CrashGenError
(
"Did not find db_0 in CREATE TABLE statement: {}"
.
format
(
sql
))
else
:
# not an insert statement
pass
return
ret
def
query
(
self
,
sql
):
...
...
@@ -302,12 +325,18 @@ class DbConnNative(DbConn):
_lock
=
threading
.
Lock
()
# _connInfoDisplayed = False # TODO: find another way to display this
totalConnections
=
0
# Not private
totalRequests
=
0
def
__init__
(
self
,
dbTarget
):
super
().
__init__
(
dbTarget
)
self
.
_type
=
self
.
TYPE_NATIVE
self
.
_conn
=
None
# self._cursor = None
# self._cursor = None
@
classmethod
def
resetTotalRequests
(
cls
):
with
cls
.
_lock
:
# force single threading for opening DB connections. # TODO: whaaat??!!!
cls
.
totalRequests
=
0
def
openByType
(
self
):
# Open connection
# global gContainer
...
...
@@ -356,6 +385,8 @@ class DbConnNative(DbConn):
Logging
.
debug
(
"[SQL] Executing SQL: {}"
.
format
(
sql
))
self
.
_lastSql
=
sql
nRows
=
self
.
_tdSql
.
execute
(
sql
)
cls
=
self
.
__class__
cls
.
totalRequests
+=
1
Logging
.
debug
(
"[SQL] Execution Result, nRows = {}, SQL = {}"
.
format
(
nRows
,
sql
))
...
...
@@ -369,6 +400,8 @@ class DbConnNative(DbConn):
Logging
.
debug
(
"[SQL] Executing SQL: {}"
.
format
(
sql
))
self
.
_lastSql
=
sql
nRows
=
self
.
_tdSql
.
query
(
sql
)
cls
=
self
.
__class__
cls
.
totalRequests
+=
1
Logging
.
debug
(
"[SQL] Query Result, nRows = {}, SQL = {}"
.
format
(
nRows
,
sql
))
...
...
tests/pytest/crash_gen/misc.py
浏览文件 @
4494203f
...
...
@@ -176,11 +176,13 @@ class Progress:
SERVICE_START_NAP
=
7
CREATE_TABLE_ATTEMPT
=
8
QUERY_GROUP_BY
=
9
CONCURRENT_INSERTION
=
10
ACCEPTABLE_ERROR
=
11
tokens
=
{
STEP_BOUNDARY
:
'.'
,
BEGIN_THREAD_STEP
:
'['
,
END_THREAD_STEP
:
']
'
,
BEGIN_THREAD_STEP
:
'
['
,
END_THREAD_STEP
:
']'
,
SERVICE_HEART_BEAT
:
'.Y.'
,
SERVICE_RECONNECT_START
:
'<r.'
,
SERVICE_RECONNECT_SUCCESS
:
'.r>'
,
...
...
@@ -188,8 +190,14 @@ class Progress:
SERVICE_START_NAP
:
'_zz'
,
CREATE_TABLE_ATTEMPT
:
'c'
,
QUERY_GROUP_BY
:
'g'
,
CONCURRENT_INSERTION
:
'x'
,
ACCEPTABLE_ERROR
:
'_'
,
}
@
classmethod
def
emit
(
cls
,
token
):
print
(
cls
.
tokens
[
token
],
end
=
""
,
flush
=
True
)
@
classmethod
def
emitStr
(
cls
,
str
):
print
(
'({})'
.
format
(
str
),
end
=
""
,
flush
=
True
)
tests/pytest/crash_gen/settings.py
0 → 100644
浏览文件 @
4494203f
from
__future__
import
annotations
import
argparse
gConfig
:
argparse
.
Namespace
def
init
():
global
gConfig
gConfig
=
[]
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录