Commit 33765d5d
Authored on April 13, 2021 by Shengliang Guan; committed via GitHub on April 13, 2021

Merge pull request #5796 from taosdata/feature/crash_gen_master

Moved over changes from feature/crash_gen to feature/crash_gen_master

Parents: e2943da9, b6ee0c8f
Showing 3 changed files with 206 additions and 70 deletions (+206 / -70)
tests/pytest/crash_gen/README.md (+35 / -9)
tests/pytest/crash_gen/crash_gen_main.py (+9 / -4)
tests/pytest/crash_gen/service_manager.py (+162 / -57)
tests/pytest/crash_gen/README.md
@@ -6,6 +6,25 @@ To effectively test and debug our TDengine product, we have developed a simple t
 exercise various functions of the system in a randomized fashion, hoping to expose
 maximum number of problems, hopefully without a pre-determined scenario.
 
+# Features
+
+This tool can run as a test client with the following features:
+
+1. Any number of concurrent threads
+1. Any number of test steps/loops
+1. Auto-create and writing to multiple databases
+1. Ignore specific error codes
+1. Write small or large data blocks
+1. Auto-generate out-of-sequence data, if needed
+1. Verify the result of write operations
+1. Concurrent writing to a shadow database for later data verification
+1. User specified number of replicas to use, against clusters
+
+This tool can also use to start a TDengine service, either in stand-alone mode or
+cluster mode. The features include:
+
+1. User specified number of D-Nodes to create/use.
+
 # Preparation
 
 To run this tool, please ensure the followed preparation work is done first.
@@ -16,7 +35,7 @@ To run this tool, please ensure the followed preparation work is done first.
 Ubuntu 20.04LTS as our own development environment, and suggest you also use such
 an environment if possible.
 
-# Simple Execution
+# Simple Execution as Client Test Tool
 
 To run the tool with the simplest method, follow the steps below:
@@ -28,19 +47,21 @@ To run the tool with the simplest method, follow the steps below:
 That's it!
 
-# Running Clusters
+# Running Server-side Clusters
 
 This tool also makes it easy to test/verify the clustering capabilities of TDengine. You
 can start a cluster quite easily with the following command:
 
 ```
 $ cd tests/pytest/
-$ ./crash_gen.sh -e -o 3
+$ rm -rf ../../build/cluster_dnode_?; ./crash_gen.sh -e -o 3 # first part optional
 ```
 
 The `-e` option above tells the tool to start the service, and do not run any tests, while
 the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster.
-Obviously you can adjust the the number here.
+Obviously you can adjust the the number here. The `rm -rf` command line is optional
+to clean up previous cluster data, so that we can start from a clean state with no data
+at all.
 
 ## Behind the Scenes
@@ -89,8 +110,9 @@ The exhaustive features of the tool is available through the `-h` option:
 
 ```
 $ ./crash_gen.sh -h
-usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS] [-i MAX_REPLICAS] [-l] [-n] [-o NUM_DNODES] [-p] [-r]
-                              [-s MAX_STEPS] [-t NUM_THREADS] [-v] [-x]
+usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS]
+                              [-i NUM_REPLICAS] [-k] [-l] [-m] [-n]
+                              [-o NUM_DNODES] [-p] [-r] [-s MAX_STEPS] [-t NUM_THREADS] [-v] [-w] [-x]
 
 TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
 ---------------------------------------------------------------------
@@ -109,11 +131,14 @@ optional arguments:
   -e, --run-tdengine    Run TDengine service in foreground (default: false)
   -g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS
                         Ignore error codes, comma separated, 0x supported (default: None)
-  -i MAX_REPLICAS, --max-replicas MAX_REPLICAS
-                        Maximum number of replicas to use, when testing against clusters. (default: 1)
+  -i NUM_REPLICAS, --num-replicas NUM_REPLICAS
+                        Number (fixed) of replicas to use, when testing against clusters. (default: 1)
+  -k, --track-memory-leaks
+                        Use Valgrind tool to track memory leaks (default: false)
   -l, --larger-data     Write larger amount of data during write operations (default: false)
+  -m, --mix-oos-data    Mix out-of-sequence data into the test data stream (default: true)
   -n, --dynamic-db-table-names
-                        Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)
+                        Use non-fixed names for dbs/tables, for -b, useful for multi-instance executions (default: false)
   -o NUM_DNODES, --num-dnodes NUM_DNODES
                         Number of Dnodes to initialize, used with -e option. (default: 1)
   -p, --per-thread-db-connection
@@ -124,6 +149,7 @@ optional arguments:
   -t NUM_THREADS, --num-threads NUM_THREADS
                         Number of threads to run (default: 10)
   -v, --verify-data     Verify data written in a number of places by reading back (default: false)
+  -w, --use-shadow-db   Use a shaddow database to verify data integrity (default: false)
   -x, --continue-on-exception
                         Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)
 ```
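Side note: the README above mentions that `-g/--ignore-errors` takes comma-separated error codes with `0x` supported. A minimal stand-alone sketch of how such a value could be parsed is shown below; `parse_ignore_errors` is a hypothetical helper for illustration only, not a function from this repository.

```python
# Hypothetical helper (not repo code): turn a "-g 0x383,0x503,1022" style value
# into a set of integer error codes; int(code, 0) accepts decimal and 0x-prefixed hex.
def parse_ignore_errors(spec: str) -> set:
    return {int(code, 0) for code in spec.split(",") if code.strip()}

print(sorted(parse_ignore_errors("0x383,0x503,1022")))  # [899, 1022, 1283]
```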
tests/pytest/crash_gen/crash_gen_main.py
@@ -1574,9 +1574,9 @@ class TaskCreateDb(StateTransitionTask):
     def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
         # was: self.execWtSql(wt, "create database db")
         repStr = ""
-        if gConfig.max_replicas != 1:
+        if gConfig.num_replicas != 1:
             # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
-            numReplica = gConfig.max_replicas # fixed, always
+            numReplica = gConfig.num_replicas # fixed, always
             repStr = "replica {}".format(numReplica)
         updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
         dbName = self._db.getName()
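The hunk above switches `TaskCreateDb` from `max_replicas` to a fixed `num_replicas` when composing the `create database` statement. A minimal stand-alone sketch of that statement-building logic is given below; the function name and signature are illustrative and do not exist in the repository.

```python
# Illustrative sketch (not repo code): fold a fixed replica count and the
# verify-data flag into a TDengine "create database" statement, mirroring the
# repStr / updatePostfix logic in the hunk above.
def build_create_db_sql(db_name: str, num_replicas: int = 1, verify_data: bool = False) -> str:
    rep_str = "replica {}".format(num_replicas) if num_replicas != 1 else ""
    update_postfix = "update 1" if verify_data else ""  # updates allowed only when verifying data
    return " ".join(part for part in ("create database", db_name, rep_str, update_postfix) if part)

print(build_create_db_sql("db_0", num_replicas=3, verify_data=True))
# create database db_0 replica 3 update 1
```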
@@ -2394,11 +2394,16 @@ class MainExec:
             help='Ignore error codes, comma separated, 0x supported (default: None)')
         parser.add_argument(
             '-i',
-            '--max-replicas',
+            '--num-replicas',
             action='store',
             default=1,
             type=int,
-            help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
+            help='Number (fixed) of replicas to use, when testing against clusters. (default: 1)')
+        parser.add_argument(
+            '-k',
+            '--track-memory-leaks',
+            action='store_true',
+            help='Use Valgrind tool to track memory leaks (default: false)')
         parser.add_argument(
             '-l',
             '--larger-data',
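The argparse hunk above renames `-i` and adds `-k`. A self-contained sketch of just these two options, assuming a bare `argparse.ArgumentParser` rather than the tool's full parser, looks like this:

```python
import argparse

# Minimal stand-alone parser carrying only the two options touched by this hunk.
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--num-replicas', action='store', default=1, type=int,
                    help='Number (fixed) of replicas to use, when testing against clusters. (default: 1)')
parser.add_argument('-k', '--track-memory-leaks', action='store_true',
                    help='Use Valgrind tool to track memory leaks (default: false)')

args = parser.parse_args(['-i', '3', '-k'])
print(args.num_replicas, args.track_memory_leaks)  # 3 True
```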
tests/pytest/crash_gen/service_manager.py
@@ -19,6 +19,7 @@ from queue import Queue, Empty
 from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
 from .db import DbConn, DbTarget
+import crash_gen.settings
 
 class TdeInstance():
     """
@@ -132,6 +133,7 @@ keep 36500
 walLevel 1
 #
 # maxConnections 100
+quorum 2
 """
         cfgContent = cfgTemplate.format_map(cfgValues)
         f = open(cfgFile, "w")
@@ -164,7 +166,12 @@ walLevel 1
         return "127.0.0.1"
 
     def getServiceCmdLine(self): # to start the instance
-        return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
+        cmdLine = []
+        if crash_gen.settings.gConfig.track_memory_leaks:
+            Logging.info("Invoking VALGRIND on service...")
+            cmdLine = ['valgrind', '--leak-check=yes']
+        cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
+        return cmdLine
 
     def _getDnodes(self, dbc):
         dbc.query("show dnodes")
@@ -202,7 +209,7 @@ walLevel 1
         self.generateCfgFile() # service side generates config file, client does not
         self.rotateLogs()
 
-        self._smThread.start(self.getServiceCmdLine())
+        self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
 
     def stop(self):
         self._smThread.stop()
@@ -225,7 +232,7 @@ class TdeSubProcess:
     # RET_SUCCESS = -4
 
     def __init__(self):
-        self.subProcess = None
+        self.subProcess = None # type: subprocess.Popen
         # if tInst is None:
         #     raise CrashGenError("Empty instance not allowed in TdeSubProcess")
         # self._tInst = tInst # Default create at ServiceManagerThread
@@ -263,7 +270,7 @@ class TdeSubProcess:
         # print("Starting TDengine with env: ", myEnv.items())
         # print("Starting TDengine via Shell: {}".format(cmdLineStr))
 
-        useShell = True
+        useShell = True # Needed to pass environments into it
         self.subProcess = subprocess.Popen(
             # ' '.join(cmdLine) if useShell else cmdLine,
             # shell=useShell,
@@ -276,12 +283,12 @@ class TdeSubProcess:
             env=myEnv
             ) # had text=True, which interferred with reading EOF
 
-    STOP_SIGNAL = signal.SIGKILL # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
+    STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
     SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
 
     def stop(self):
         """
-        Stop a sub process, and try to return a meaningful return code.
+        Stop a sub process, DO NOT return anything, process all conditions INSIDE
 
         Common POSIX signal values (from man -7 signal):
         SIGHUP 1
@@ -301,40 +308,99 @@ class TdeSubProcess:
         """
         if not self.subProcess:
             Logging.error("Sub process already stopped")
-            return # -1
+            return
 
         retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
         if retCode:  # valid return code, process ended
-            retCode = -retCode # only if valid
+            # retCode = -retCode # only if valid
             Logging.warning("TSP.stop(): process ended itself")
             self.subProcess = None
-            return retCode
+            return
 
         # process still alive, let's interrupt it
-        Logging.info("Terminate running process, send SIG_{} and wait...".format(self.STOP_SIGNAL))
-        # sub process should end, then IPC queue should end, causing IO thread to end
-        topSubProc = psutil.Process(self.subProcess.pid)
-        for child in topSubProc.children(recursive=True):  # or parent.children() for recursive=False
-            child.send_signal(self.STOP_SIGNAL)
-            time.sleep(0.2) # 200 ms
-        # topSubProc.send_signal(sig) # now kill the main sub process (likely the Shell)
-
-        self.subProcess.send_signal(self.STOP_SIGNAL) # main sub process (likely the Shell)
-        self.subProcess.wait(20)
-        retCode = self.subProcess.returncode # should always be there
-        # May throw subprocess.TimeoutExpired exception above, therefore
-        # The process is guranteed to have ended by now
-        self.subProcess = None
-        if retCode == self.SIG_KILL_RETCODE:
-            Logging.info("TSP.stop(): sub proc KILLED, as expected")
-        elif retCode == (- self.STOP_SIGNAL):
-            Logging.info("TSP.stop(), sub process STOPPED, as expected")
-        elif retCode != 0: # != (- signal.SIGINT):
-            Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(
-                self.STOP_SIGNAL, retCode))
-        else:
-            Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(self.STOP_SIGNAL))
-        return - retCode
+        self._stopForSure(self.subProcess, self.STOP_SIGNAL) # success if no exception
+        self.subProcess = None
+
+        # sub process should end, then IPC queue should end, causing IO thread to end
+
+    @classmethod
+    def _stopForSure(cls, proc: subprocess.Popen, sig: int):
+        '''
+        Stop a process and all sub processes with a singal, and SIGKILL if necessary
+        '''
+        def doKillTdService(proc: subprocess.Popen, sig: int):
+            Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
+            proc.send_signal(sig)
+            try:
+                retCode = proc.wait(20)
+                if (- retCode) == signal.SIGSEGV: # Crashed
+                    Logging.warning("Process {} CRASHED, please check CORE file!".format(proc.pid))
+                elif (- retCode) == sig:
+                    Logging.info("TD service terminated with expected return code {}".format(sig))
+                else:
+                    Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
+                return True # terminated successfully
+            except subprocess.TimeoutExpired as err:
+                Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig))
+            return False # failed to terminate
+
+        def doKillChild(child: psutil.Process, sig: int):
+            Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig))
+            child.send_signal(sig)
+            try:
+                retCode = child.wait(20)
+                if (- retCode) == signal.SIGSEGV: # Crashed
+                    Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid))
+                elif (- retCode) == sig:
+                    Logging.info("Sub-sub process terminated with expected return code {}".format(sig))
+                else:
+                    Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
+                return True # terminated successfully
+            except psutil.TimeoutExpired as err:
+                Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig))
+            return False # did not terminate
+
+        def doKill(proc: subprocess.Popen, sig: int):
+            pid = proc.pid
+            try:
+                topSubProc = psutil.Process(pid)
+                for child in topSubProc.children(recursive=True):  # or parent.children() for recursive=False
+                    Logging.warning("Unexpected child to be killed")
+                    doKillChild(child, sig)
+            except psutil.NoSuchProcess as err:
+                Logging.info("Process not found, can't kill, pid = {}".format(pid))
+            return doKillTdService(proc, sig)
+            # TODO: re-examine if we need to kill the top process, which is always the SHELL for now
+            # try:
+            #     proc.wait(1) # SHELL process here, may throw subprocess.TimeoutExpired exception
+            #     # expRetCode = self.SIG_KILL_RETCODE if sig==signal.SIGKILL else (-sig)
+            #     # if retCode == expRetCode:
+            #     #     Logging.info("Process terminated with expected return code {}".format(retCode))
+            #     # else:
+            #     #     Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(expRetCode, retCode))
+            #     # return True # success
+            # except subprocess.TimeoutExpired as err:
+            #     Logging.warning("Failed to kill process {} with signal {}".format(pid, sig))
+            # return False # failed to kill
+
+        def softKill(proc, sig):
+            return doKill(proc, sig)
+
+        def hardKill(proc):
+            return doKill(proc, signal.SIGKILL)
+
+        pid = proc.pid
+        Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig))
+        if softKill(proc, sig):
+            return # success
+        if sig != signal.SIGKILL: # really was soft above
+            if hardKill(proc):
+                return
+        raise CrashGenError("Failed to stop process, pid={}".format(pid))
 
 class ServiceManager:
     PAUSE_BETWEEN_IPC_CHECK = 1.2  # seconds between checks on STDOUT of sub process
@@ -560,7 +626,8 @@ class ServiceManagerThread:
         # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
         # self._tInst = tInst or TdeInstance() # Need an instance
 
         self._thread = None # The actual thread, # type: threading.Thread
+        self._thread2 = None # watching stderr
         self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
 
     def __repr__(self):
@@ -568,11 +635,20 @@ class ServiceManagerThread:
             self.getStatus(), self._tdeSubProcess)
 
     def getStatus(self):
+        '''
+        Get the status of the process being managed. (misnomer alert!)
+        '''
         return self._status
 
     # Start the thread (with sub process), and wait for the sub service
     # to become fully operational
-    def start(self, cmdLine):
+    def start(self, cmdLine: str, logDir: str):
+        '''
+        Request the manager thread to start a new sub process, and manage it.
+
+        :param cmdLine: the command line to invoke
+        :param logDir: the logging directory, to hold stdout/stderr files
+        '''
         if self._thread:
             raise RuntimeError("Unexpected _thread")
         if self._tdeSubProcess:
@@ -582,20 +658,30 @@ class ServiceManagerThread:
         self._status.set(Status.STATUS_STARTING)
         self._tdeSubProcess = TdeSubProcess()
-        self._tdeSubProcess.start(cmdLine)
+        self._tdeSubProcess.start(cmdLine) # TODO: verify process is running
 
         self._ipcQueue = Queue()
         self._thread = threading.Thread( # First thread captures server OUTPUT
             target=self.svcOutputReader,
-            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
+            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir))
         self._thread.daemon = True  # thread dies with the program
         self._thread.start()
+        time.sleep(0.01)
+        if not self._thread.is_alive(): # What happened?
+            Logging.info("Failed to started process to monitor STDOUT")
+            self.stop()
+            raise CrashGenError("Failed to start thread to monitor STDOUT")
+        Logging.info("Successfully started process to monitor STDOUT")
 
         self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
             target=self.svcErrorReader,
-            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
+            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir))
         self._thread2.daemon = True  # thread dies with the program
         self._thread2.start()
+        time.sleep(0.01)
+        if not self._thread2.is_alive():
+            self.stop()
+            raise CrashGenError("Failed to start thread to monitor STDERR")
 
         # wait for service to start
         for i in range(0, 100):
@@ -643,7 +729,7 @@ class ServiceManagerThread:
             Logging.info("Service already stopped")
             return
         if self.getStatus().isStopping():
-            Logging.info("Service is already being stopped")
+            Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid()))
             return
         # Linux will send Control-C generated SIGINT to the TDengine process
         # already, ref:
@@ -653,14 +739,14 @@ class ServiceManagerThread:
         self._status.set(Status.STATUS_STOPPING)
         # retCode = self._tdeSubProcess.stop()
-        try:
-            retCode = self._tdeSubProcess.stop()
-            # print("Attempted to stop sub process, got return code: {}".format(retCode))
-            if retCode == signal.SIGSEGV : # SGV
-                Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
-        except subprocess.TimeoutExpired as err:
-            Logging.info("Time out waiting for TDengine service process to exit")
-        else:
+        # try:
+        #     retCode = self._tdeSubProcess.stop()
+        #     # print("Attempted to stop sub process, got return code: {}".format(retCode))
+        #     if retCode == signal.SIGSEGV : # SGV
+        #         Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
+        # except subprocess.TimeoutExpired as err:
+        #     Logging.info("Time out waiting for TDengine service process to exit")
+        if not self._tdeSubProcess.stop(): # everything withing
             if self._tdeSubProcess.isRunning():  # still running, should now never happen
                 Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
                     self._tdeSubProcess.getPid()))
@@ -683,16 +769,18 @@ class ServiceManagerThread:
             raise RuntimeError(
                 "SMT.Join(): Unexpected status: {}".format(self._status))
 
-        if self._thread:
-            self._thread.join()
-            self._thread = None
-            self._status.set(Status.STATUS_STOPPED)
-            # STD ERR thread
-            self._thread2.join()
-            self._thread2 = None
+        if self._thread or self._thread2:
+            if self._thread:
+                self._thread.join()
+                self._thread = None
+            if self._thread2: # STD ERR thread
+                self._thread2.join()
+                self._thread2 = None
         else:
             print("Joining empty thread, doing nothing")
+        self._status.set(Status.STATUS_STOPPED)
 
     def _trimQueue(self, targetSize):
         if targetSize <= 0:
             return # do nothing
@@ -739,11 +827,22 @@ class ServiceManagerThread:
             print(pBar, end="", flush=True)
             print('\b\b\b\b', end="", flush=True)
 
-    def svcOutputReader(self, out: IO, queue):
+    def svcOutputReader(self, out: IO, queue, logDir: str):
+        '''
+        The infinite routine that processes the STDOUT stream for the sub process being managed.
+
+        :param out: the IO stream object used to fetch the data from
+        :param queue: the queue where we dump the roughly parsed line-by-line data
+        :param logDir: where we should dump a verbatim output file
+        '''
+        os.makedirs(logDir, exist_ok=True)
+        logFile = os.path.join(logDir, 'stdout.log')
+        fOut = open(logFile, 'wb')
         # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
         # print("This is the svcOutput Reader...")
         # for line in out :
         for line in iter(out.readline, b''):
+            fOut.write(line)
             # print("Finished reading a line: {}".format(line))
             # print("Adding item to queue...")
             try:
@@ -772,10 +871,16 @@ class ServiceManagerThread:
                 # queue.put(line)
         # meaning sub process must have died
         Logging.info("EOF for TDengine STDOUT: {}".format(self))
-        out.close()
+        out.close() # Close the stream
+        fOut.close() # Close the output file
 
-    def svcErrorReader(self, err: IO, queue):
+    def svcErrorReader(self, err: IO, queue, logDir: str):
+        os.makedirs(logDir, exist_ok=True)
+        logFile = os.path.join(logDir, 'stderr.log')
+        fErr = open(logFile, 'wb')
         for line in iter(err.readline, b''):
+            fErr.write(line)
             Logging.info("TDengine STDERR: {}".format(line))
         Logging.info("EOF for TDengine STDERR: {}".format(self))
         err.close()
-\ No newline at end of file
+        fErr.close()
\ No newline at end of file
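The largest change above replaces the old inline kill logic with `_stopForSure()`, which signals the children of the shell-wrapped service first, then the top process, and escalates to SIGKILL if the soft signal does not do the job. Below is a condensed, stand-alone sketch of that pattern using only `psutil`, `subprocess`, and `signal`; `stop_process_tree` is an illustrative name, not the repository's API, and the demo process at the bottom is just a Unix `sleep`.

```python
import signal
import subprocess

import psutil  # third-party dependency, also used by service_manager.py


def stop_process_tree(proc: subprocess.Popen, sig: int = signal.SIGINT, timeout: int = 20) -> None:
    # Signal the child processes first (e.g. taosd running under a shell).
    try:
        top = psutil.Process(proc.pid)
        for child in top.children(recursive=True):
            child.send_signal(sig)
    except psutil.NoSuchProcess:
        return  # already gone

    # Then signal the top process and wait for it to exit.
    proc.send_signal(sig)
    try:
        proc.wait(timeout)  # raises subprocess.TimeoutExpired if still alive
        return
    except subprocess.TimeoutExpired:
        pass

    # Soft signal did not work: escalate to SIGKILL, as _stopForSure() does.
    if sig != signal.SIGKILL:
        proc.kill()
        proc.wait(timeout)


if __name__ == "__main__":
    p = subprocess.Popen(["sleep", "60"])
    stop_process_tree(p)
    print("return code:", p.returncode)
```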