Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
024ca23c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
024ca23c
编写于
3月 09, 2022
作者:
W
wenzhouwww@live.cn
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
<fix> : fix crash_gen support restful mode by deploy taosadapter
上级
19a19951
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
47 addition
and
13 deletion
+47
-13
tests/pytest/crash_gen/crash_gen_main.py
tests/pytest/crash_gen/crash_gen_main.py
+11
-9
tests/pytest/crash_gen/service_manager.py
tests/pytest/crash_gen/service_manager.py
+32
-0
tests/pytest/crash_gen/shared/db.py
tests/pytest/crash_gen/shared/db.py
+4
-4
未找到文件。
tests/pytest/crash_gen/crash_gen_main.py
浏览文件 @
024ca23c
...
@@ -84,10 +84,10 @@ class WorkerThread:
...
@@ -84,10 +84,10 @@ class WorkerThread:
# self._thread = threading.Thread(target=runThread, args=(self,))
# self._thread = threading.Thread(target=runThread, args=(self,))
self
.
_thread
=
threading
.
Thread
(
target
=
self
.
run
)
self
.
_thread
=
threading
.
Thread
(
target
=
self
.
run
)
self
.
_stepGate
=
threading
.
Event
()
self
.
_stepGate
=
threading
.
Event
()
# Let us have a DB connection of our own
# Let us have a DB connection of our own
if
(
Config
.
getConfig
().
per_thread_db_connection
):
# type: ignore
if
(
Config
.
getConfig
().
per_thread_db_connection
):
# type: ignore
# print("connector_type = {}".format(gConfig.connector_type))
# print("connector_type = {}".format(Config.getConfig().connector_type))
tInst
=
gContainer
.
defTdeInstance
tInst
=
gContainer
.
defTdeInstance
if
Config
.
getConfig
().
connector_type
==
'native'
:
if
Config
.
getConfig
().
connector_type
==
'native'
:
self
.
_dbConn
=
DbConn
.
createNative
(
tInst
.
getDbTarget
())
self
.
_dbConn
=
DbConn
.
createNative
(
tInst
.
getDbTarget
())
...
@@ -963,7 +963,7 @@ class StateMechine:
...
@@ -963,7 +963,7 @@ class StateMechine:
# did not do this when openning connection, and this is NOT the worker
# did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own
# thread, which does this on their own
dbc
.
use
(
dbName
)
dbc
.
use
(
dbName
)
if
not
dbc
.
hasTables
():
# no tables
if
not
dbc
.
hasTables
(
dbName
):
# no tables
Logging
.
debug
(
"[STT] DB_ONLY found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
Logging
.
debug
(
"[STT] DB_ONLY found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
return
StateDbOnly
()
return
StateDbOnly
()
...
@@ -1434,6 +1434,7 @@ class Task():
...
@@ -1434,6 +1434,7 @@ class Task():
# TODO: refactor away, just provide the dbConn
# TODO: refactor away, just provide the dbConn
def
execWtSql
(
self
,
wt
:
WorkerThread
,
sql
):
# execute an SQL on the worker thread
def
execWtSql
(
self
,
wt
:
WorkerThread
,
sql
):
# execute an SQL on the worker thread
""" Haha """
""" Haha """
# print("thread %d runing sql is : %s " %(wt._tid , sql) )
return
wt
.
execSql
(
sql
)
return
wt
.
execSql
(
sql
)
def
queryWtSql
(
self
,
wt
:
WorkerThread
,
sql
):
# execute an SQL on the worker thread
def
queryWtSql
(
self
,
wt
:
WorkerThread
,
sql
):
# execute an SQL on the worker thread
...
@@ -1686,6 +1687,7 @@ class TdSuperTable:
...
@@ -1686,6 +1687,7 @@ class TdSuperTable:
def
__init__
(
self
,
stName
,
dbName
):
def
__init__
(
self
,
stName
,
dbName
):
self
.
_stName
=
stName
self
.
_stName
=
stName
self
.
_dbName
=
dbName
self
.
_dbName
=
dbName
self
.
_fullTableName
=
dbName
+
'.'
+
stName
def
getName
(
self
):
def
getName
(
self
):
return
self
.
_stName
return
self
.
_stName
...
@@ -1697,11 +1699,11 @@ class TdSuperTable:
...
@@ -1697,11 +1699,11 @@ class TdSuperTable:
dbc
.
execute
(
"DROP TaBLE {}"
.
format
(
fullTableName
))
dbc
.
execute
(
"DROP TaBLE {}"
.
format
(
fullTableName
))
else
:
else
:
if
not
skipCheck
:
if
not
skipCheck
:
raise
CrashGenError
(
"Cannot drop non-existant super table: {}"
.
format
(
self
.
_
st
Name
))
raise
CrashGenError
(
"Cannot drop non-existant super table: {}"
.
format
(
self
.
_
fullTable
Name
))
def
exists
(
self
,
dbc
):
def
exists
(
self
,
dbc
):
dbc
.
execute
(
"USE "
+
self
.
_dbName
)
dbc
.
execute
(
"USE "
+
self
.
_dbName
)
return
dbc
.
existsSuperTable
(
self
.
_
st
Name
)
return
dbc
.
existsSuperTable
(
self
.
_
dbName
,
self
.
_fullTable
Name
)
# TODO: odd semantic, create() method is usually static?
# TODO: odd semantic, create() method is usually static?
def
create
(
self
,
dbc
,
cols
:
TdColumns
,
tags
:
TdTags
,
dropIfExists
=
False
):
def
create
(
self
,
dbc
,
cols
:
TdColumns
,
tags
:
TdTags
,
dropIfExists
=
False
):
...
@@ -1710,11 +1712,11 @@ class TdSuperTable:
...
@@ -1710,11 +1712,11 @@ class TdSuperTable:
dbName
=
self
.
_dbName
dbName
=
self
.
_dbName
dbc
.
execute
(
"USE "
+
dbName
)
dbc
.
execute
(
"USE "
+
dbName
)
fullTableName
=
dbName
+
'.'
+
self
.
_stName
fullTableName
=
dbName
+
'.'
+
self
.
_stName
if
dbc
.
existsSuperTable
(
self
.
_
st
Name
):
if
dbc
.
existsSuperTable
(
self
.
_
dbName
,
self
.
_fullTable
Name
):
if
dropIfExists
:
if
dropIfExists
:
dbc
.
execute
(
"DROP TAbLE {}"
.
format
(
fullTableName
))
dbc
.
execute
(
"DROP TAbLE {}"
.
format
(
fullTableName
))
else
:
# error
else
:
# error
raise
CrashGenError
(
"Cannot create super table, already exists: {}"
.
format
(
self
.
_
st
Name
))
raise
CrashGenError
(
"Cannot create super table, already exists: {}"
.
format
(
self
.
_
fullTable
Name
))
# Now let's create
# Now let's create
sql
=
"CREATE TABLE {} ({})"
.
format
(
sql
=
"CREATE TABLE {} ({})"
.
format
(
...
@@ -2491,7 +2493,7 @@ class MainExec:
...
@@ -2491,7 +2493,7 @@ class MainExec:
action
=
'store'
,
action
=
'store'
,
default
=
'native'
,
default
=
'native'
,
type
=
str
,
type
=
str
,
help
=
'Connector type to use: native, rest, or mixed (default:
10
)'
)
help
=
'Connector type to use: native, rest, or mixed (default:
native
)'
)
parser
.
add_argument
(
parser
.
add_argument
(
'-d'
,
'-d'
,
'--debug'
,
'--debug'
,
...
@@ -2552,7 +2554,7 @@ class MainExec:
...
@@ -2552,7 +2554,7 @@ class MainExec:
'-r'
,
'-r'
,
'--record-ops'
,
'--record-ops'
,
action
=
'store_true'
,
action
=
'store_true'
,
help
=
'Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)'
)
help
=
'Use a pair of always-fsynced fil
e
s to record operations performing + performed, for power-off tests (default: false)'
)
parser
.
add_argument
(
parser
.
add_argument
(
'-s'
,
'-s'
,
'--max-steps'
,
'--max-steps'
,
...
...
tests/pytest/crash_gen/service_manager.py
浏览文件 @
024ca23c
...
@@ -165,6 +165,9 @@ quorum 2
...
@@ -165,6 +165,9 @@ quorum 2
def
getExecFile
(
self
):
# .../taosd
def
getExecFile
(
self
):
# .../taosd
return
self
.
_buildDir
+
"/build/bin/taosd"
return
self
.
_buildDir
+
"/build/bin/taosd"
def
getAdapterFile
(
self
):
# .../taosadapter for restful
return
self
.
_buildDir
+
"/build/bin/taosadapter"
def
getRunDir
(
self
)
->
DirPath
:
# TODO: rename to "root dir" ?!
def
getRunDir
(
self
)
->
DirPath
:
# TODO: rename to "root dir" ?!
if
Config
.
getConfig
().
set_path
==
''
:
# use default path
if
Config
.
getConfig
().
set_path
==
''
:
# use default path
return
DirPath
(
self
.
_buildDir
+
self
.
_subdir
)
return
DirPath
(
self
.
_buildDir
+
self
.
_subdir
)
...
@@ -188,6 +191,31 @@ quorum 2
...
@@ -188,6 +191,31 @@ quorum 2
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
return
[
"exec "
+
self
.
getExecFile
(),
'-c'
,
self
.
getCfgDir
()]
# used in subproce.Popen()
return
[
"exec "
+
self
.
getExecFile
(),
'-c'
,
self
.
getCfgDir
()]
# used in subproce.Popen()
def
getAdapterCmdLine
(
self
):
REST_PORT_INCREMENT
=
11
Adapter_ports
=
str
(
self
.
_port
+
REST_PORT_INCREMENT
)
AdapterCmds
=
[
self
.
getAdapterFile
()
+
' --port='
+
Adapter_ports
+
' --log.path='
+
self
.
getLogDir
()
+
' --taosConfigDir='
+
self
.
getCfgDir
()
+
' --collectd.enable=false'
+
' --influxdb.enable=false --node_exporter.enable=false'
+
' --opentsdb.enable=false --statsd.enable=false'
+
' --prometheus.enable=false --opentsdb_telnet.enable=false'
]
# get adapter cmd string
return
AdapterCmds
def
start_Adapter
(
self
,
cmdLine
):
# print('nohup '+' '.join(cmdLine)+ '>>taosadapter.log 2>&1 &')
cmds
=
'nohup '
+
' '
.
join
(
cmdLine
)
+
'>>taosadapter.log 2>&1 &'
ret
=
Popen
(
cmds
,
shell
=
True
,
stdout
=
PIPE
,
stderr
=
PIPE
,
)
time
.
sleep
(
0.1
)
# very brief wait, then let's check if sub process started successfully.
if
ret
.
poll
():
raise
CrashGenError
(
"Sub process failed to start with command line: {}"
.
format
(
cmdLine
))
return
ret
def
_getDnodes
(
self
,
dbc
):
def
_getDnodes
(
self
,
dbc
):
dbc
.
query
(
"show dnodes"
)
dbc
.
query
(
"show dnodes"
)
cols
=
dbc
.
getQueryResult
()
# id,end_point,vnodes,cores,status,role,create_time,offline reason
cols
=
dbc
.
getQueryResult
()
# id,end_point,vnodes,cores,status,role,create_time,offline reason
...
@@ -230,6 +258,10 @@ quorum 2
...
@@ -230,6 +258,10 @@ quorum 2
# self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
# self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
self
.
_subProcess
=
TdeSubProcess
(
self
.
getServiceCmdLine
(),
self
.
getLogDir
())
self
.
_subProcess
=
TdeSubProcess
(
self
.
getServiceCmdLine
(),
self
.
getLogDir
())
# run taosadapter by subprocess ,taosadapter is stateless with TDengine ,so it don't need monitor
self
.
start_Adapter
(
self
.
getAdapterCmdLine
())
print
(
' '
.
join
(
self
.
getAdapterCmdLine
()))
def
stop
(
self
):
def
stop
(
self
):
self
.
_subProcess
.
stop
()
self
.
_subProcess
.
stop
()
self
.
_subProcess
=
None
self
.
_subProcess
=
None
...
...
tests/pytest/crash_gen/shared/db.py
浏览文件 @
024ca23c
...
@@ -100,13 +100,13 @@ class DbConn:
...
@@ -100,13 +100,13 @@ class DbConn:
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
return
dbName
in
dbs
# TODO: super weird type mangling seen, once here
return
dbName
in
dbs
# TODO: super weird type mangling seen, once here
def
existsSuperTable
(
self
,
stName
):
def
existsSuperTable
(
self
,
dbName
,
stName
):
self
.
query
(
"show
stables"
)
self
.
query
(
f
"show
{
dbName
}
.
stables"
)
sts
=
[
v
[
0
]
for
v
in
self
.
getQueryResult
()]
sts
=
[
v
[
0
]
for
v
in
self
.
getQueryResult
()]
return
stName
in
sts
return
stName
in
sts
def
hasTables
(
self
):
def
hasTables
(
self
,
dbName
):
return
self
.
query
(
"show
tables"
)
>
0
return
self
.
query
(
f
"show
{
dbName
}
.
tables"
)
>
0
def
execute
(
self
,
sql
):
def
execute
(
self
,
sql
):
''' Return the number of rows affected'''
''' Return the number of rows affected'''
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录