Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
82b21d6e
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
82b21d6e
编写于
10月 11, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
10月 11, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #3585 from taosdata/feature/crash_gen
Added valgrind memory check to crash_gen tool
上级
16782bff
76dc969f
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
17735 addition
and
246 deletion
+17735
-246
tests/pytest/crash_gen.sh
tests/pytest/crash_gen.sh
+28
-2
tests/pytest/crash_gen/crash_gen.py
tests/pytest/crash_gen/crash_gen.py
+458
-244
tests/pytest/crash_gen/valgrind_taos.supp
tests/pytest/crash_gen/valgrind_taos.supp
+17249
-0
未找到文件。
tests/pytest/crash_gen.sh
浏览文件 @
82b21d6e
...
...
@@ -42,11 +42,37 @@ TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`
LIB_DIR
=
`
echo
$TAOSD_DIR
|rev|cut
-d
'/'
-f
3,4,5,6|rev
`
/lib
# Now getting ready to execute Python
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
PYTHON_EXEC
=
python3.8
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
export
PYTHONPATH
=
$(
pwd
)
/../../src/connector/python/linux/python3
export
PYTHONPATH
=
$(
pwd
)
/../../src/connector/python/linux/python3
:
$(
pwd
)
# Then let us set up the library path so that our compiled SO file can be loaded by Python
export
LD_LIBRARY_PATH
=
$LD_LIBRARY_PATH
:
$LIB_DIR
# Now we are all let, and let's see if we can find a crash. Note we pass all params
python3.8 ./crash_gen.py
$@
if
[[
$1
==
'--valgrind'
]]
;
then
shift
export
PYTHONMALLOC
=
malloc
VALGRIND_OUT
=
valgrind.out
VALGRIND_ERR
=
valgrind.err
# How to generate valgrind suppression file: https://stackoverflow.com/questions/17159578/generating-suppressions-for-memory-leaks
# valgrind --leak-check=full --gen-suppressions=all --log-fd=9 python3.8 ./crash_gen.py $@ 9>>memcheck.log
echo
Executing under VALGRIND, with STDOUT/ERR going to
$VALGRIND_OUT
and
$VALGRIND_ERR
, please watch them from a different terminal.
valgrind
\
--leak-check
=
yes
\
--suppressions
=
crash_gen/valgrind_taos.supp
\
$PYTHON_EXEC
\
./crash_gen/crash_gen.py
$@
>
$VALGRIND_OUT
2>
$VALGRIND_ERR
elif
[[
$1
==
'--helgrind'
]]
;
then
shift
valgrind
\
--tool
=
helgrind
\
$PYTHON_EXEC
\
./crash_gen/crash_gen.py
$@
else
$PYTHON_EXEC
./crash_gen/crash_gen.py
$@
fi
tests/pytest/crash_gen.py
→
tests/pytest/crash_gen
/crash_gen
.py
浏览文件 @
82b21d6e
...
...
@@ -15,7 +15,6 @@
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from
__future__
import
annotations
import
taos
import
crash_gen
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
...
...
@@ -42,6 +41,9 @@ import os
import
io
import
signal
import
traceback
import
resource
from
guppy
import
hpy
import
gc
try
:
import
psutil
...
...
@@ -53,14 +55,13 @@ except:
if
sys
.
version_info
[
0
]
<
3
:
raise
Exception
(
"Must be using Python 3"
)
# Global variables, tried to keep a small number.
# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
gConfig
=
argparse
.
Namespace
()
# Dummy value, will be replaced later
gSvcMgr
=
None
# TODO: refactor this hack, use dep injection
logger
=
None
logger
=
None
# type: Logger
def
runThread
(
wt
:
WorkerThread
):
wt
.
run
()
...
...
@@ -101,7 +102,7 @@ class WorkerThread:
else
:
raise
RuntimeError
(
"Unexpected connector type: {}"
.
format
(
gConfig
.
connector_type
))
self
.
_dbInUse
=
False
# if "use db" was executed already
#
self._dbInUse = False # if "use db" was executed already
def
logDebug
(
self
,
msg
):
logger
.
debug
(
" TRD[{}] {}"
.
format
(
self
.
_tid
,
msg
))
...
...
@@ -109,13 +110,13 @@ class WorkerThread:
def
logInfo
(
self
,
msg
):
logger
.
info
(
" TRD[{}] {}"
.
format
(
self
.
_tid
,
msg
))
def
dbInUse
(
self
):
return
self
.
_dbInUse
#
def dbInUse(self):
#
return self._dbInUse
def
useDb
(
self
):
if
(
not
self
.
_dbInUse
):
self
.
execSql
(
"use db"
)
self
.
_dbInUse
=
True
#
def useDb(self):
#
if (not self._dbInUse):
#
self.execSql("use db")
#
self._dbInUse = True
def
getTaskExecutor
(
self
):
return
self
.
_tc
.
getTaskExecutor
()
...
...
@@ -161,12 +162,12 @@ class WorkerThread:
logger
.
debug
(
"[TRD] Thread Coordinator not running any more, worker thread now stopping..."
)
break
# Before we fetch the task and run it, let's ensure we properly "use" the database
# Before we fetch the task and run it, let's ensure we properly "use" the database
(not needed any more)
try
:
if
(
gConfig
.
per_thread_db_connection
):
# most likely TRUE
if
not
self
.
_dbConn
.
isOpen
:
# might have been closed during server auto-restart
self
.
_dbConn
.
open
()
self
.
useDb
()
# might encounter exceptions. TODO: catch
#
self.useDb() # might encounter exceptions. TODO: catch
except
taos
.
error
.
ProgrammingError
as
err
:
errno
=
Helper
.
convertErrno
(
err
.
errno
)
if
errno
in
[
0x383
,
0x386
,
0x00B
,
0x014
]
:
# invalid database, dropping, Unable to establish connection, Database not ready
...
...
@@ -181,14 +182,13 @@ class WorkerThread:
task
=
tc
.
fetchTask
()
# Execute such a task
logger
.
debug
(
"[TRD] Worker thread [{}] about to execute task: {}"
.
format
(
logger
.
debug
(
"[TRD] Worker thread [{}] about to execute task: {}"
.
format
(
self
.
_tid
,
task
.
__class__
.
__name__
))
task
.
execute
(
self
)
tc
.
saveExecutedTask
(
task
)
logger
.
debug
(
"[TRD] Worker thread [{}] finished executing task"
.
format
(
self
.
_tid
))
self
.
_dbInUse
=
False
# there may be changes between steps
#
self._dbInUse = False # there may be changes between steps
# print("_wtd", end=None) # worker thread died
def
verifyThreadSelf
(
self
):
# ensure we are called by this own thread
...
...
@@ -237,7 +237,7 @@ class WorkerThread:
def
getQueryResult
(
self
):
return
self
.
getDbConn
().
getQueryResult
()
def
getDbConn
(
self
):
def
getDbConn
(
self
)
->
DbConn
:
if
(
gConfig
.
per_thread_db_connection
):
return
self
.
_dbConn
else
:
...
...
@@ -255,7 +255,7 @@ class WorkerThread:
class
ThreadCoordinator
:
WORKER_THREAD_TIMEOUT
=
60
# one minute
def
__init__
(
self
,
pool
:
ThreadPool
,
dbManager
):
def
__init__
(
self
,
pool
:
ThreadPool
,
dbManager
:
DbManager
):
self
.
_curStep
=
-
1
# first step is 0
self
.
_pool
=
pool
# self._wd = wd
...
...
@@ -268,6 +268,7 @@ class ThreadCoordinator:
self
.
_pool
.
numThreads
+
1
)
# one barrier for all threads
self
.
_execStats
=
ExecutionStats
()
self
.
_runStatus
=
MainExec
.
STATUS_RUNNING
self
.
_initDbs
()
def
getTaskExecutor
(
self
):
return
self
.
_te
...
...
@@ -318,7 +319,7 @@ class ThreadCoordinator:
logger
.
debug
(
"[TRD] Main thread waking up at step {}, tapping worker threads"
.
format
(
self
.
_curStep
))
# Now not all threads had time to go to sleep
# Worker threads will wake up at this point, and each execute it's own task
self
.
tapAllThreads
()
# release all worker thread from their "gate"
self
.
tapAllThreads
()
# release all worker thread from their "gate
s
"
def
_syncAtBarrier
(
self
):
# Now main thread (that's us) is ready to enter a step
...
...
@@ -332,12 +333,16 @@ class ThreadCoordinator:
def
_doTransition
(
self
):
transitionFailed
=
False
try
:
sm
=
self
.
_dbManager
.
getStateMachine
()
logger
.
debug
(
"[STT] starting transitions"
)
# at end of step, transiton the DB state
sm
.
transition
(
self
.
_executedTasks
)
logger
.
debug
(
"[STT] transition ended"
)
# Due to limitation (or maybe not) of the Python library,
for
x
in
self
.
_dbs
:
db
=
x
# type: Database
sm
=
db
.
getStateMachine
()
logger
.
debug
(
"[STT] starting transitions for DB: {}"
.
format
(
db
.
getName
()))
# at end of step, transiton the DB state
tasksForDb
=
db
.
filterTasks
(
self
.
_executedTasks
)
sm
.
transition
(
tasksForDb
,
self
.
getDbManager
().
getDbConn
())
logger
.
debug
(
"[STT] transition ended for DB: {}"
.
format
(
db
.
getName
()))
# Due to limitation (or maybe not) of the TD Python library,
# we cannot share connections across threads
# Here we are in main thread, we cannot operate the connections created in workers
# Moving below to task loop
...
...
@@ -347,6 +352,7 @@ class ThreadCoordinator:
# t.useDb()
# t.execSql("use db") # main thread executing "use
# db" on behalf of every worker thread
except
taos
.
error
.
ProgrammingError
as
err
:
if
(
err
.
msg
==
'network unavailable'
):
# broken DB connection
logger
.
info
(
"DB connection broken, execution failed"
)
...
...
@@ -358,7 +364,7 @@ class ThreadCoordinator:
# end, and maybe signal them to stop
else
:
raise
return
transitionFailed
# return transitionFailed # Why did we have this??!!
self
.
resetExecutedTasks
()
# clear the tasks after we are done
# Get ready for next step
...
...
@@ -378,6 +384,14 @@ class ThreadCoordinator:
while
not
self
.
_runShouldEnd
(
transitionFailed
,
hasAbortedTask
,
workerTimeout
):
if
not
gConfig
.
debug
:
# print this only if we are not in debug mode
print
(
"."
,
end
=
""
,
flush
=
True
)
# if (self._curStep % 2) == 0: # print memory usage once every 10 steps
# memUsage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print("[m:{}]".format(memUsage), end="", flush=True) # print memory usage
# if (self._curStep % 10) == 3:
# h = hpy()
# print("\n")
# print(h.heap())
try
:
self
.
_syncAtBarrier
()
# For now just cross the barrier
...
...
@@ -407,6 +421,7 @@ class ThreadCoordinator:
errno2
=
Helper
.
convertErrno
(
err
.
errno
)
# correct error scheme
errMsg
=
"Transition failed: errno=0x{:X}, msg: {}"
.
format
(
errno2
,
err
)
logger
.
info
(
errMsg
)
traceback
.
print_exc
()
self
.
_execStats
.
registerFailure
(
errMsg
)
# Then we move on to the next step
...
...
@@ -430,6 +445,19 @@ class ThreadCoordinator:
logger
.
info
(
"
\n
All worker threads finished"
)
self
.
_execStats
.
endExec
()
def
cleanup
(
self
):
# free resources
self
.
_pool
.
cleanup
()
self
.
_pool
=
None
self
.
_te
=
None
self
.
_dbManager
=
None
self
.
_executedTasks
=
None
self
.
_lock
=
None
self
.
_stepBarrier
=
None
self
.
_execStats
=
None
self
.
_runStatus
=
None
def
printStats
(
self
):
self
.
_execStats
.
printStats
()
...
...
@@ -458,23 +486,34 @@ class ThreadCoordinator:
def
isRunning
(
self
):
return
self
.
_te
is
not
None
def
_initDbs
(
self
):
''' Initialize multiple databases, invoked at __ini__() time '''
self
.
_dbs
=
[]
# type: List[Database]
dbc
=
self
.
getDbManager
().
getDbConn
()
if
gConfig
.
max_dbs
==
0
:
self
.
_dbs
.
append
(
Database
(
0
,
dbc
))
else
:
for
i
in
range
(
gConfig
.
max_dbs
):
self
.
_dbs
.
append
(
Database
(
i
,
dbc
))
def
pickDatabase
(
self
):
idxDb
=
0
if
gConfig
.
max_dbs
!=
0
:
idxDb
=
Dice
.
throw
(
gConfig
.
max_dbs
)
# 0 to N-1
db
=
self
.
_dbs
[
idxDb
]
# type: Database
return
db
def
fetchTask
(
self
)
->
Task
:
''' The thread coordinator (that's us) is responsible for fetching a task
to be executed next.
'''
if
(
not
self
.
isRunning
()):
# no task
raise
RuntimeError
(
"Cannot fetch task when not running"
)
# return self._wd.pickTask()
# Alternatively, let's ask the DbState for the appropriate task
# dbState = self.getDbState()
# tasks = dbState.getTasksAtState() # TODO: create every time?
# nTasks = len(tasks)
# i = Dice.throw(nTasks)
# logger.debug(" (dice:{}/{}) ".format(i, nTasks))
# # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
# return tasks[i].clone() # TODO: still necessary?
# pick a task type for current state
taskType
=
self
.
getDbManager
().
getStateMachine
().
pickTaskType
()
return
taskType
(
self
.
getDbManager
(),
self
.
_execStats
)
# create a task from it
db
=
self
.
pickDatabase
()
taskType
=
db
.
getStateMachine
().
pickTaskType
()
# type: Task
return
taskType
(
self
.
_execStats
,
db
)
# create a task from it
def
resetExecutedTasks
(
self
):
self
.
_executedTasks
=
[]
# should be under single thread
...
...
@@ -510,6 +549,9 @@ class ThreadPool:
logger
.
debug
(
"Joining thread..."
)
workerThread
.
_thread
.
join
()
def
cleanup
(
self
):
self
.
threadList
=
None
# maybe clean up each?
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
...
...
@@ -632,17 +674,6 @@ class DbConn:
logger
.
debug
(
"[DB] data connection opened, type = {}"
.
format
(
self
.
_type
))
self
.
isOpen
=
True
def
resetDb
(
self
):
# reset the whole database, etc.
if
(
not
self
.
isOpen
):
raise
RuntimeError
(
"Cannot reset database until connection is open"
)
# self._tdSql.prepare() # Recreate database, etc.
self
.
execute
(
'drop database if exists db'
)
logger
.
debug
(
"Resetting DB, dropped database"
)
# self._cursor.execute('create database db')
# self._cursor.execute('use db')
# tdSql.execute('show databases')
def
queryScalar
(
self
,
sql
)
->
int
:
return
self
.
_queryAny
(
sql
)
...
...
@@ -654,7 +685,10 @@ class DbConn:
raise
RuntimeError
(
"Cannot query database until connection is open"
)
nRows
=
self
.
query
(
sql
)
if
nRows
!=
1
:
raise
RuntimeError
(
"Unexpected result for query: {}, rows = {}"
.
format
(
sql
,
nRows
))
raise
taos
.
error
.
ProgrammingError
(
"Unexpected result for query: {}, rows = {}"
.
format
(
sql
,
nRows
),
(
0x991
if
nRows
==
0
else
0x992
)
)
if
self
.
getResultRows
()
!=
1
or
self
.
getResultCols
()
!=
1
:
raise
RuntimeError
(
"Unexpected result set for query: {}"
.
format
(
sql
))
return
self
.
getQueryResult
()[
0
][
0
]
...
...
@@ -662,16 +696,32 @@ class DbConn:
def
use
(
self
,
dbName
):
self
.
execute
(
"use {}"
.
format
(
dbName
))
def
hasDatabases
(
self
):
return
self
.
query
(
"show databases"
)
>
1
# We now have a "log" database by default
def
existsDatabase
(
self
,
dbName
:
str
):
''' Check if a certain database exists '''
self
.
query
(
"show databases"
)
dbs
=
[
v
[
0
]
for
v
in
self
.
getQueryResult
()]
# ref: https://stackoverflow.com/questions/643823/python-list-transformation
# ret2 = dbName in dbs
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
return
dbName
in
dbs
# TODO: super weird type mangling seen, once here
def
hasTables
(
self
):
return
self
.
query
(
"show tables"
)
>
0
def
execute
(
self
,
sql
):
''' Return the number of rows affected'''
raise
RuntimeError
(
"Unexpected execution, should be overriden"
)
def
safeExecute
(
self
,
sql
):
'''Safely execute any SQL query, returning True/False upon success/failure'''
try
:
self
.
execute
(
sql
)
return
True
# ignore num of results, return success
except
taos
.
error
.
ProgrammingError
as
err
:
return
False
# failed, for whatever TAOS reason
# Not possile to reach here, non-TAOS exception would have been thrown
def
query
(
self
,
sql
)
->
int
:
# return num rows returned
''' Return the number of rows affected'''
raise
RuntimeError
(
"Unexpected execution, should be overriden"
)
def
openByType
(
self
):
...
...
@@ -766,6 +816,13 @@ class DbConnRest(DbConn):
class
MyTDSql
:
# Class variables
_clsLock
=
threading
.
Lock
()
# class wide locking
longestQuery
=
None
# type: str
longestQueryTime
=
0.0
# seconds
lqStartTime
=
0.0
# lqEndTime = 0.0 # Not needed, as we have the two above already
def
__init__
(
self
,
hostAddr
,
cfgPath
):
# Make the DB connection
self
.
_conn
=
taos
.
connect
(
host
=
hostAddr
,
config
=
cfgPath
)
...
...
@@ -782,13 +839,28 @@ class MyTDSql:
# self.cursor.log(caller.filename + ".sql")
def
close
(
self
):
self
.
_cursor
.
close
()
# can we double close?
self
.
_conn
.
close
()
# TODO: very important, cursor close does NOT close DB connection!
self
.
_cursor
.
close
()
def
_execInternal
(
self
,
sql
):
startTime
=
time
.
time
()
ret
=
self
.
_cursor
.
execute
(
sql
)
# print("\nSQL success: {}".format(sql))
queryTime
=
time
.
time
()
-
startTime
# Record the query time
cls
=
self
.
__class__
if
queryTime
>
(
cls
.
longestQueryTime
+
0.01
)
:
with
cls
.
_clsLock
:
cls
.
longestQuery
=
sql
cls
.
longestQueryTime
=
queryTime
cls
.
lqStartTime
=
startTime
return
ret
def
query
(
self
,
sql
):
self
.
sql
=
sql
try
:
self
.
_
cursor
.
execute
(
sql
)
self
.
_
execInternal
(
sql
)
self
.
queryResult
=
self
.
_cursor
.
fetchall
()
self
.
queryRows
=
len
(
self
.
queryResult
)
self
.
queryCols
=
len
(
self
.
_cursor
.
description
)
...
...
@@ -802,7 +874,7 @@ class MyTDSql:
def
execute
(
self
,
sql
):
self
.
sql
=
sql
try
:
self
.
affectedRows
=
self
.
_
cursor
.
execute
(
sql
)
self
.
affectedRows
=
self
.
_
execInternal
(
sql
)
except
Exception
as
e
:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
...
...
@@ -922,7 +994,9 @@ class AnyState:
STATE_VAL_IDX
=
0
CAN_CREATE_DB
=
1
CAN_DROP_DB
=
2
# For below, if we can "drop the DB", but strictly speaking
# only "under normal circumstances", as we may override it with the -b option
CAN_DROP_DB
=
2
CAN_CREATE_FIXED_SUPER_TABLE
=
3
CAN_DROP_FIXED_SUPER_TABLE
=
4
CAN_ADD_DATA
=
5
...
...
@@ -935,6 +1009,8 @@ class AnyState:
# -1 hack to accomodate the STATE_INVALID case
return
self
.
_stateNames
[
self
.
_info
[
self
.
STATE_VAL_IDX
]
+
1
]
# Each sub state tells us the "info", about itself, so we can determine
# on things like canDropDB()
def
getInfo
(
self
):
raise
RuntimeError
(
"Must be overriden by child classes"
)
...
...
@@ -961,6 +1037,10 @@ class AnyState:
return
self
.
_info
[
self
.
CAN_CREATE_DB
]
def
canDropDb
(
self
):
# If user requests to run up to a number of DBs,
# we'd then not do drop_db operations any more
if
gConfig
.
max_dbs
>
0
:
return
False
return
self
.
_info
[
self
.
CAN_DROP_DB
]
def
canCreateFixedSuperTable
(
self
):
...
...
@@ -997,8 +1077,8 @@ class AnyState:
if
task
.
isSuccess
():
sCnt
+=
1
if
(
exists
and
sCnt
<=
0
):
raise
RuntimeError
(
"Unexpected zero success for task: {}"
.
format
(
cl
s
))
raise
RuntimeError
(
"Unexpected zero success for task type: {}, from tasks: {}"
.
format
(
cls
,
task
s
))
def
assertNoTask
(
self
,
tasks
,
cls
):
for
task
in
tasks
:
...
...
@@ -1145,13 +1225,16 @@ class StateHasData(AnyState):
class
StateMechine
:
def
__init__
(
self
,
dbConn
):
self
.
_dbConn
=
dbConn
self
.
_curState
=
self
.
_findCurrentState
()
# starting state
# transitition target probabilities, indexed with value of STATE_EMPTY,
# STATE_DB_ONLY, etc.
def
__init__
(
self
,
db
:
Database
):
self
.
_db
=
db
# transitition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
self
.
_stateWeights
=
[
1
,
2
,
10
,
40
]
def
init
(
self
,
dbc
:
DbConn
):
# late initailization, don't save the dbConn
self
.
_curState
=
self
.
_findCurrentState
(
dbc
)
# starting state
logger
.
debug
(
"Found Starting State: {}"
.
format
(
self
.
_curState
))
# TODO: seems no lnoger used, remove?
def
getCurrentState
(
self
):
return
self
.
_curState
...
...
@@ -1193,34 +1276,35 @@ class StateMechine:
typesToStrings
(
taskTypes
)))
return
taskTypes
def
_findCurrentState
(
self
):
dbc
=
self
.
_dbConn
def
_findCurrentState
(
self
,
dbc
:
DbConn
):
ts
=
time
.
time
()
# we use this to debug how fast/slow it is to do the various queries to find the current DB state
if
not
dbc
.
hasDatabases
():
# no database?!
dbName
=
self
.
_db
.
getName
()
if
not
dbc
.
existsDatabase
(
dbName
):
# dbc.hasDatabases(): # no database?!
logger
.
debug
(
"[STT] empty database found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
return
StateEmpty
()
# did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own
dbc
.
use
(
"db"
)
dbc
.
use
(
dbName
)
if
not
dbc
.
hasTables
():
# no tables
logger
.
debug
(
"[STT] DB_ONLY found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
return
StateDbOnly
()
sTable
=
DbManager
.
getFixedSuperTable
()
if
sTable
.
hasRegTables
(
dbc
):
# no regular tables
sTable
=
self
.
_db
.
getFixedSuperTable
()
if
sTable
.
hasRegTables
(
dbc
,
dbName
):
# no regular tables
logger
.
debug
(
"[STT] SUPER_TABLE_ONLY found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
return
StateSuperTableOnly
()
else
:
# has actual tables
logger
.
debug
(
"[STT] HAS_DATA found, between {} and {}"
.
format
(
ts
,
time
.
time
()))
return
StateHasData
()
def
transition
(
self
,
tasks
):
# We transition the system to a new state by examining the current state itself
def
transition
(
self
,
tasks
,
dbc
:
DbConn
):
if
(
len
(
tasks
)
==
0
):
# before 1st step, or otherwise empty
logger
.
debug
(
"[STT] Starting State: {}"
.
format
(
self
.
_curState
))
return
# do nothing
# this should show up in the server log, separating steps
self
.
_dbConn
.
execute
(
"show dnodes"
)
dbc
.
execute
(
"show dnodes"
)
# Generic Checks, first based on the start state
if
self
.
_curState
.
canCreateDb
():
...
...
@@ -1251,7 +1335,7 @@ class StateMechine:
# if self._state.canReadData():
# Nothing for sure
newState
=
self
.
_findCurrentState
()
newState
=
self
.
_findCurrentState
(
dbc
)
logger
.
debug
(
"[STT] New DB state determined: {}"
.
format
(
newState
))
# can old state move to new state through the tasks?
self
.
_curState
.
verifyTasksToState
(
tasks
,
newState
)
...
...
@@ -1283,49 +1367,53 @@ class StateMechine:
if
rnd
<
0
:
return
i
# Manager of the Database Data/Connection
class
Database
:
''' We use this to represent an actual TDengine database inside a service instance,
possibly in a cluster environment.
For now we use it to manage state transitions in that database
'''
_clsLock
=
threading
.
Lock
()
# class wide lock
_lastInt
=
101
# next one is initial integer
_lastTick
=
0
_lastLaggingTick
=
0
# lagging tick, for unsequenced insersions
def
__init__
(
self
,
dbNum
:
int
,
dbc
:
DbConn
):
# TODO: remove dbc
self
.
_dbNum
=
dbNum
# we assign a number to databases, for our testing purpose
self
.
_stateMachine
=
StateMechine
(
self
)
self
.
_stateMachine
.
init
(
dbc
)
self
.
_lock
=
threading
.
RLock
()
def
getStateMachine
(
self
)
->
StateMechine
:
return
self
.
_stateMachine
class
DbManager
():
def
__init__
(
self
,
resetDb
=
True
):
self
.
tableNumQueue
=
LinearQueue
()
# datetime.datetime(2019, 1, 1) # initial date time tick
self
.
_lastTick
=
self
.
setupLastTick
()
self
.
_lastInt
=
0
# next one is initial integer
self
.
_lock
=
threading
.
RLock
()
def
getDbNum
(
self
):
return
self
.
_dbNum
# self.openDbServerConnection()
self
.
_dbConn
=
DbConn
.
createNative
()
if
(
gConfig
.
connector_type
==
'native'
)
else
DbConn
.
createRest
()
try
:
self
.
_dbConn
.
open
()
# may throw taos.error.ProgrammingError: disconnected
except
taos
.
error
.
ProgrammingError
as
err
:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if
(
err
.
msg
==
'client disconnected'
):
# cannot open DB connection
print
(
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions."
)
sys
.
exit
(
2
)
else
:
print
(
"Failed to connect to DB, errno = {}, msg: {}"
.
format
(
Helper
.
convertErrno
(
err
.
errno
),
err
.
msg
))
raise
except
BaseException
:
print
(
"[=] Unexpected exception"
)
raise
def
getName
(
self
):
return
"db_{}"
.
format
(
self
.
_dbNum
)
if
resetDb
:
self
.
_dbConn
.
resetDb
()
# drop and recreate DB
def
filterTasks
(
self
,
inTasks
:
List
[
Task
]):
# Pick out those belonging to us
outTasks
=
[]
for
task
in
inTasks
:
if
task
.
getDb
().
isSame
(
self
):
outTasks
.
append
(
task
)
return
outTasks
# Do this after dbConn is in proper shape
self
.
_stateMachine
=
StateMechine
(
self
.
_dbConn
)
def
isSame
(
self
,
other
):
return
self
.
_dbNum
==
other
.
_dbNum
def
getDbConn
(
self
):
return
self
.
_dbConn
def
exists
(
self
,
dbc
:
DbConn
):
return
dbc
.
existsDatabase
(
self
.
getName
())
def
getStateMachine
(
self
)
->
StateMechine
:
return
self
.
_stateMachine
@
classmethod
def
getFixedSuperTableName
(
cls
):
return
"fs_table"
# def getState(self):
# return self._stateMachine.getCurrentState()
@
classmethod
def
getFixedSuperTable
(
cls
)
->
TdSuperTable
:
return
TdSuperTable
(
cls
.
getFixedSuperTableName
())
# We aim to create a starting time tick, such that, whenever we run our test here once
# We should be able to safely create 100,000 records, which will not have any repeated time stamp
...
...
@@ -1333,7 +1421,8 @@ class DbManager():
# by a factor of 500.
# TODO: what if it goes beyond 10 years into the future
# TODO: fix the error as result of above: "tsdb timestamp is out of range"
def
setupLastTick
(
self
):
@
classmethod
def
setupLastTick
(
cls
):
t1
=
datetime
.
datetime
(
2020
,
6
,
1
)
t2
=
datetime
.
datetime
.
now
()
# maybe a very large number, takes 69 years to exceed Python int range
...
...
@@ -1347,33 +1436,22 @@ class DbManager():
logger
.
info
(
"Setting up TICKS to start from: {}"
.
format
(
t4
))
return
t4
def
pickAndAllocateTable
(
self
):
# pick any table, and "use" it
return
self
.
tableNumQueue
.
pickAndAllocate
()
def
addTable
(
self
):
with
self
.
_lock
:
tIndex
=
self
.
tableNumQueue
.
push
()
return
tIndex
@
classmethod
def
getFixedSuperTableName
(
cls
):
return
"fs_table"
@
classmethod
def
getFixedSuperTable
(
cls
):
return
TdSuperTable
(
cls
.
getFixedSuperTableName
())
def
releaseTable
(
self
,
i
):
# return the table back, so others can use it
self
.
tableNumQueue
.
release
(
i
)
def
getNextTick
(
self
):
with
self
.
_lock
:
# prevent duplicate tick
if
Dice
.
throw
(
20
)
==
0
:
# 1 in 20 chance
return
self
.
_lastTick
+
datetime
.
timedelta
(
0
,
-
100
)
# Go back in time 100 seconds
def
getNextTick
(
cls
):
with
cls
.
_clsLock
:
# prevent duplicate tick
if
cls
.
_lastLaggingTick
==
0
:
# 10k at 1/20 chance, should be enough to avoid overlaps
cls
.
_lastLaggingTick
=
cls
.
setupLastTick
()
+
datetime
.
timedelta
(
0
,
-
10000
)
if
cls
.
_lastTick
==
0
:
# should be quite a bit into the future
cls
.
_lastTick
=
cls
.
setupLastTick
()
if
Dice
.
throw
(
20
)
==
0
:
# 1 in 20 chance, return lagging tick
cls
.
_lastLaggingTick
+=
datetime
.
timedelta
(
0
,
1
)
# Go back in time 100 seconds
return
cls
.
_lastLaggingTick
else
:
# regular
# add one second to it
self
.
_lastTick
+=
datetime
.
timedelta
(
0
,
1
)
return
self
.
_lastTick
cls
.
_lastTick
+=
datetime
.
timedelta
(
0
,
1
)
return
cls
.
_lastTick
def
getNextInt
(
self
):
with
self
.
_lock
:
...
...
@@ -1389,6 +1467,55 @@ class DbManager():
# print("Float obtained: {}".format(ret))
return
ret
class
DbManager
():
''' This is a wrapper around DbConn(), to make it easier to use.
TODO: rename this to DbConnManager
'''
def
__init__
(
self
):
self
.
tableNumQueue
=
LinearQueue
()
# TODO: delete?
# self.openDbServerConnection()
self
.
_dbConn
=
DbConn
.
createNative
()
if
(
gConfig
.
connector_type
==
'native'
)
else
DbConn
.
createRest
()
try
:
self
.
_dbConn
.
open
()
# may throw taos.error.ProgrammingError: disconnected
except
taos
.
error
.
ProgrammingError
as
err
:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if
(
err
.
msg
==
'client disconnected'
):
# cannot open DB connection
print
(
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions."
)
sys
.
exit
(
2
)
else
:
print
(
"Failed to connect to DB, errno = {}, msg: {}"
.
format
(
Helper
.
convertErrno
(
err
.
errno
),
err
.
msg
))
raise
except
BaseException
:
print
(
"[=] Unexpected exception"
)
raise
# Do this after dbConn is in proper shape
# Moved to Database()
# self._stateMachine = StateMechine(self._dbConn)
def
getDbConn
(
self
):
return
self
.
_dbConn
# TODO: not used any more, to delete
def
pickAndAllocateTable
(
self
):
# pick any table, and "use" it
return
self
.
tableNumQueue
.
pickAndAllocate
()
# TODO: Not used any more, to delete
def
addTable
(
self
):
with
self
.
_lock
:
tIndex
=
self
.
tableNumQueue
.
push
()
return
tIndex
# Not used any more, to delete
def
releaseTable
(
self
,
i
):
# return the table back, so others can use it
self
.
tableNumQueue
.
release
(
i
)
# TODO: not used any more, delete
def
getTableNameToDelete
(
self
):
tblNum
=
self
.
tableNumQueue
.
pop
()
# TODO: race condition!
if
(
not
tblNum
):
# maybe false
...
...
@@ -1399,7 +1526,6 @@ class DbManager():
def
cleanUp
(
self
):
self
.
_dbConn
.
close
()
class
TaskExecutor
():
class
BoundedList
:
def
__init__
(
self
,
size
=
10
):
...
...
@@ -1465,6 +1591,10 @@ class TaskExecutor():
class
Task
():
''' A generic "Task" to be executed. For now we decide that there is no
need to embed a DB connection here, we use whatever the Worker Thread has
instead. But a task is always associated with a DB
'''
taskSn
=
100
@
classmethod
...
...
@@ -1473,10 +1603,9 @@ class Task():
# logger.debug("Allocating taskSN: {}".format(Task.taskSn))
return
Task
.
taskSn
def
__init__
(
self
,
dbManager
:
DbManager
,
execStats
:
ExecutionStats
):
self
.
_dbManager
=
dbManager
def
__init__
(
self
,
execStats
:
ExecutionStats
,
db
:
Database
):
self
.
_workerThread
=
None
self
.
_err
=
None
self
.
_err
=
None
# type: Exception
self
.
_aborted
=
False
self
.
_curStep
=
None
self
.
_numRows
=
None
# Number of rows affected
...
...
@@ -1486,6 +1615,7 @@ class Task():
# logger.debug("Creating new task {}...".format(self._taskNum))
self
.
_execStats
=
execStats
self
.
_db
=
db
# A task is always associated/for a specific DB
def
isSuccess
(
self
):
return
self
.
_err
is
None
...
...
@@ -1494,9 +1624,12 @@ class Task():
return
self
.
_aborted
def
clone
(
self
):
# TODO: why do we need this again?
newTask
=
self
.
__class__
(
self
.
_
dbManager
,
self
.
_execStats
)
newTask
=
self
.
__class__
(
self
.
_
execStats
,
self
.
_db
)
return
newTask
def
getDb
(
self
):
return
self
.
_db
def
logDebug
(
self
,
msg
):
self
.
_workerThread
.
logDebug
(
"Step[{}.{}] {}"
.
format
(
...
...
@@ -1515,6 +1648,7 @@ class Task():
def
_isErrAcceptable
(
self
,
errno
,
msg
):
if
errno
in
[
0x05
,
# TSDB_CODE_RPC_NOT_READY
0x0B
,
# Unable to establish connection, more details in TD-1648
# 0x200, # invalid SQL, TODO: re-examine with TD-934
0x217
,
# "db not selected", client side defined error code
0x218
,
# "Table does not exist" client side defined error code
...
...
@@ -1557,9 +1691,12 @@ class Task():
self
.
logDebug
(
"[-] executing task {}..."
.
format
(
self
.
__class__
.
__name__
))
self
.
_err
=
None
self
.
_err
=
None
# TODO: type hint mess up?
self
.
_execStats
.
beginTaskType
(
self
.
__class__
.
__name__
)
# mark beginning
errno2
=
None
# Now pick a database, and stick with it for the duration of the task execution
dbName
=
self
.
_db
.
getName
()
try
:
self
.
_executeInternal
(
te
,
wt
)
# TODO: no return value?
except
taos
.
error
.
ProgrammingError
as
err
:
...
...
@@ -1597,7 +1734,7 @@ class Task():
self
.
_err
=
e
self
.
_aborted
=
True
traceback
.
print_exc
()
except
BaseException
:
except
BaseException
:
# TODO: what is this again??!!
self
.
logDebug
(
"[=] Unexpected exception, SQL: {}"
.
format
(
wt
.
getDbConn
().
getLastSql
()))
...
...
@@ -1609,10 +1746,9 @@ class Task():
# TODO: merge with above.
self
.
_execStats
.
incExecCount
(
self
.
__class__
.
__name__
,
self
.
isSuccess
(),
errno2
)
def
execSql
(
self
,
sql
):
return
self
.
_dbManager
.
execute
(
sql
)
# TODO: refactor away, just provide the dbConn
def
execWtSql
(
self
,
wt
:
WorkerThread
,
sql
):
# execute an SQL on the worker thread
""" Haha """
return
wt
.
execSql
(
sql
)
def
queryWtSql
(
self
,
wt
:
WorkerThread
,
sql
):
# execute an SQL on the worker thread
...
...
@@ -1714,7 +1850,11 @@ class ExecutionStats:
"| Total Elapsed Time (from wall clock): {:.3f} seconds"
.
format
(
self
.
_elapsedTime
))
logger
.
info
(
"| Top numbers written: {}"
.
format
(
TaskExecutor
.
getBoundedList
()))
logger
.
info
(
"| Total Number of Active DB Native Connections: {}"
.
format
(
DbConnNative
.
totalConnections
))
logger
.
info
(
"| Active DB Native Connections (now): {}"
.
format
(
DbConnNative
.
totalConnections
))
logger
.
info
(
"| Longest native query time: {:.3f} seconds, started: {}"
.
format
(
MyTDSql
.
longestQueryTime
,
time
.
strftime
(
"%x %X"
,
time
.
localtime
(
MyTDSql
.
lqStartTime
)))
)
logger
.
info
(
"| Longest native query: {}"
.
format
(
MyTDSql
.
longestQuery
))
logger
.
info
(
"----------------------------------------------------------------------"
)
...
...
@@ -1764,9 +1904,15 @@ class TaskCreateDb(StateTransitionTask):
def
canBeginFrom
(
cls
,
state
:
AnyState
):
return
state
.
canCreateDb
()
# Actually creating the database(es)
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
# self.execWtSql(wt, "create database db replica {}".format(Dice.throw(3)+1))
self
.
execWtSql
(
wt
,
"create database db"
)
# was: self.execWtSql(wt, "create database db")
repStr
=
""
if
gConfig
.
max_replicas
!=
1
:
numReplica
=
Dice
.
throw
(
gConfig
.
max_replicas
)
+
1
# 1,2 ... N
repStr
=
"replica {}"
.
format
(
numReplica
)
self
.
execWtSql
(
wt
,
"create database {} {}"
.
format
(
self
.
_db
.
getName
(),
repStr
)
)
class
TaskDropDb
(
StateTransitionTask
):
@
classmethod
...
...
@@ -1778,10 +1924,9 @@ class TaskDropDb(StateTransitionTask):
return
state
.
canDropDb
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
self
.
execWtSql
(
wt
,
"drop database
db"
)
self
.
execWtSql
(
wt
,
"drop database
{}"
.
format
(
self
.
_db
.
getName
())
)
logger
.
debug
(
"[OPS] database dropped at {}"
.
format
(
time
.
time
()))
class
TaskCreateSuperTable
(
StateTransitionTask
):
@
classmethod
def
getEndState
(
cls
):
...
...
@@ -1792,13 +1937,14 @@ class TaskCreateSuperTable(StateTransitionTask):
return
state
.
canCreateFixedSuperTable
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
if
not
wt
.
dbInUse
():
# no DB yet, to the best of our knowledge
if
not
self
.
_db
.
exists
(
wt
.
getDbConn
()):
logger
.
debug
(
"Skipping task, no DB yet"
)
return
sTable
=
self
.
_db
Manager
.
getFixedSuperTable
()
sTable
=
self
.
_db
.
getFixedSuperTable
()
# type: TdSuperTable
# wt.execSql("use db") # should always be in place
sTable
.
create
(
wt
.
getDbConn
(),
{
'ts'
:
'timestamp'
,
'speed'
:
'int'
},
{
'b'
:
'binary(200)'
,
'f'
:
'float'
})
sTable
.
create
(
wt
.
getDbConn
(),
self
.
_db
.
getName
(),
{
'ts'
:
'timestamp'
,
'speed'
:
'int'
},
{
'b'
:
'binary(200)'
,
'f'
:
'float'
})
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that
# automatically
...
...
@@ -1811,17 +1957,20 @@ class TdSuperTable:
def
getName
(
self
):
return
self
.
_stName
def
create
(
self
,
dbc
,
cols
:
dict
,
tags
:
dict
):
sql
=
"CREATE TABLE db.{} ({}) TAGS ({})"
.
format
(
# TODO: odd semantic, create() method is usually static?
def
create
(
self
,
dbc
,
dbName
,
cols
:
dict
,
tags
:
dict
):
'''Creating a super table'''
sql
=
"CREATE TABLE {}.{} ({}) TAGS ({})"
.
format
(
dbName
,
self
.
_stName
,
","
.
join
([
'%s %s'
%
(
k
,
v
)
for
(
k
,
v
)
in
cols
.
items
()]),
","
.
join
([
'%s %s'
%
(
k
,
v
)
for
(
k
,
v
)
in
tags
.
items
()])
)
dbc
.
execute
(
sql
)
def
getRegTables
(
self
,
dbc
:
DbConn
):
def
getRegTables
(
self
,
dbc
:
DbConn
,
dbName
:
str
):
try
:
dbc
.
query
(
"select TBNAME from
db.{}"
.
format
(
self
.
_stName
))
# TODO: analyze result set later
dbc
.
query
(
"select TBNAME from
{}.{}"
.
format
(
dbName
,
self
.
_stName
))
# TODO: analyze result set later
except
taos
.
error
.
ProgrammingError
as
err
:
errno2
=
Helper
.
convertErrno
(
err
.
errno
)
logger
.
debug
(
"[=] Failed to get tables from super table: errno=0x{:X}, msg: {}"
.
format
(
errno2
,
err
))
...
...
@@ -1830,20 +1979,20 @@ class TdSuperTable:
qr
=
dbc
.
getQueryResult
()
return
[
v
[
0
]
for
v
in
qr
]
# list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def
hasRegTables
(
self
,
dbc
:
DbConn
):
return
dbc
.
query
(
"SELECT * FROM
db.{}"
.
format
(
self
.
_stName
))
>
0
def
hasRegTables
(
self
,
dbc
:
DbConn
,
dbName
:
str
):
return
dbc
.
query
(
"SELECT * FROM
{}.{}"
.
format
(
dbName
,
self
.
_stName
))
>
0
def
ensureTable
(
self
,
dbc
:
DbConn
,
regTableName
:
str
):
sql
=
"select tbname from
db.{} where tbname in ('{}')"
.
format
(
self
.
_stName
,
regTableName
)
def
ensureTable
(
self
,
dbc
:
DbConn
,
dbName
:
str
,
regTableName
:
str
):
sql
=
"select tbname from
{}.{} where tbname in ('{}')"
.
format
(
dbName
,
self
.
_stName
,
regTableName
)
if
dbc
.
query
(
sql
)
>=
1
:
# reg table exists already
return
sql
=
"CREATE TABLE {}
USING
{} tags ({})"
.
format
(
regTableName
,
self
.
_stName
,
self
.
_getTagStrForSql
(
dbc
)
sql
=
"CREATE TABLE {}
.{} USING {}.
{} tags ({})"
.
format
(
dbName
,
regTableName
,
dbName
,
self
.
_stName
,
self
.
_getTagStrForSql
(
dbc
,
dbName
)
)
dbc
.
execute
(
sql
)
def
_getTagStrForSql
(
self
,
dbc
)
:
tags
=
self
.
_getTags
(
dbc
)
def
_getTagStrForSql
(
self
,
dbc
,
dbName
:
str
)
:
tags
=
self
.
_getTags
(
dbc
,
dbName
)
tagStrs
=
[]
for
tagName
in
tags
:
tagType
=
tags
[
tagName
]
...
...
@@ -1857,34 +2006,34 @@ class TdSuperTable:
raise
RuntimeError
(
"Unexpected tag type: {}"
.
format
(
tagType
))
return
", "
.
join
(
tagStrs
)
def
_getTags
(
self
,
dbc
)
->
dict
:
dbc
.
query
(
"DESCRIBE {}
"
.
format
(
self
.
_stName
))
def
_getTags
(
self
,
dbc
,
dbName
)
->
dict
:
dbc
.
query
(
"DESCRIBE {}
.{}"
.
format
(
dbName
,
self
.
_stName
))
stCols
=
dbc
.
getQueryResult
()
# print(stCols)
ret
=
{
row
[
0
]:
row
[
1
]
for
row
in
stCols
if
row
[
3
]
==
'TAG'
}
# name:type
# print("Tags retrieved: {}".format(ret))
return
ret
def
addTag
(
self
,
dbc
,
tagName
,
tagType
):
if
tagName
in
self
.
_getTags
(
dbc
):
# already
def
addTag
(
self
,
dbc
,
dbName
,
tagName
,
tagType
):
if
tagName
in
self
.
_getTags
(
dbc
,
dbName
):
# already
return
# sTable.addTag("extraTag", "int")
sql
=
"alter table
db.{} add tag {} {}"
.
format
(
self
.
_stName
,
tagName
,
tagType
)
sql
=
"alter table
{}.{} add tag {} {}"
.
format
(
dbName
,
self
.
_stName
,
tagName
,
tagType
)
dbc
.
execute
(
sql
)
def
dropTag
(
self
,
dbc
,
tagName
):
if
not
tagName
in
self
.
_getTags
(
dbc
):
# don't have this tag
def
dropTag
(
self
,
dbc
,
dbName
,
tagName
):
if
not
tagName
in
self
.
_getTags
(
dbc
,
dbName
):
# don't have this tag
return
sql
=
"alter table
db.{} drop tag {}"
.
format
(
self
.
_stName
,
tagName
)
sql
=
"alter table
{}.{} drop tag {}"
.
format
(
dbName
,
self
.
_stName
,
tagName
)
dbc
.
execute
(
sql
)
def
changeTag
(
self
,
dbc
,
oldTag
,
newTag
):
tags
=
self
.
_getTags
(
dbc
)
def
changeTag
(
self
,
dbc
,
dbName
,
oldTag
,
newTag
):
tags
=
self
.
_getTags
(
dbc
,
dbName
)
if
not
oldTag
in
tags
:
# don't have this tag
return
if
newTag
in
tags
:
# already have this tag
return
sql
=
"alter table
db.{} change tag {} {}"
.
format
(
self
.
_stName
,
oldTag
,
newTag
)
sql
=
"alter table
{}.{} change tag {} {}"
.
format
(
dbName
,
self
.
_stName
,
oldTag
,
newTag
)
dbc
.
execute
(
sql
)
class
TaskReadData
(
StateTransitionTask
):
...
...
@@ -1897,19 +2046,21 @@ class TaskReadData(StateTransitionTask):
return
state
.
canReadData
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
sTable
=
self
.
_db
Manager
.
getFixedSuperTable
()
sTable
=
self
.
_db
.
getFixedSuperTable
()
if
random
.
randrange
(
5
)
==
0
:
# 1 in 5 chance, simulate a broken connection.
TODO: break connection in all situations
# 1 in 5 chance, simulate a broken connection.
if
random
.
randrange
(
5
)
==
0
:
#
TODO: break connection in all situations
wt
.
getDbConn
().
close
()
wt
.
getDbConn
().
open
()
print
(
"_r"
,
end
=
""
,
flush
=
True
)
dbc
=
wt
.
getDbConn
()
for
rTbName
in
sTable
.
getRegTables
(
dbc
):
# regular tables
dbName
=
self
.
_db
.
getName
()
for
rTbName
in
sTable
.
getRegTables
(
dbc
,
dbName
):
# regular tables
aggExpr
=
Dice
.
choice
([
'*'
,
'count(*)'
,
'avg(speed)'
,
'*'
,
'count(*)'
,
'avg(speed)'
,
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)'
,
'stddev(speed)'
,
...
...
@@ -1931,10 +2082,10 @@ class TaskReadData(StateTransitionTask):
])
try
:
# Run the query against the regular table first
dbc
.
execute
(
"select {} from
db.{}"
.
format
(
aggExpr
,
rTbName
))
dbc
.
execute
(
"select {} from
{}.{}"
.
format
(
aggExpr
,
dbName
,
rTbName
))
# Then run it against the super table
if
aggExpr
not
in
[
'stddev(speed)'
]:
#TODO: STDDEV not valid for super tables?!
dbc
.
execute
(
"select {} from
db.{}"
.
format
(
aggExpr
,
sTable
.
getName
()))
dbc
.
execute
(
"select {} from
{}.{}"
.
format
(
aggExpr
,
dbName
,
sTable
.
getName
()))
except
taos
.
error
.
ProgrammingError
as
err
:
errno2
=
Helper
.
convertErrno
(
err
.
errno
)
logger
.
debug
(
"[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}"
.
format
(
errno2
,
err
,
dbc
.
getLastSql
()))
...
...
@@ -1950,27 +2101,25 @@ class TaskDropSuperTable(StateTransitionTask):
return
state
.
canDropFixedSuperTable
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
# 1/2 chance, we'll drop the regular tables one by one, in a randomized
# sequence
# 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
if
Dice
.
throw
(
2
)
==
0
:
# print("_7_", end="", flush=True)
tblSeq
=
list
(
range
(
2
+
(
self
.
LARGE_NUMBER_OF_TABLES
if
gConfig
.
larger_data
else
self
.
SMALL_NUMBER_OF_TABLES
)))
random
.
shuffle
(
tblSeq
)
tickOutput
=
False
# if we have spitted out a "d" character for "drop regular table"
isSuccess
=
True
for
i
in
tblSeq
:
regTableName
=
self
.
getRegTableName
(
i
)
# "db.reg_table_{}".format(i)
regTableName
=
self
.
getRegTableName
(
i
)
# "db.reg_table_{}".format(i)
try
:
self
.
execWtSql
(
wt
,
"drop table {}
"
.
format
(
regTableName
))
# nRows always 0, like MySQL
self
.
execWtSql
(
wt
,
"drop table {}
.{}"
.
format
(
self
.
_db
.
getName
(),
regTableName
))
# nRows always 0, like MySQL
except
taos
.
error
.
ProgrammingError
as
err
:
# correcting for strange error number scheme
errno2
=
Helper
.
convertErrno
(
err
.
errno
)
if
(
errno2
in
[
0x362
]):
# mnode invalid table name
isSuccess
=
False
logger
.
debug
(
"[DB] Acceptable error when dropping a table"
)
logger
.
debug
(
"[DB] Acceptable error when dropping a table"
)
continue
# try to delete next regular table
if
(
not
tickOutput
):
...
...
@@ -1981,8 +2130,8 @@ class TaskDropSuperTable(StateTransitionTask):
print
(
"f"
,
end
=
""
,
flush
=
True
)
# Drop the super table itself
tblName
=
self
.
_db
Manager
.
getFixedSuperTableName
()
self
.
execWtSql
(
wt
,
"drop table
db.{}"
.
format
(
tblName
))
tblName
=
self
.
_db
.
getFixedSuperTableName
()
self
.
execWtSql
(
wt
,
"drop table
{}.{}"
.
format
(
self
.
_db
.
getName
(),
tblName
))
class
TaskAlterTags
(
StateTransitionTask
):
...
...
@@ -1997,19 +2146,20 @@ class TaskAlterTags(StateTransitionTask):
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
# tblName = self._dbManager.getFixedSuperTableName()
dbc
=
wt
.
getDbConn
()
sTable
=
self
.
_dbManager
.
getFixedSuperTable
()
sTable
=
self
.
_db
.
getFixedSuperTable
()
dbName
=
self
.
_db
.
getName
()
dice
=
Dice
.
throw
(
4
)
if
dice
==
0
:
sTable
.
addTag
(
dbc
,
"extraTag"
,
"int"
)
sTable
.
addTag
(
dbc
,
dbName
,
"extraTag"
,
"int"
)
# sql = "alter table db.{} add tag extraTag int".format(tblName)
elif
dice
==
1
:
sTable
.
dropTag
(
dbc
,
"extraTag"
)
sTable
.
dropTag
(
dbc
,
dbName
,
"extraTag"
)
# sql = "alter table db.{} drop tag extraTag".format(tblName)
elif
dice
==
2
:
sTable
.
dropTag
(
dbc
,
"newTag"
)
sTable
.
dropTag
(
dbc
,
dbName
,
"newTag"
)
# sql = "alter table db.{} drop tag newTag".format(tblName)
else
:
# dice == 3
sTable
.
changeTag
(
dbc
,
"extraTag"
,
"newTag"
)
sTable
.
changeTag
(
dbc
,
dbName
,
"extraTag"
,
"newTag"
)
# sql = "alter table db.{} change tag extraTag newTag".format(tblName)
class
TaskRestartService
(
StateTransitionTask
):
...
...
@@ -2074,7 +2224,9 @@ class TaskAddData(StateTransitionTask):
return
state
.
canAddData
()
def
_executeInternal
(
self
,
te
:
TaskExecutor
,
wt
:
WorkerThread
):
ds
=
self
.
_dbManager
# Quite DANGEROUS here, may result in multi-thread client access
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
db
=
self
.
_db
dbc
=
wt
.
getDbConn
()
tblSeq
=
list
(
range
(
self
.
LARGE_NUMBER_OF_TABLES
if
gConfig
.
larger_data
else
self
.
SMALL_NUMBER_OF_TABLES
))
random
.
shuffle
(
tblSeq
)
...
...
@@ -2084,23 +2236,25 @@ class TaskAddData(StateTransitionTask):
else
:
self
.
activeTable
.
add
(
i
)
# marking it active
sTable
=
d
s
.
getFixedSuperTable
()
sTable
=
d
b
.
getFixedSuperTable
()
regTableName
=
self
.
getRegTableName
(
i
)
# "db.reg_table_{}".format(i)
sTable
.
ensureTable
(
wt
.
getDbConn
(),
regTableName
)
# Ensure the table exists
sTable
.
ensureTable
(
wt
.
getDbConn
(),
db
.
getName
(),
regTableName
)
# Ensure the table exists
for
j
in
range
(
self
.
LARGE_NUMBER_OF_RECORDS
if
gConfig
.
larger_data
else
self
.
SMALL_NUMBER_OF_RECORDS
):
# number of records per table
nextInt
=
ds
.
getNextInt
()
nextInt
=
db
.
getNextInt
()
nextTick
=
db
.
getNextTick
()
if
gConfig
.
record_ops
:
self
.
prepToRecordOps
()
self
.
fAddLogReady
.
write
(
"Ready to write {} to {}
\n
"
.
format
(
nextInt
,
regTableName
))
self
.
fAddLogReady
.
flush
()
os
.
fsync
(
self
.
fAddLogReady
)
sql
=
"insert into {} values ('{}', {});"
.
format
(
# removed: tags ('{}', {})
sql
=
"insert into {}.{} values ('{}', {});"
.
format
(
# removed: tags ('{}', {})
db
.
getName
(),
regTableName
,
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
ds
.
getNextTick
()
,
nextInt
)
self
.
execWtSql
(
wt
,
sql
)
nextTick
,
nextInt
)
dbc
.
execute
(
sql
)
# Successfully wrote the data into the DB, let's record it
# somehow
te
.
recordDataMark
(
nextInt
)
...
...
@@ -2110,6 +2264,27 @@ class TaskAddData(StateTransitionTask):
nextInt
,
regTableName
))
self
.
fAddLogDone
.
flush
()
os
.
fsync
(
self
.
fAddLogDone
)
# Now read it back and verify, we might encounter an error if table is dropped
if
gConfig
.
verify_data
:
# only if command line asks for it
try
:
readBack
=
dbc
.
queryScalar
(
"SELECT speed from {}.{} WHERE ts= '{}'"
.
format
(
db
.
getName
(),
regTableName
,
nextTick
))
if
readBack
!=
nextInt
:
raise
taos
.
error
.
ProgrammingError
(
"Failed to read back same data, wrote: {}, read: {}"
.
format
(
nextInt
,
readBack
),
0x999
)
except
taos
.
error
.
ProgrammingError
as
err
:
errno
=
Helper
.
convertErrno
(
err
.
errno
)
if
errno
in
[
0x991
,
0x992
]
:
# not a single result
raise
taos
.
error
.
ProgrammingError
(
"Failed to read back same data for tick: {}, wrote: {}, read: {}"
.
format
(
nextTick
,
nextInt
,
"Empty Result"
if
errno
==
0x991
else
"Multiple Result"
),
errno
)
# Re-throw no matter what
raise
self
.
activeTable
.
discard
(
i
)
# not raising an error, unlike remove
...
...
@@ -2178,7 +2353,7 @@ class SvcManager:
self
.
inSigHandler
=
False
# self._status = MainExec.STATUS_RUNNING # set inside
# _startTaosService()
self
.
svcMgrThread
=
None
self
.
svcMgrThread
=
None
# type: ServiceManagerThread
self
.
_lock
=
threading
.
Lock
()
self
.
_isRestarting
=
False
...
...
@@ -2265,13 +2440,12 @@ class SvcManager:
time
.
sleep
(
2.0
)
proc
.
kill
()
# print("Process: {}".format(proc.name()))
self
.
svcMgrThread
=
ServiceManagerThread
()
# create the object
print
(
"Attempting to start TAOS service started, printing out output..."
)
self
.
svcMgrThread
.
start
()
self
.
svcMgrThread
.
procIpcBatch
(
trimToTarget
=
10
,
forceOutput
=
True
)
# for printing 10 lines
self
.
svcMgrThread
.
procIpcBatch
(
trimToTarget
=
10
,
forceOutput
=
True
)
# for printing 10 lines
print
(
"TAOS service started"
)
def
stopTaosService
(
self
,
outputLines
=
20
):
...
...
@@ -2320,7 +2494,7 @@ class ServiceManagerThread:
MAX_QUEUE_SIZE
=
10000
def
__init__
(
self
):
self
.
_tdeSubProcess
=
None
self
.
_tdeSubProcess
=
None
# type: TdeSubProcess
self
.
_thread
=
None
self
.
_status
=
None
...
...
@@ -2351,13 +2525,13 @@ class ServiceManagerThread:
self
.
_tdeSubProcess
.
start
()
self
.
_ipcQueue
=
Queue
()
self
.
_thread
=
threading
.
Thread
(
self
.
_thread
=
threading
.
Thread
(
# First thread captures server OUTPUT
target
=
self
.
svcOutputReader
,
args
=
(
self
.
_tdeSubProcess
.
getStdOut
(),
self
.
_ipcQueue
))
self
.
_thread
.
daemon
=
True
# thread dies with the program
self
.
_thread
.
start
()
self
.
_thread2
=
threading
.
Thread
(
self
.
_thread2
=
threading
.
Thread
(
# 2nd thread captures server ERRORs
target
=
self
.
svcErrorReader
,
args
=
(
self
.
_tdeSubProcess
.
getStdErr
(),
self
.
_ipcQueue
))
self
.
_thread2
.
daemon
=
True
# thread dies with the program
...
...
@@ -2690,40 +2864,39 @@ class ClientManager:
self
.
inSigHandler
=
False
def
_printLastNumbers
(
self
):
# to verify data durability
dbManager
=
DbManager
(
resetDb
=
False
)
dbc
=
dbManager
.
getDbConn
()
if
dbc
.
query
(
"show databases"
)
<=
1
:
# no database (we have a default called "log")
return
dbc
.
execute
(
"use db"
)
if
dbc
.
query
(
"show tables"
)
==
0
:
# no tables
return
sTbName
=
dbManager
.
getFixedSuperTableName
()
# get all regular tables
# TODO: analyze result set later
dbc
.
query
(
"select TBNAME from db.{}"
.
format
(
sTbName
))
rTables
=
dbc
.
getQueryResult
()
bList
=
TaskExecutor
.
BoundedList
()
for
rTbName
in
rTables
:
# regular tables
dbc
.
query
(
"select speed from db.{}"
.
format
(
rTbName
[
0
]))
numbers
=
dbc
.
getQueryResult
()
for
row
in
numbers
:
# print("<{}>".format(n), end="", flush=True)
bList
.
add
(
row
[
0
])
print
(
"Top numbers in DB right now: {}"
.
format
(
bList
))
print
(
"TDengine client execution is about to start in 2 seconds..."
)
time
.
sleep
(
2.0
)
dbManager
=
None
# release?
def
prepare
(
self
):
self
.
_printLastNumbers
()
# TODO: need to revise how we verify data durability
# def _printLastNumbers(self): # to verify data durability
# dbManager = DbManager()
# dbc = dbManager.getDbConn()
# if dbc.query("show databases") <= 1: # no database (we have a default called "log")
# return
# dbc.execute("use db")
# if dbc.query("show tables") == 0: # no tables
# return
# sTbName = dbManager.getFixedSuperTableName()
# # get all regular tables
# # TODO: analyze result set later
# dbc.query("select TBNAME from db.{}".format(sTbName))
# rTables = dbc.getQueryResult()
# bList = TaskExecutor.BoundedList()
# for rTbName in rTables: # regular tables
# dbc.query("select speed from db.{}".format(rTbName[0]))
# numbers = dbc.getQueryResult()
# for row in numbers:
# # print("<{}>".format(n), end="", flush=True)
# bList.add(row[0])
# print("Top numbers in DB right now: {}".format(bList))
# print("TDengine client execution is about to start in 2 seconds...")
# time.sleep(2.0)
# dbManager = None # release?
def
run
(
self
,
svcMgr
):
self
.
_printLastNumbers
()
# self._printLastNumbers()
global
gConfig
dbManager
=
DbManager
()
# Regular function
thPool
=
ThreadPool
(
gConfig
.
num_threads
,
gConfig
.
max_steps
)
...
...
@@ -2734,15 +2907,37 @@ class ClientManager:
# print("TC failed = {}".format(self.tc.isFailed()))
if
svcMgr
:
# gConfig.auto_start_service:
svcMgr
.
stopTaosService
()
svcMgr
=
None
# Print exec status, etc., AFTER showing messages from the server
self
.
conclude
()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
# Linux return code: ref https://shapeshed.com/unix-exit-codes/
return
1
if
self
.
tc
.
isFailed
()
else
0
ret
=
1
if
self
.
tc
.
isFailed
()
else
0
self
.
tc
.
cleanup
()
# Release global variables
gConfig
=
None
gSvcMgr
=
None
logger
=
None
# Release variables here
self
.
tc
=
None
thPool
=
None
dbManager
=
None
gc
.
collect
()
# force garbage collection
# h = hpy()
# print("\n----- Final Python Heap -----\n")
# print(h.heap())
return
ret
def
conclude
(
self
):
# self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
self
.
tc
.
printStats
()
self
.
tc
.
getDbManager
().
cleanUp
()
class
MainExec
:
STATUS_STARTING
=
1
...
...
@@ -2878,6 +3073,13 @@ def main():
'--auto-start-service'
,
action
=
'store_true'
,
help
=
'Automatically start/stop the TDengine service (default: false)'
)
parser
.
add_argument
(
'-b'
,
'--max-dbs'
,
action
=
'store'
,
default
=
0
,
type
=
int
,
help
=
'Maximum number of DBs to keep, set to disable dropping DB. (default: 0)'
)
parser
.
add_argument
(
'-c'
,
'--connector-type'
,
...
...
@@ -2895,6 +3097,13 @@ def main():
'--run-tdengine'
,
action
=
'store_true'
,
help
=
'Run TDengine service in foreground (default: false)'
)
parser
.
add_argument
(
'-i'
,
'--max-replicas'
,
action
=
'store'
,
default
=
1
,
type
=
int
,
help
=
'Maximum number of replicas to use, when testing against clusters. (default: 1)'
)
parser
.
add_argument
(
'-l'
,
'--larger-data'
,
...
...
@@ -2924,6 +3133,11 @@ def main():
default
=
5
,
type
=
int
,
help
=
'Number of threads to run (default: 10)'
)
parser
.
add_argument
(
'-v'
,
'--verify-data'
,
action
=
'store_true'
,
help
=
'Verify data written in a number of places by reading back (default: false)'
)
parser
.
add_argument
(
'-x'
,
'--continue-on-exception'
,
...
...
tests/pytest/crash_gen/valgrind_taos.supp
0 → 100644
浏览文件 @
82b21d6e
因为 它太大了无法显示 source diff 。你可以改为
查看blob
。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录