Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7dd2af78
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7dd2af78
编写于
11月 21, 2022
作者:
H
Hui Li
提交者:
GitHub
11月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18004 from taosdata/enh/crash_gen
enh: enh crash_gen for 3.0 new functions
上级
37c1971c
bb6ca0c0
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
773 addition
and
29 deletion
+773
-29
tests/pytest/crash_gen.sh
tests/pytest/crash_gen.sh
+1
-1
tests/pytest/crash_gen/crash_gen_main.py
tests/pytest/crash_gen/crash_gen_main.py
+700
-21
tests/pytest/crash_gen/shared/db.py
tests/pytest/crash_gen/shared/db.py
+72
-7
未找到文件。
tests/pytest/crash_gen.sh
浏览文件 @
7dd2af78
...
@@ -45,7 +45,7 @@ fi
...
@@ -45,7 +45,7 @@ fi
# Now getting ready to execute Python
# Now getting ready to execute Python
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
PYTHON_EXEC
=
python3
.8
PYTHON_EXEC
=
python3
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
# export PYTHONPATH=$(pwd)/../../src/connector/python:$(pwd)
# export PYTHONPATH=$(pwd)/../../src/connector/python:$(pwd)
...
...
tests/pytest/crash_gen/crash_gen_main.py
浏览文件 @
7dd2af78
...
@@ -37,6 +37,7 @@ import requests
...
@@ -37,6 +37,7 @@ import requests
# from guppy import hpy
# from guppy import hpy
import
gc
import
gc
import
taos
import
taos
from
taos.tmq
import
*
from
.shared.types
import
TdColumns
,
TdTags
from
.shared.types
import
TdColumns
,
TdTags
...
@@ -419,10 +420,12 @@ class ThreadCoordinator:
...
@@ -419,10 +420,12 @@ class ThreadCoordinator:
except
threading
.
BrokenBarrierError
as
err
:
except
threading
.
BrokenBarrierError
as
err
:
self
.
_execStats
.
registerFailure
(
"Aborted due to worker thread timeout"
)
self
.
_execStats
.
registerFailure
(
"Aborted due to worker thread timeout"
)
Logging
.
error
(
"
\n
"
)
Logging
.
error
(
"
\n
"
)
Logging
.
error
(
"Main loop aborted, caused by worker thread(s) time-out of {} seconds"
.
format
(
Logging
.
error
(
"Main loop aborted, caused by worker thread(s) time-out of {} seconds"
.
format
(
ThreadCoordinator
.
WORKER_THREAD_TIMEOUT
))
ThreadCoordinator
.
WORKER_THREAD_TIMEOUT
))
Logging
.
error
(
"TAOS related threads blocked at (stack frames top-to-bottom):"
)
Logging
.
error
(
"TAOS related threads blocked at (stack frames top-to-bottom):"
)
ts
=
ThreadStacks
()
ts
=
ThreadStacks
()
ts
.
record_current_time
(
time
.
time
())
# record thread exit time at current moment
ts
.
print
(
filterInternal
=
True
)
ts
.
print
(
filterInternal
=
True
)
workerTimeout
=
True
workerTimeout
=
True
...
@@ -546,7 +549,12 @@ class ThreadCoordinator:
...
@@ -546,7 +549,12 @@ class ThreadCoordinator:
# pick a task type for current state
# pick a task type for current state
db
=
self
.
pickDatabase
()
db
=
self
.
pickDatabase
()
taskType
=
db
.
getStateMachine
().
pickTaskType
()
# dynamic name of class
if
Dice
.
throw
(
2
)
==
1
:
taskType
=
db
.
getStateMachine
().
pickTaskType
()
# dynamic name of class
else
:
taskType
=
db
.
getStateMachine
().
balance_pickTaskType
()
# and an method can get balance task types
pass
return
taskType
(
self
.
_execStats
,
db
)
# create a task from it
return
taskType
(
self
.
_execStats
,
db
)
# create a task from it
def
resetExecutedTasks
(
self
):
def
resetExecutedTasks
(
self
):
...
@@ -674,9 +682,15 @@ class AnyState:
...
@@ -674,9 +682,15 @@ class AnyState:
# only "under normal circumstances", as we may override it with the -b option
# only "under normal circumstances", as we may override it with the -b option
CAN_DROP_DB
=
2
CAN_DROP_DB
=
2
CAN_CREATE_FIXED_SUPER_TABLE
=
3
CAN_CREATE_FIXED_SUPER_TABLE
=
3
CAN_CREATE_STREAM
=
3
# super table must exists
CAN_CREATE_TOPIC
=
3
# super table must exists
CAN_CREATE_CONSUMERS
=
3
CAN_DROP_FIXED_SUPER_TABLE
=
4
CAN_DROP_FIXED_SUPER_TABLE
=
4
CAN_DROP_TOPIC
=
4
CAN_DROP_STREAM
=
4
CAN_ADD_DATA
=
5
CAN_ADD_DATA
=
5
CAN_READ_DATA
=
6
CAN_READ_DATA
=
6
CAN_DELETE_DATA
=
6
def
__init__
(
self
):
def
__init__
(
self
):
self
.
_info
=
self
.
getInfo
()
self
.
_info
=
self
.
getInfo
()
...
@@ -727,12 +741,30 @@ class AnyState:
...
@@ -727,12 +741,30 @@ class AnyState:
return
False
return
False
return
self
.
_info
[
self
.
CAN_DROP_FIXED_SUPER_TABLE
]
return
self
.
_info
[
self
.
CAN_DROP_FIXED_SUPER_TABLE
]
def
canCreateTopic
(
self
):
return
self
.
_info
[
self
.
CAN_CREATE_TOPIC
]
def
canDropTopic
(
self
):
return
self
.
_info
[
self
.
CAN_DROP_TOPIC
]
def
canCreateConsumers
(
self
):
return
self
.
_info
[
self
.
CAN_CREATE_CONSUMERS
]
def
canCreateStreams
(
self
):
return
self
.
_info
[
self
.
CAN_CREATE_STREAM
]
def
canDropStream
(
self
):
return
self
.
_info
[
self
.
CAN_DROP_STREAM
]
def
canAddData
(
self
):
def
canAddData
(
self
):
return
self
.
_info
[
self
.
CAN_ADD_DATA
]
return
self
.
_info
[
self
.
CAN_ADD_DATA
]
def
canReadData
(
self
):
def
canReadData
(
self
):
return
self
.
_info
[
self
.
CAN_READ_DATA
]
return
self
.
_info
[
self
.
CAN_READ_DATA
]
def
canDeleteData
(
self
):
return
self
.
_info
[
self
.
CAN_DELETE_DATA
]
def
assertAtMostOneSuccess
(
self
,
tasks
,
cls
):
def
assertAtMostOneSuccess
(
self
,
tasks
,
cls
):
sCnt
=
0
sCnt
=
0
for
task
in
tasks
:
for
task
in
tasks
:
...
@@ -902,7 +934,7 @@ class StateHasData(AnyState):
...
@@ -902,7 +934,7 @@ class StateHasData(AnyState):
):
# only if we didn't create one
):
# only if we didn't create one
# we shouldn't have dropped it
# we shouldn't have dropped it
self
.
assertNoTask
(
tasks
,
TaskDropDb
)
self
.
assertNoTask
(
tasks
,
TaskDropDb
)
if
(
not
self
.
hasTask
(
tasks
,
TaskCreateSuperTable
)
if
not
(
self
.
hasTask
(
tasks
,
TaskCreateSuperTable
)
):
# if we didn't create the table
):
# if we didn't create the table
# we should not have a task that drops it
# we should not have a task that drops it
self
.
assertNoTask
(
tasks
,
TaskDropSuperTable
)
self
.
assertNoTask
(
tasks
,
TaskDropSuperTable
)
...
@@ -974,14 +1006,21 @@ class StateMechine:
...
@@ -974,14 +1006,21 @@ class StateMechine:
# did not do this when openning connection, and this is NOT the worker
# did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own
# thread, which does this on their own
dbc
.
use
(
dbName
)
dbc
.
use
(
dbName
)
if
not
dbc
.
hasTables
():
# no tables
if
not
dbc
.
hasTables
():
# no tables
Logging
.
debug
(
"[STT] DB_ONLY found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
Logging
.
debug
(
"[STT] DB_ONLY found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
return
StateDbOnly
()
return
StateDbOnly
()
# For sure we have tables, which means we must have the super table. # TODO: are we sure?
# For sure we have tables, which means we must have the super table. # TODO: are we sure?
sTable
=
self
.
_db
.
getFixedSuperTable
()
sTable
=
self
.
_db
.
getFixedSuperTable
()
if
sTable
.
hasRegTables
(
dbc
):
# no regular tables
if
sTable
.
hasRegTables
(
dbc
):
# no regular tables
# print("debug=====*\n"*100)
Logging
.
debug
(
"[STT] SUPER_TABLE_ONLY found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
Logging
.
debug
(
"[STT] SUPER_TABLE_ONLY found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
return
StateSuperTableOnly
()
return
StateSuperTableOnly
()
else
:
# has actual tables
else
:
# has actual tables
Logging
.
debug
(
"[STT] HAS_DATA found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
Logging
.
debug
(
"[STT] HAS_DATA found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
...
@@ -1051,6 +1090,28 @@ class StateMechine:
...
@@ -1051,6 +1090,28 @@ class StateMechine:
# Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
# Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
return
taskTypes
[
i
]
return
taskTypes
[
i
]
def
balance_pickTaskType
(
self
):
# all the task types we can choose from at curent state
BasicTypes
=
self
.
getTaskTypes
()
weightsTypes
=
BasicTypes
.
copy
()
# this matrixs can balance the Frequency of TaskTypes
balance_TaskType_matrixs
=
{
'TaskDropDb'
:
5
,
'TaskDropTopics'
:
20
,
'TaskDropStreams'
:
10
,
'TaskDropStreamTables'
:
10
,
'TaskReadData'
:
50
,
'TaskDropSuperTable'
:
5
,
'TaskAlterTags'
:
3
,
'TaskAddData'
:
10
,
'TaskDeleteData'
:
10
,
'TaskCreateDb'
:
10
,
'TaskCreateStream'
:
3
,
'TaskCreateTopic'
:
3
,
'TaskCreateConsumers'
:
10
,
'TaskCreateSuperTable'
:
10
}
# TaskType : balance_matrixs of task
for
task
,
weights
in
balance_TaskType_matrixs
.
items
():
for
basicType
in
BasicTypes
:
if
basicType
.
__name__
==
task
:
for
_
in
range
(
weights
):
weightsTypes
.
append
(
basicType
)
task
=
random
.
sample
(
weightsTypes
,
1
)
return
task
[
0
]
# ref:
# ref:
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
def
_weighted_choice_sub
(
self
,
weights
)
->
int
:
def
_weighted_choice_sub
(
self
,
weights
)
->
int
:
...
@@ -1109,6 +1170,7 @@ class Database:
...
@@ -1109,6 +1170,7 @@ class Database:
return
"fs_table"
return
"fs_table"
def
getFixedSuperTable
(
self
)
->
TdSuperTable
:
def
getFixedSuperTable
(
self
)
->
TdSuperTable
:
return
TdSuperTable
(
self
.
getFixedSuperTableName
(),
self
.
getName
())
return
TdSuperTable
(
self
.
getFixedSuperTableName
(),
self
.
getName
())
# We aim to create a starting time tick, such that, whenever we run our test here once
# We aim to create a starting time tick, such that, whenever we run our test here once
...
@@ -1342,6 +1404,19 @@ class Task():
...
@@ -1342,6 +1404,19 @@ class Task():
0x2603
,
# Table does not exist, replaced by 2662 below
0x2603
,
# Table does not exist, replaced by 2662 below
0x260d
,
# Tags number not matched
0x260d
,
# Tags number not matched
0x2662
,
# Table does not exist #TODO: what about 2603 above?
0x2662
,
# Table does not exist #TODO: what about 2603 above?
0x2600
,
# database not specified, SQL: show stables , database droped , and show tables
0x032C
,
# Object is creating
0x032D
,
# Object is dropping
0x03D3
,
# Conflict transaction not completed
0x0707
,
# Query not ready , it always occur at replica 3
0x707
,
# Query not ready
0x396
,
# Database in creating status
0x386
,
# Database in droping status
0x03E1
,
# failed on tmq_subscribe ,topic not exist
0x03ed
,
# Topic must be dropped first, SQL: drop database db_0
0x0203
,
# Invalid value
0x03f0
,
# Stream already exist , topic already exists
...
@@ -1638,9 +1713,12 @@ class TaskCreateDb(StateTransitionTask):
...
@@ -1638,9 +1713,12 @@ class TaskCreateDb(StateTransitionTask):
# numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N
# numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N
numReplica
=
Config
.
getConfig
().
num_replicas
# fixed, always
numReplica
=
Config
.
getConfig
().
num_replicas
# fixed, always
repStr
=
"replica {}"
.
format
(
numReplica
)
repStr
=
"replica {}"
.
format
(
numReplica
)
updatePostfix
=
"update 1"
if
Config
.
getConfig
().
verify_data
else
""
# allow update only when "verify data" is active
updatePostfix
=
""
if
Config
.
getConfig
().
verify_data
else
""
# allow update only when "verify data" is active , 3.0 version default is update 1
vg_nums
=
random
.
randint
(
1
,
8
)
cache_model
=
Dice
.
choice
([
'none'
,
'last_row'
,
'last_value'
,
'both'
])
buffer
=
random
.
randint
(
3
,
128
)
dbName
=
self
.
_db
.
getName
()
dbName
=
self
.
_db
.
getName
()
self
.
execWtSql
(
wt
,
"create database {} {} {}
"
.
format
(
dbName
,
repStr
,
updatePostfix
)
)
self
.
execWtSql
(
wt
,
"create database {} {} {}
vgroups {} cachemodel '{}' buffer {} "
.
format
(
dbName
,
repStr
,
updatePostfix
,
vg_nums
,
cache_model
,
buffer
)
)
if
dbName
==
"db_0"
and
Config
.
getConfig
().
use_shadow_db
:
if
dbName
==
"db_0"
and
Config
.
getConfig
().
use_shadow_db
:
self
.
execWtSql
(
wt
,
"create database {} {} {} "
.
format
(
"db_s"
,
repStr
,
updatePostfix
)
)
self
.
execWtSql
(
wt
,
"create database {} {} {} "
.
format
(
"db_s"
,
repStr
,
updatePostfix
)
)
...
@@ -1654,9 +1732,211 @@ class TaskDropDb(StateTransitionTask):
...
@@ -1654,9 +1732,211 @@ class TaskDropDb(StateTransitionTask):
return
state
.
canDropDb
()
return
state
.
canDropDb
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
self
.
execWtSql
(
wt
,
"drop database {}"
.
format
(
self
.
_db
.
getName
()))
try
:
self
.
queryWtSql
(
wt
,
"drop database {}"
.
format
(
self
.
_db
.
getName
()))
# drop database maybe failed ,because topic exists
except
taos
.
error
.
ProgrammingError
as
err
:
errno
=
Helper
.
convertErrno
(
err
.
errno
)
if
errno
in
[
0x0203
]:
# drop maybe failed
pass
Logging
.
debug
(
"[OPS] database dropped at {}"
.
format
(
time
.
time
()))
Logging
.
debug
(
"[OPS] database dropped at {}"
.
format
(
time
.
time
()))
class
TaskCreateStream
(
StateTransitionTask
):
@
classmethod
def
getEndState
(
cls
):
return
StateHasData
()
@
classmethod
def
canBeginFrom
(
cls
,
state
:
AnyState
):
return
state
.
canCreateStreams
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
dbname
=
self
.
_db
.
getName
()
sub_stream_name
=
dbname
+
'_sub_stream'
sub_stream_tb_name
=
'stream_tb_sub'
super_stream_name
=
dbname
+
'_super_stream'
super_stream_tb_name
=
'stream_tb_super'
if
not
self
.
_db
.
exists
(
wt
.
getDbConn
()):
Logging
.
debug
(
"Skipping task, no DB yet"
)
return
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
# wt.execSql("use db") # should always be in place
stbname
=
sTable
.
getName
()
sub_tables
=
sTable
.
getRegTables
(
wt
.
getDbConn
())
aggExpr
=
Dice
.
choice
([
'count(*)'
,
'avg(speed)'
,
'sum(speed)'
,
'stddev(speed)'
,
'min(speed)'
,
'max(speed)'
,
'first(speed)'
,
'last(speed)'
,
'apercentile(speed, 10)'
,
'last_row(*)'
,
'twa(speed)'
])
stream_sql
=
''
# set default value
if
sub_tables
:
sub_tbname
=
sub_tables
[
0
]
# create stream with query above sub_table
stream_sql
=
'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '
.
\
format
(
sub_stream_name
,
dbname
,
sub_stream_tb_name
,
aggExpr
,
dbname
,
sub_tbname
)
else
:
stream_sql
=
'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '
.
\
format
(
super_stream_name
,
dbname
,
super_stream_tb_name
,
aggExpr
,
dbname
,
stbname
)
self
.
execWtSql
(
wt
,
stream_sql
)
Logging
.
debug
(
"[OPS] stream is creating at {}"
.
format
(
time
.
time
()))
class
TaskCreateTopic
(
StateTransitionTask
):
@
classmethod
def
getEndState
(
cls
):
return
StateHasData
()
@
classmethod
def
canBeginFrom
(
cls
,
state
:
AnyState
):
return
state
.
canCreateTopic
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
dbname
=
self
.
_db
.
getName
()
sub_topic_name
=
dbname
+
'_sub_topic'
super_topic_name
=
dbname
+
'_super_topic'
stable_topic
=
dbname
+
'_stable_topic'
db_topic
=
'database_'
+
dbname
+
'_topics'
if
not
self
.
_db
.
exists
(
wt
.
getDbConn
()):
Logging
.
debug
(
"Skipping task, no DB yet"
)
return
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
# wt.execSql("use db") # should always be in place
# create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1;
stbname
=
sTable
.
getName
()
sub_tables
=
sTable
.
getRegTables
(
wt
.
getDbConn
())
scalarExpr
=
Dice
.
choice
([
'*'
,
'speed'
,
'color'
,
'abs(speed)'
,
'acos(speed)'
,
'asin(speed)'
,
'atan(speed)'
,
'ceil(speed)'
,
'cos(speed)'
,
'cos(speed)'
,
'floor(speed)'
,
'log(speed,2)'
,
'pow(speed,2)'
,
'round(speed)'
,
'sin(speed)'
,
'sqrt(speed)'
,
'char_length(color)'
,
'concat(color,color)'
,
'concat_ws(" ", color,color," ")'
,
'length(color)'
,
'lower(color)'
,
'ltrim(color)'
,
'substr(color , 2)'
,
'upper(color)'
,
'cast(speed as double)'
,
'cast(ts as bigint)'
])
topic_sql
=
''
# set default value
if
Dice
.
throw
(
3
)
==
0
:
# create topic : source data from sub query
if
sub_tables
:
# if not empty
sub_tbname
=
sub_tables
[
0
]
# create topic : source data from sub query of sub stable
topic_sql
=
'create topic {} as select {} FROM {}.{} ; '
.
format
(
sub_topic_name
,
scalarExpr
,
dbname
,
sub_tbname
)
else
:
# create topic : source data from sub query of stable
topic_sql
=
'create topic {} as select {} FROM {}.{} '
.
format
(
super_topic_name
,
scalarExpr
,
dbname
,
stbname
)
elif
Dice
.
throw
(
3
)
==
1
:
# create topic : source data from super table
topic_sql
=
'create topic {} AS STABLE {}.{} '
.
format
(
stable_topic
,
dbname
,
stbname
)
elif
Dice
.
throw
(
3
)
==
2
:
# create topic : source data from whole database
topic_sql
=
'create topic {} AS DATABASE {} '
.
format
(
db_topic
,
dbname
)
else
:
pass
# exec create topics
self
.
execWtSql
(
wt
,
"use {}"
.
format
(
dbname
))
self
.
execWtSql
(
wt
,
topic_sql
)
Logging
.
debug
(
"[OPS] db topic is creating at {}"
.
format
(
time
.
time
()))
class
TaskDropTopics
(
StateTransitionTask
):
@
classmethod
def
getEndState
(
cls
):
return
StateHasData
()
@
classmethod
def
canBeginFrom
(
cls
,
state
:
AnyState
):
return
state
.
canDropTopic
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
dbname
=
self
.
_db
.
getName
()
if
not
self
.
_db
.
exists
(
wt
.
getDbConn
()):
Logging
.
debug
(
"Skipping task, no DB yet"
)
return
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
# wt.execSql("use db") # should always be in place
tblName
=
sTable
.
getName
()
if
sTable
.
hasTopics
(
wt
.
getDbConn
()):
sTable
.
dropTopics
(
wt
.
getDbConn
(),
dbname
,
None
)
# drop topics of database
sTable
.
dropTopics
(
wt
.
getDbConn
(),
dbname
,
tblName
)
# drop topics of stable
class
TaskDropStreams
(
StateTransitionTask
):
@
classmethod
def
getEndState
(
cls
):
return
StateHasData
()
@
classmethod
def
canBeginFrom
(
cls
,
state
:
AnyState
):
return
state
.
canDropStream
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
# dbname = self._db.getName()
if
not
self
.
_db
.
exists
(
wt
.
getDbConn
()):
Logging
.
debug
(
"Skipping task, no DB yet"
)
return
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
# wt.execSql("use db") # should always be in place
# tblName = sTable.getName()
if
sTable
.
hasStreams
(
wt
.
getDbConn
()):
sTable
.
dropStreams
(
wt
.
getDbConn
())
# drop stream of database
class
TaskDropStreamTables
(
StateTransitionTask
):
@
classmethod
def
getEndState
(
cls
):
return
StateHasData
()
@
classmethod
def
canBeginFrom
(
cls
,
state
:
AnyState
):
return
state
.
canDropStream
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
# dbname = self._db.getName()
if
not
self
.
_db
.
exists
(
wt
.
getDbConn
()):
Logging
.
debug
(
"Skipping task, no DB yet"
)
return
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
wt
.
execSql
(
"use db"
)
# should always be in place
# tblName = sTable.getName()
if
sTable
.
hasStreamTables
(
wt
.
getDbConn
()):
sTable
.
dropStreamTables
(
wt
.
getDbConn
())
# drop stream tables
class
TaskCreateConsumers
(
StateTransitionTask
):
@
classmethod
def
getEndState
(
cls
):
return
StateHasData
()
@
classmethod
def
canBeginFrom
(
cls
,
state
:
AnyState
):
return
state
.
canCreateConsumers
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
if
Config
.
getConfig
().
connector_type
==
'native'
:
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
# wt.execSql("use db") # should always be in place
if
sTable
.
hasTopics
(
wt
.
getDbConn
()):
sTable
.
createConsumer
(
wt
.
getDbConn
(),
random
.
randint
(
1
,
10
))
pass
else
:
print
(
" restful not support tmq consumers"
)
return
class
TaskCreateSuperTable
(
StateTransitionTask
):
class
TaskCreateSuperTable
(
StateTransitionTask
):
@
classmethod
@
classmethod
def
getEndState
(
cls
):
def
getEndState
(
cls
):
...
@@ -1673,7 +1953,7 @@ class TaskCreateSuperTable(StateTransitionTask):
...
@@ -1673,7 +1953,7 @@ class TaskCreateSuperTable(StateTransitionTask):
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
# wt.execSql("use db") # should always be in place
# wt.execSql("use db") # should always be in place
sTable
.
create
(
wt
.
getDbConn
(),
sTable
.
create
(
wt
.
getDbConn
(),
{
'ts'
:
TdDataType
.
TIMESTAMP
,
'speed'
:
TdDataType
.
INT
,
'color'
:
TdDataType
.
BINARY16
},
{
{
'ts'
:
TdDataType
.
TIMESTAMP
,
'speed'
:
TdDataType
.
INT
,
'color'
:
TdDataType
.
BINARY16
},
{
'b'
:
TdDataType
.
BINARY200
,
'f'
:
TdDataType
.
FLOAT
},
'b'
:
TdDataType
.
BINARY200
,
'f'
:
TdDataType
.
FLOAT
},
...
@@ -1688,14 +1968,17 @@ class TdSuperTable:
...
@@ -1688,14 +1968,17 @@ class TdSuperTable:
def
__init__
(
self
,
stName
,
dbName
):
def
__init__
(
self
,
stName
,
dbName
):
self
.
_stName
=
stName
self
.
_stName
=
stName
self
.
_dbName
=
dbName
self
.
_dbName
=
dbName
self
.
_consumerLists
=
{}
self
.
_ConsumerInsts
=
[]
def
getName
(
self
):
def
getName
(
self
):
return
self
.
_stName
return
self
.
_stName
def
drop
(
self
,
dbc
,
skipCheck
=
False
):
def
drop
(
self
,
dbc
,
skipCheck
=
False
):
dbName
=
self
.
_dbName
dbName
=
self
.
_dbName
if
self
.
exists
(
dbc
)
:
# if myself exists
if
self
.
exists
(
dbc
)
:
# if myself exists
fullTableName
=
dbName
+
'.'
+
self
.
_stName
fullTableName
=
dbName
+
'.'
+
self
.
_stName
dbc
.
execute
(
"DROP TABLE {}"
.
format
(
fullTableName
))
dbc
.
execute
(
"DROP TABLE {}"
.
format
(
fullTableName
))
else
:
else
:
if
not
skipCheck
:
if
not
skipCheck
:
...
@@ -1711,10 +1994,12 @@ class TdSuperTable:
...
@@ -1711,10 +1994,12 @@ class TdSuperTable:
dbName
=
self
.
_dbName
dbName
=
self
.
_dbName
dbc
.
execute
(
"USE "
+
dbName
)
dbc
.
execute
(
"USE "
+
dbName
)
fullTableName
=
dbName
+
'.'
+
self
.
_stName
fullTableName
=
dbName
+
'.'
+
self
.
_stName
if
dbc
.
existsSuperTable
(
self
.
_stName
):
if
dbc
.
existsSuperTable
(
self
.
_stName
):
if
dropIfExists
:
if
dropIfExists
:
dbc
.
execute
(
"DROP TABLE {}"
.
format
(
fullTableName
))
dbc
.
execute
(
"DROP TABLE {}"
.
format
(
fullTableName
))
else
:
# error
else
:
# error
raise
CrashGenError
(
"Cannot create super table, already exists: {}"
.
format
(
self
.
_stName
))
raise
CrashGenError
(
"Cannot create super table, already exists: {}"
.
format
(
self
.
_stName
))
...
@@ -1728,12 +2013,61 @@ class TdSuperTable:
...
@@ -1728,12 +2013,61 @@ class TdSuperTable:
)
)
else
:
else
:
sql
+=
" TAGS (dummy int) "
sql
+=
" TAGS (dummy int) "
dbc
.
execute
(
sql
)
dbc
.
execute
(
sql
)
def
createConsumer
(
self
,
dbc
,
Consumer_nums
):
def
generateConsumer
(
current_topic_list
):
conf
=
TaosTmqConf
()
conf
.
set
(
"group.id"
,
"tg2"
)
conf
.
set
(
"td.connect.user"
,
"root"
)
conf
.
set
(
"td.connect.pass"
,
"taosdata"
)
conf
.
set
(
"enable.auto.commit"
,
"true"
)
def
tmq_commit_cb_print
(
tmq
,
resp
,
offset
,
param
=
None
):
print
(
f
"commit:
{
resp
}
, tmq:
{
tmq
}
, offset:
{
offset
}
, param:
{
param
}
"
)
conf
.
set_auto_commit_cb
(
tmq_commit_cb_print
,
None
)
consumer
=
conf
.
new_consumer
()
topic_list
=
TaosTmqList
()
for
topic
in
current_topic_list
:
topic_list
.
append
(
topic
)
try
:
consumer
.
subscribe
(
topic_list
)
except
TmqError
as
e
:
pass
# consumer with random work life
time_start
=
time
.
time
()
while
1
:
res
=
consumer
.
poll
(
1000
)
if
time
.
time
()
-
time_start
>
random
.
randint
(
5
,
50
)
:
break
try
:
consumer
.
unsubscribe
()
except
TmqError
as
e
:
pass
return
# mulit Consumer
current_topic_list
=
self
.
getTopicLists
(
dbc
)
for
i
in
range
(
Consumer_nums
):
consumer_inst
=
threading
.
Thread
(
target
=
generateConsumer
,
args
=
(
current_topic_list
,))
self
.
_ConsumerInsts
.
append
(
consumer_inst
)
for
ConsumerInst
in
self
.
_ConsumerInsts
:
ConsumerInst
.
start
()
for
ConsumerInst
in
self
.
_ConsumerInsts
:
ConsumerInst
.
join
()
def
getTopicLists
(
self
,
dbc
:
DbConn
):
dbc
.
query
(
"show topics "
)
topics
=
dbc
.
getQueryResult
()
topicLists
=
[
v
[
0
]
for
v
in
topics
]
return
topicLists
def
getRegTables
(
self
,
dbc
:
DbConn
):
def
getRegTables
(
self
,
dbc
:
DbConn
):
dbName
=
self
.
_dbName
dbName
=
self
.
_dbName
try
:
try
:
dbc
.
query
(
"select TBNAME from {}.{}"
.
format
(
dbName
,
self
.
_stName
))
# TODO: analyze result set later
dbc
.
query
(
"select
distinct
TBNAME from {}.{}"
.
format
(
dbName
,
self
.
_stName
))
# TODO: analyze result set later
except
taos
.
error
.
ProgrammingError
as
err
:
except
taos
.
error
.
ProgrammingError
as
err
:
errno2
=
Helper
.
convertErrno
(
err
.
errno
)
errno2
=
Helper
.
convertErrno
(
err
.
errno
)
Logging
.
debug
(
"[=] Failed to get tables from super table: errno=0x{:X}, msg: {}"
.
format
(
errno2
,
err
))
Logging
.
debug
(
"[=] Failed to get tables from super table: errno=0x{:X}, msg: {}"
.
format
(
errno2
,
err
))
...
@@ -1743,7 +2077,75 @@ class TdSuperTable:
...
@@ -1743,7 +2077,75 @@ class TdSuperTable:
return
[
v
[
0
]
for
v
in
qr
]
# list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
return
[
v
[
0
]
for
v
in
qr
]
# list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def
hasRegTables
(
self
,
dbc
:
DbConn
):
def
hasRegTables
(
self
,
dbc
:
DbConn
):
return
dbc
.
query
(
"SELECT * FROM {}.{}"
.
format
(
self
.
_dbName
,
self
.
_stName
))
>
0
if
dbc
.
existsSuperTable
(
self
.
_stName
):
return
dbc
.
query
(
"SELECT * FROM {}.{}"
.
format
(
self
.
_dbName
,
self
.
_stName
))
>
0
else
:
return
False
def
hasStreamTables
(
self
,
dbc
:
DbConn
):
return
dbc
.
query
(
"show {}.stables like 'stream_tb%'"
.
format
(
self
.
_dbName
))
>
0
def
hasStreams
(
self
,
dbc
:
DbConn
):
return
dbc
.
query
(
"show streams"
)
>
0
def
hasTopics
(
self
,
dbc
:
DbConn
):
return
dbc
.
query
(
"show topics"
)
>
0
def
dropTopics
(
self
,
dbc
:
DbConn
,
dbname
=
None
,
stb_name
=
None
):
dbc
.
query
(
"show topics "
)
topics
=
dbc
.
getQueryResult
()
if
dbname
!=
None
and
stb_name
==
None
:
for
topic
in
topics
:
if
dbname
in
topic
[
0
]
and
topic
[
0
].
startswith
(
"database"
):
try
:
dbc
.
execute
(
'drop topic {}'
.
format
(
topic
[
0
]))
Logging
.
debug
(
"[OPS] topic {} is droping at {}"
.
format
(
topic
,
time
.
time
()))
except
taos
.
error
.
ProgrammingError
as
err
:
errno
=
Helper
.
convertErrno
(
err
.
errno
)
if
errno
in
[
0x03EB
]:
# Topic subscribed cannot be dropped
pass
# for subsript in subscriptions:
else
:
pass
pass
return
True
elif
dbname
!=
None
and
stb_name
!=
None
:
for
topic
in
topics
:
if
topic
[
0
].
startswith
(
self
.
_dbName
)
and
topic
[
0
].
endswith
(
'topic'
):
dbc
.
execute
(
'drop topic {}'
.
format
(
topic
[
0
]))
Logging
.
debug
(
"[OPS] topic {} is droping at {}"
.
format
(
topic
,
time
.
time
()))
return
True
else
:
return
True
pass
def
dropStreams
(
self
,
dbc
:
DbConn
):
dbc
.
query
(
"show streams "
)
Streams
=
dbc
.
getQueryResult
()
for
Stream
in
Streams
:
if
Stream
[
0
].
startswith
(
self
.
_dbName
):
dbc
.
execute
(
'drop stream {}'
.
format
(
Stream
[
0
]))
return
not
dbc
.
query
(
"show streams "
)
>
0
def
dropStreamTables
(
self
,
dbc
:
DbConn
):
dbc
.
query
(
"show {}.stables like 'stream_tb%'"
.
format
(
self
.
_dbName
))
StreamTables
=
dbc
.
getQueryResult
()
for
StreamTable
in
StreamTables
:
if
self
.
dropStreams
(
dbc
):
dbc
.
execute
(
'drop table {}.{}'
.
format
(
self
.
_dbName
,
StreamTable
[
0
]))
return
not
dbc
.
query
(
"show {}.stables like 'stream_tb%'"
.
format
(
self
.
_dbName
))
def
ensureRegTable
(
self
,
task
:
Optional
[
Task
],
dbc
:
DbConn
,
regTableName
:
str
):
def
ensureRegTable
(
self
,
task
:
Optional
[
Task
],
dbc
:
DbConn
,
regTableName
:
str
):
'''
'''
...
@@ -1838,10 +2240,46 @@ class TdSuperTable:
...
@@ -1838,10 +2240,46 @@ class TdSuperTable:
# Run the query against the regular table first
# Run the query against the regular table first
doAggr
=
(
Dice
.
throw
(
2
)
==
0
)
# 1 in 2 chance
doAggr
=
(
Dice
.
throw
(
2
)
==
0
)
# 1 in 2 chance
if
not
doAggr
:
# don't do aggregate query, just simple one
if
not
doAggr
:
# don't do aggregate query, just simple one
commonExpr
=
Dice
.
choice
([
'*'
,
'abs(speed)'
,
'acos(speed)'
,
'asin(speed)'
,
'atan(speed)'
,
'ceil(speed)'
,
'cos(speed)'
,
'cos(speed)'
,
'floor(speed)'
,
'log(speed,2)'
,
'pow(speed,2)'
,
'round(speed)'
,
'sin(speed)'
,
'sqrt(speed)'
,
'char_length(color)'
,
'concat(color,color)'
,
'concat_ws(" ", color,color," ")'
,
'length(color)'
,
'lower(color)'
,
'ltrim(color)'
,
'substr(color , 2)'
,
'upper(color)'
,
'cast(speed as double)'
,
'cast(ts as bigint)'
,
# 'TO_ISO8601(color)',
# 'TO_UNIXTIMESTAMP(ts)',
'now()'
,
'timediff(ts,now)'
,
'timezone()'
,
'TIMETRUNCATE(ts,1s)'
,
'TIMEZONE()'
,
'TODAY()'
,
'distinct(color)'
]
)
ret
.
append
(
SqlQuery
(
# reg table
ret
.
append
(
SqlQuery
(
# reg table
"select {} from {}.{}"
.
format
(
'*'
,
self
.
_dbName
,
rTbName
)))
"select {} from {}.{}"
.
format
(
commonExpr
,
self
.
_dbName
,
rTbName
)))
ret
.
append
(
SqlQuery
(
# super table
ret
.
append
(
SqlQuery
(
# super table
"select {} from {}.{}"
.
format
(
'*'
,
self
.
_dbName
,
self
.
getName
())))
"select {} from {}.{}"
.
format
(
commonExpr
,
self
.
_dbName
,
self
.
getName
())))
else
:
# Aggregate query
else
:
# Aggregate query
aggExpr
=
Dice
.
choice
([
aggExpr
=
Dice
.
choice
([
'count(*)'
,
'count(*)'
,
...
@@ -1857,17 +2295,34 @@ class TdSuperTable:
...
@@ -1857,17 +2295,34 @@ class TdSuperTable:
'top(speed, 50)'
,
# TODO: not supported?
'top(speed, 50)'
,
# TODO: not supported?
'bottom(speed, 50)'
,
# TODO: not supported?
'bottom(speed, 50)'
,
# TODO: not supported?
'apercentile(speed, 10)'
,
# TODO: TD-1316
'apercentile(speed, 10)'
,
# TODO: TD-1316
# 'last_row(speed
)', # TODO: commented out per TD-3231, we should re-create
'last_row(*
)'
,
# TODO: commented out per TD-3231, we should re-create
# Transformation Functions
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
# 'diff(speed)', # TODO: no supported?!
'spread(speed)'
'spread(speed)'
,
'elapsed(ts)'
,
'mode(speed)'
,
'bottom(speed,1)'
,
'top(speed,1)'
,
'tail(speed,1)'
,
'unique(color)'
,
'csum(speed)'
,
'DERIVATIVE(speed,1s,1)'
,
'diff(speed,1)'
,
'irate(speed)'
,
'mavg(speed,3)'
,
'sample(speed,5)'
,
'STATECOUNT(speed,"LT",1)'
,
'STATEDURATION(speed,"LT",1)'
,
'twa(speed)'
])
# TODO: add more from 'top'
])
# TODO: add more from 'top'
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
sql
=
"select {} from {}.{}"
.
format
(
aggExpr
,
self
.
_dbName
,
self
.
getName
())
sql
=
"select {} from {}.{}"
.
format
(
aggExpr
,
self
.
_dbName
,
self
.
getName
())
if
Dice
.
throw
(
3
)
==
0
:
# 1 in X chance
if
Dice
.
throw
(
3
)
==
0
:
# 1 in X chance
sql
=
sql
+
' GROUP BY color'
partion_expr
=
Dice
.
choice
([
'color'
,
'tbname'
])
sql
=
sql
+
' partition BY '
+
partion_expr
+
' order by '
+
partion_expr
Progress
.
emit
(
Progress
.
QUERY_GROUP_BY
)
Progress
.
emit
(
Progress
.
QUERY_GROUP_BY
)
# Logging.info("Executing GROUP-BY query: " + sql)
# Logging.info("Executing GROUP-BY query: " + sql)
ret
.
append
(
SqlQuery
(
sql
))
ret
.
append
(
SqlQuery
(
sql
))
...
@@ -1974,6 +2429,7 @@ class TaskDropSuperTable(StateTransitionTask):
...
@@ -1974,6 +2429,7 @@ class TaskDropSuperTable(StateTransitionTask):
isSuccess
=
False
isSuccess
=
False
Logging
.
debug
(
"[DB] Acceptable error when dropping a table"
)
Logging
.
debug
(
"[DB] Acceptable error when dropping a table"
)
continue
# try to delete next regular table
continue
# try to delete next regular table
if
(
not
tickOutput
):
if
(
not
tickOutput
):
tickOutput
=
True
# Print only one time
tickOutput
=
True
# Print only one time
...
@@ -1985,6 +2441,8 @@ class TaskDropSuperTable(StateTransitionTask):
...
@@ -1985,6 +2441,8 @@ class TaskDropSuperTable(StateTransitionTask):
# Drop the super table itself
# Drop the super table itself
tblName
=
self
.
_db
.
getFixedSuperTableName
()
tblName
=
self
.
_db
.
getFixedSuperTableName
()
self
.
execWtSql
(
wt
,
"drop table {}.{}"
.
format
(
self
.
_db
.
getName
(),
tblName
))
self
.
execWtSql
(
wt
,
"drop table {}.{}"
.
format
(
self
.
_db
.
getName
(),
tblName
))
class
TaskAlterTags
(
StateTransitionTask
):
class
TaskAlterTags
(
StateTransitionTask
):
...
@@ -2234,6 +2692,220 @@ class TaskAddData(StateTransitionTask):
...
@@ -2234,6 +2692,220 @@ class TaskAddData(StateTransitionTask):
self
.
activeTable
.
discard
(
i
)
# not raising an error, unlike remove
self
.
activeTable
.
discard
(
i
)
# not raising an error, unlike remove
class
TaskDeleteData
(
StateTransitionTask
):
# Track which table is being actively worked on
activeTable
:
Set
[
int
]
=
set
()
# We use these two files to record operations to DB, useful for power-off tests
fAddLogReady
=
None
# type: Optional[io.TextIOWrapper]
fAddLogDone
=
None
# type: Optional[io.TextIOWrapper]
@
classmethod
def
prepToRecordOps
(
cls
):
if
Config
.
getConfig
().
record_ops
:
if
(
cls
.
fAddLogReady
is
None
):
Logging
.
info
(
"Recording in a file operations to be performed..."
)
cls
.
fAddLogReady
=
open
(
"add_log_ready.txt"
,
"w"
)
if
(
cls
.
fAddLogDone
is
None
):
Logging
.
info
(
"Recording in a file operations completed..."
)
cls
.
fAddLogDone
=
open
(
"add_log_done.txt"
,
"w"
)
@
classmethod
def
getEndState
(
cls
):
return
StateHasData
()
@
classmethod
def
canBeginFrom
(
cls
,
state
:
AnyState
):
return
state
.
canDeleteData
()
def
_lockTableIfNeeded
(
self
,
fullTableName
,
extraMsg
=
''
):
if
Config
.
getConfig
().
verify_data
:
# Logging.info("Locking table: {}".format(fullTableName))
self
.
lockTable
(
fullTableName
)
# Logging.info("Table locked {}: {}".format(extraMsg, fullTableName))
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
else
:
# Logging.info("Skipping locking table")
pass
def
_unlockTableIfNeeded
(
self
,
fullTableName
):
if
Config
.
getConfig
().
verify_data
:
# Logging.info("Unlocking table: {}".format(fullTableName))
self
.
unlockTable
(
fullTableName
)
# Logging.info("Table unlocked: {}".format(fullTableName))
else
:
pass
# Logging.info("Skipping unlocking table")
def
_deleteData
(
self
,
db
:
Database
,
dbc
,
regTableName
,
te
:
TaskExecutor
):
# implied: NOT in batches
numRecords
=
self
.
LARGE_NUMBER_OF_RECORDS
if
Config
.
getConfig
().
larger_data
else
self
.
SMALL_NUMBER_OF_RECORDS
del_Records
=
int
(
numRecords
/
5
)
if
Dice
.
throw
(
2
)
==
0
:
for
j
in
range
(
del_Records
):
# number of records per table
intToWrite
=
db
.
getNextInt
()
nextTick
=
db
.
getNextTick
()
# nextColor = db.getNextColor()
if
Config
.
getConfig
().
record_ops
:
self
.
prepToRecordOps
()
if
self
.
fAddLogReady
is
None
:
raise
CrashGenError
(
"Unexpected empty fAddLogReady"
)
self
.
fAddLogReady
.
write
(
"Ready to delete {} to {}
\n
"
.
format
(
intToWrite
,
regTableName
))
self
.
fAddLogReady
.
flush
()
os
.
fsync
(
self
.
fAddLogReady
.
fileno
())
# TODO: too ugly trying to lock the table reliably, refactor...
fullTableName
=
db
.
getName
()
+
'.'
+
regTableName
self
.
_lockTableIfNeeded
(
fullTableName
)
# so that we are verify read-back. TODO: deal with exceptions before unlock
try
:
sql
=
"delete from {} where ts = '{}' ;"
.
format
(
# removed: tags ('{}', {})
fullTableName
,
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
nextTick
)
# print(sql)
# Logging.info("Adding data: {}".format(sql))
dbc
.
execute
(
sql
)
# Logging.info("Data added: {}".format(sql))
intWrote
=
intToWrite
# Quick hack, attach an update statement here. TODO: create an "update" task
if
(
not
Config
.
getConfig
().
use_shadow_db
)
and
Dice
.
throw
(
5
)
==
0
:
# 1 in N chance, plus not using shaddow DB
intToUpdate
=
db
.
getNextInt
()
# Updated, but should not succeed
# nextColor = db.getNextColor()
sql
=
"delete from {} where ts = '{}' ;"
.
format
(
# "INSERt" means "update" here
fullTableName
,
nextTick
)
# sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format(
# fullTableName, db.getNextInt(), db.getNextColor(), nextTick)
dbc
.
execute
(
sql
)
intWrote
=
intToUpdate
# We updated, seems TDengine non-cluster accepts this.
except
:
# Any exception at all
self
.
_unlockTableIfNeeded
(
fullTableName
)
raise
# Now read it back and verify, we might encounter an error if table is dropped
if
Config
.
getConfig
().
verify_data
:
# only if command line asks for it
try
:
dbc
.
query
(
"SELECT * from {}.{} WHERE ts='{}'"
.
format
(
db
.
getName
(),
regTableName
,
nextTick
))
result
=
dbc
.
getQueryResult
()
if
len
(
result
)
==
0
:
# means data has been delete
print
(
"D1"
,
end
=
""
)
# DF means delete failed
else
:
print
(
"DF"
,
end
=
""
)
# DF means delete failed
except
taos
.
error
.
ProgrammingError
as
err
:
errno
=
Helper
.
convertErrno
(
err
.
errno
)
# if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
# print("D1",end="") # D1 means delete data success and only 1 record
if
errno
in
[
0x218
,
0x362
,
0x2662
]:
# table doesn't exist
# do nothing
pass
else
:
# Re-throw otherwise
raise
finally
:
self
.
_unlockTableIfNeeded
(
fullTableName
)
# Quite ugly, refactor lock/unlock
# Done with read-back verification, unlock the table now
# Successfully wrote the data into the DB, let's record it somehow
te
.
recordDataMark
(
intWrote
)
else
:
# delete all datas and verify datas ,expected table is empty
if
Config
.
getConfig
().
record_ops
:
self
.
prepToRecordOps
()
if
self
.
fAddLogReady
is
None
:
raise
CrashGenError
(
"Unexpected empty fAddLogReady"
)
self
.
fAddLogReady
.
write
(
"Ready to delete {} to {}
\n
"
.
format
(
intToWrite
,
regTableName
))
self
.
fAddLogReady
.
flush
()
os
.
fsync
(
self
.
fAddLogReady
.
fileno
())
# TODO: too ugly trying to lock the table reliably, refactor...
fullTableName
=
db
.
getName
()
+
'.'
+
regTableName
self
.
_lockTableIfNeeded
(
fullTableName
)
# so that we are verify read-back. TODO: deal with exceptions before unlock
try
:
sql
=
"delete from {} ;"
.
format
(
# removed: tags ('{}', {})
fullTableName
)
# Logging.info("Adding data: {}".format(sql))
dbc
.
execute
(
sql
)
# Logging.info("Data added: {}".format(sql))
# Quick hack, attach an update statement here. TODO: create an "update" task
if
(
not
Config
.
getConfig
().
use_shadow_db
)
and
Dice
.
throw
(
5
)
==
0
:
# 1 in N chance, plus not using shaddow DB
sql
=
"delete from {} ;"
.
format
(
# "INSERt" means "update" here
fullTableName
)
dbc
.
execute
(
sql
)
except
:
# Any exception at all
self
.
_unlockTableIfNeeded
(
fullTableName
)
raise
# Now read it back and verify, we might encounter an error if table is dropped
if
Config
.
getConfig
().
verify_data
:
# only if command line asks for it
try
:
dbc
.
query
(
"SELECT * from {}.{} WHERE ts='{}'"
.
format
(
db
.
getName
(),
regTableName
,
nextTick
))
result
=
dbc
.
getQueryResult
()
if
len
(
result
)
==
0
:
# means data has been delete
print
(
"DA"
,
end
=
""
)
else
:
print
(
"DF"
,
end
=
""
)
# DF means delete failed
except
taos
.
error
.
ProgrammingError
as
err
:
errno
=
Helper
.
convertErrno
(
err
.
errno
)
# if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
# print("Da",end="") # Da means delete data success and for all datas
if
errno
in
[
0x218
,
0x362
,
0x2662
]:
# table doesn't exist
# do nothing
pass
else
:
# Re-throw otherwise
raise
finally
:
self
.
_unlockTableIfNeeded
(
fullTableName
)
# Quite ugly, refactor lock/unlock
# Done with read-back verification, unlock the table now
if
Config
.
getConfig
().
record_ops
:
if
self
.
fAddLogDone
is
None
:
raise
CrashGenError
(
"Unexpected empty fAddLogDone"
)
self
.
fAddLogDone
.
write
(
"Wrote {} to {}
\n
"
.
format
(
intWrote
,
regTableName
))
self
.
fAddLogDone
.
flush
()
os
.
fsync
(
self
.
fAddLogDone
.
fileno
())
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
db
=
self
.
_db
dbc
=
wt
.
getDbConn
()
numTables
=
self
.
LARGE_NUMBER_OF_TABLES
if
Config
.
getConfig
().
larger_data
else
self
.
SMALL_NUMBER_OF_TABLES
numRecords
=
self
.
LARGE_NUMBER_OF_RECORDS
if
Config
.
getConfig
().
larger_data
else
self
.
SMALL_NUMBER_OF_RECORDS
tblSeq
=
list
(
range
(
numTables
))
random
.
shuffle
(
tblSeq
)
# now we have random sequence
for
i
in
tblSeq
:
if
(
i
in
self
.
activeTable
):
# wow already active
# print("x", end="", flush=True) # concurrent insertion
Progress
.
emit
(
Progress
.
CONCURRENT_INSERTION
)
else
:
self
.
activeTable
.
add
(
i
)
# marking it active
dbName
=
db
.
getName
()
sTable
=
db
.
getFixedSuperTable
()
regTableName
=
self
.
getRegTableName
(
i
)
# "db.reg_table_{}".format(i)
fullTableName
=
dbName
+
'.'
+
regTableName
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
sTable
.
ensureRegTable
(
self
,
wt
.
getDbConn
(),
regTableName
)
# Ensure the table exists
# self._unlockTable(fullTableName)
self
.
_deleteData
(
db
,
dbc
,
regTableName
,
te
)
self
.
activeTable
.
discard
(
i
)
# not raising an error, unlike remove
class
ThreadStacks
:
# stack info for all threads
class
ThreadStacks
:
# stack info for all threads
def
__init__
(
self
):
def
__init__
(
self
):
...
@@ -2244,6 +2916,9 @@ class ThreadStacks: # stack info for all threads
...
@@ -2244,6 +2916,9 @@ class ThreadStacks: # stack info for all threads
shortTid
=
th
.
native_id
%
10000
#type: ignore
shortTid
=
th
.
native_id
%
10000
#type: ignore
self
.
_allStacks
[
shortTid
]
=
stack
# Was using th.native_id
self
.
_allStacks
[
shortTid
]
=
stack
# Was using th.native_id
def
record_current_time
(
self
,
current_time
):
self
.
current_time
=
current_time
def
print
(
self
,
filteredEndName
=
None
,
filterInternal
=
False
):
def
print
(
self
,
filteredEndName
=
None
,
filterInternal
=
False
):
for
shortTid
,
stack
in
self
.
_allStacks
.
items
():
# for each thread, stack frames top to bottom
for
shortTid
,
stack
in
self
.
_allStacks
.
items
():
# for each thread, stack frames top to bottom
lastFrame
=
stack
[
-
1
]
lastFrame
=
stack
[
-
1
]
...
@@ -2258,8 +2933,11 @@ class ThreadStacks: # stack info for all threads
...
@@ -2258,8 +2933,11 @@ class ThreadStacks: # stack info for all threads
continue
# ignore
continue
# ignore
# Now print
# Now print
print
(
"
\n
<----- Thread Info for LWP/ID: {} (most recent call last) <-----"
.
format
(
shortTid
))
print
(
"
\n
<----- Thread Info for LWP/ID: {} (most recent call last) <-----"
.
format
(
shortTid
))
lastSqlForThread
=
DbConn
.
fetchSqlForThread
(
shortTid
)
lastSqlForThread
=
DbConn
.
fetchSqlForThread
(
shortTid
)
print
(
"Last SQL statement attempted from thread {} is: {}"
.
format
(
shortTid
,
lastSqlForThread
))
last_sql_commit_time
=
DbConn
.
get_save_sql_time
(
shortTid
)
# time_cost = DbConn.get_time_cost()
print
(
"Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}"
.
format
(
shortTid
,
self
.
current_time
-
last_sql_commit_time
,
lastSqlForThread
))
stackFrame
=
0
stackFrame
=
0
for
frame
in
stack
:
# was using: reversed(stack)
for
frame
in
stack
:
# was using: reversed(stack)
# print(frame)
# print(frame)
...
@@ -2268,6 +2946,8 @@ class ThreadStacks: # stack info for all threads
...
@@ -2268,6 +2946,8 @@ class ThreadStacks: # stack info for all threads
print
(
" {}"
.
format
(
frame
.
line
))
print
(
" {}"
.
format
(
frame
.
line
))
stackFrame
+=
1
stackFrame
+=
1
print
(
"-----> End of Thread Info ----->
\n
"
)
print
(
"-----> End of Thread Info ----->
\n
"
)
if
self
.
current_time
-
last_sql_commit_time
>
100
:
# dead lock occured
print
(
"maybe dead locked of thread {} "
.
format
(
shortTid
))
class
ClientManager
:
class
ClientManager
:
def
__init__
(
self
):
def
__init__
(
self
):
...
@@ -2631,4 +3311,3 @@ class Container():
...
@@ -2631,4 +3311,3 @@ class Container():
return
return
self
.
_verifyValidProperty
(
name
)
self
.
_verifyValidProperty
(
name
)
self
.
_cargo
[
name
]
=
value
self
.
_cargo
[
name
]
=
value
tests/pytest/crash_gen/shared/db.py
浏览文件 @
7dd2af78
...
@@ -26,10 +26,13 @@ class DbConn:
...
@@ -26,10 +26,13 @@ class DbConn:
TYPE_NATIVE
=
"native-c"
TYPE_NATIVE
=
"native-c"
TYPE_REST
=
"rest-api"
TYPE_REST
=
"rest-api"
TYPE_INVALID
=
"invalid"
TYPE_INVALID
=
"invalid"
# class variables
# class variables
lastSqlFromThreads
:
dict
[
int
,
str
]
=
{}
# stored by thread id, obtained from threading.current_thread().ident%10000
lastSqlFromThreads
:
dict
[
int
,
str
]
=
{}
# stored by thread id, obtained from threading.current_thread().ident%10000
spendThreads
:
dict
[
int
,
float
]
=
{}
# stored by thread id, obtained from threading.current_thread().ident%10000
current_time
:
dict
[
int
,
float
]
=
{}
# save current time
@
classmethod
@
classmethod
def
saveSqlForCurrentThread
(
cls
,
sql
:
str
):
def
saveSqlForCurrentThread
(
cls
,
sql
:
str
):
'''
'''
...
@@ -37,15 +40,56 @@ class DbConn:
...
@@ -37,15 +40,56 @@ class DbConn:
run into a dead-lock situation, we can pick out the deadlocked thread, and use
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
that information to find what what SQL statement is stuck.
'''
'''
th
=
threading
.
current_thread
()
th
=
threading
.
current_thread
()
shortTid
=
th
.
native_id
%
10000
#type: ignore
shortTid
=
th
.
native_id
%
10000
#type: ignore
cls
.
lastSqlFromThreads
[
shortTid
]
=
sql
# Save this for later
cls
.
lastSqlFromThreads
[
shortTid
]
=
sql
# Save this for later
cls
.
record_save_sql_time
()
@
classmethod
@
classmethod
def
fetchSqlForThread
(
cls
,
shortTid
:
int
)
->
str
:
def
fetchSqlForThread
(
cls
,
shortTid
:
int
)
->
str
:
print
(
"======================="
)
if
shortTid
not
in
cls
.
lastSqlFromThreads
:
if
shortTid
not
in
cls
.
lastSqlFromThreads
:
raise
CrashGenError
(
"No last-attempted-SQL found for thread id: {}"
.
format
(
shortTid
))
raise
CrashGenError
(
"No last-attempted-SQL found for thread id: {}"
.
format
(
shortTid
))
return
cls
.
lastSqlFromThreads
[
shortTid
]
return
cls
.
lastSqlFromThreads
[
shortTid
]
@
classmethod
def
get_save_sql_time
(
cls
,
shortTid
:
int
):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
return
cls
.
current_time
[
shortTid
]
@
classmethod
def
record_save_sql_time
(
cls
):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
th
=
threading
.
current_thread
()
shortTid
=
th
.
native_id
%
10000
#type: ignore
cls
.
current_time
[
shortTid
]
=
float
(
time
.
time
())
# Save this for later
@
classmethod
def
sql_exec_spend
(
cls
,
cost
:
float
):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
th
=
threading
.
current_thread
()
shortTid
=
th
.
native_id
%
10000
#type: ignore
cls
.
spendThreads
[
shortTid
]
=
cost
# Save this for later
@
classmethod
def
get_time_cost
(
cls
)
->
float
:
th
=
threading
.
current_thread
()
shortTid
=
th
.
native_id
%
10000
#type: ignore
return
cls
.
spendThreads
.
get
(
shortTid
)
@
classmethod
@
classmethod
def
create
(
cls
,
connType
,
dbTarget
):
def
create
(
cls
,
connType
,
dbTarget
):
...
@@ -61,6 +105,7 @@ class DbConn:
...
@@ -61,6 +105,7 @@ class DbConn:
def
createNative
(
cls
,
dbTarget
)
->
DbConn
:
def
createNative
(
cls
,
dbTarget
)
->
DbConn
:
return
cls
.
create
(
cls
.
TYPE_NATIVE
,
dbTarget
)
return
cls
.
create
(
cls
.
TYPE_NATIVE
,
dbTarget
)
@
classmethod
@
classmethod
def
createRest
(
cls
,
dbTarget
)
->
DbConn
:
def
createRest
(
cls
,
dbTarget
)
->
DbConn
:
return
cls
.
create
(
cls
.
TYPE_REST
,
dbTarget
)
return
cls
.
create
(
cls
.
TYPE_REST
,
dbTarget
)
...
@@ -75,6 +120,7 @@ class DbConn:
...
@@ -75,6 +120,7 @@ class DbConn:
return
"[DbConn: type={}, target={}]"
.
format
(
self
.
_type
,
self
.
_dbTarget
)
return
"[DbConn: type={}, target={}]"
.
format
(
self
.
_type
,
self
.
_dbTarget
)
def
getLastSql
(
self
):
def
getLastSql
(
self
):
return
self
.
_lastSql
return
self
.
_lastSql
def
open
(
self
):
def
open
(
self
):
...
@@ -184,13 +230,19 @@ class DbConnRest(DbConn):
...
@@ -184,13 +230,19 @@ class DbConnRest(DbConn):
def
_doSql
(
self
,
sql
):
def
_doSql
(
self
,
sql
):
self
.
_lastSql
=
sql
# remember this, last SQL attempted
self
.
_lastSql
=
sql
# remember this, last SQL attempted
self
.
saveSqlForCurrentThread
(
sql
)
# Save in global structure too. #TODO: combine with above
self
.
saveSqlForCurrentThread
(
sql
)
# Save in global structure too. #TODO: combine with above
try
:
time_cost
=
-
1
time_start
=
time
.
time
()
try
:
r
=
requests
.
post
(
self
.
_url
,
r
=
requests
.
post
(
self
.
_url
,
data
=
sql
,
data
=
sql
,
auth
=
HTTPBasicAuth
(
'root'
,
'taosdata'
))
auth
=
HTTPBasicAuth
(
'root'
,
'taosdata'
))
except
:
except
:
print
(
"REST API Failure (TODO: more info here)"
)
print
(
"REST API Failure (TODO: more info here)"
)
self
.
sql_exec_spend
(
-
2
)
raise
raise
finally
:
time_cost
=
time
.
time
()
-
time_start
self
.
sql_exec_spend
(
time_cost
)
rj
=
r
.
json
()
rj
=
r
.
json
()
# Sanity check for the "Json Result"
# Sanity check for the "Json Result"
if
(
'status'
not
in
rj
):
if
(
'status'
not
in
rj
):
...
@@ -223,6 +275,8 @@ class DbConnRest(DbConn):
...
@@ -223,6 +275,8 @@ class DbConnRest(DbConn):
"[SQL-REST] Execution Result, nRows = {}, SQL = {}"
.
format
(
nRows
,
sql
))
"[SQL-REST] Execution Result, nRows = {}, SQL = {}"
.
format
(
nRows
,
sql
))
return
nRows
return
nRows
def
query
(
self
,
sql
):
# return rows affected
def
query
(
self
,
sql
):
# return rows affected
return
self
.
execute
(
sql
)
return
self
.
execute
(
sql
)
...
@@ -336,6 +390,7 @@ class MyTDSql:
...
@@ -336,6 +390,7 @@ class MyTDSql:
raise
raise
return
self
.
affectedRows
return
self
.
affectedRows
class
DbTarget
:
class
DbTarget
:
def
__init__
(
self
,
cfgPath
,
hostAddr
,
port
):
def
__init__
(
self
,
cfgPath
,
hostAddr
,
port
):
self
.
cfgPath
=
cfgPath
self
.
cfgPath
=
cfgPath
...
@@ -355,6 +410,7 @@ class DbConnNative(DbConn):
...
@@ -355,6 +410,7 @@ class DbConnNative(DbConn):
# _connInfoDisplayed = False # TODO: find another way to display this
# _connInfoDisplayed = False # TODO: find another way to display this
totalConnections
=
0
# Not private
totalConnections
=
0
# Not private
totalRequests
=
0
totalRequests
=
0
time_cost
=
-
1
def
__init__
(
self
,
dbTarget
):
def
__init__
(
self
,
dbTarget
):
super
().
__init__
(
dbTarget
)
super
().
__init__
(
dbTarget
)
...
@@ -413,8 +469,18 @@ class DbConnNative(DbConn):
...
@@ -413,8 +469,18 @@ class DbConnNative(DbConn):
"Cannot exec SQL unless db connection is open"
,
CrashGenError
.
DB_CONNECTION_NOT_OPEN
)
"Cannot exec SQL unless db connection is open"
,
CrashGenError
.
DB_CONNECTION_NOT_OPEN
)
Logging
.
debug
(
"[SQL] Executing SQL: {}"
.
format
(
sql
))
Logging
.
debug
(
"[SQL] Executing SQL: {}"
.
format
(
sql
))
self
.
_lastSql
=
sql
self
.
_lastSql
=
sql
time_cost
=
-
1
nRows
=
0
time_start
=
time
.
time
()
self
.
saveSqlForCurrentThread
(
sql
)
# Save in global structure too. #TODO: combine with above
self
.
saveSqlForCurrentThread
(
sql
)
# Save in global structure too. #TODO: combine with above
nRows
=
self
.
_tdSql
.
execute
(
sql
)
try
:
nRows
=
self
.
_tdSql
.
execute
(
sql
)
except
Exception
as
e
:
self
.
sql_exec_spend
(
-
2
)
finally
:
time_cost
=
time
.
time
()
-
time_start
self
.
sql_exec_spend
(
time_cost
)
cls
=
self
.
__class__
cls
=
self
.
__class__
cls
.
totalRequests
+=
1
cls
.
totalRequests
+=
1
Logging
.
debug
(
Logging
.
debug
(
...
@@ -494,4 +560,3 @@ class DbManager():
...
@@ -494,4 +560,3 @@ class DbManager():
self
.
_dbConn
.
close
()
self
.
_dbConn
.
close
()
self
.
_dbConn
=
None
self
.
_dbConn
=
None
Logging
.
debug
(
"DbManager closed DB connection..."
)
Logging
.
debug
(
"DbManager closed DB connection..."
)
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录