Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
6768ae81
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6768ae81
编写于
4月 28, 2020
作者:
S
Steven Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Now each WorkerThread provides its own SQL execution, but still using a shared db connection
上级
884e7fe6
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
141 addition
and
113 deletion
+141
-113
src/connector/python/linux/python3/taos/cursor.py
src/connector/python/linux/python3/taos/cursor.py
+9
-0
tests/pytest/random_walk.py
tests/pytest/random_walk.py
+132
-113
未找到文件。
src/connector/python/linux/python3/taos/cursor.py
浏览文件 @
6768ae81
from
.cinterface
import
CTaosInterface
from
.error
import
*
querySeqNum
=
0
class
TDengineCursor
(
object
):
"""Database cursor which is used to manage the context of a fetch operation.
...
...
@@ -109,7 +111,14 @@ class TDengineCursor(object):
if
params
is
not
None
:
pass
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
res
=
CTaosInterface
.
query
(
self
.
_connection
.
_conn
,
stmt
)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if
res
==
0
:
if
CTaosInterface
.
fieldsCount
(
self
.
_connection
.
_conn
)
==
0
:
self
.
_affected_rows
+=
CTaosInterface
.
affectedRows
(
self
.
_connection
.
_conn
)
...
...
tests/pytest/random_walk.py
浏览文件 @
6768ae81
...
...
@@ -62,26 +62,35 @@ def runThread(workerThread):
class
WorkerThread
:
def
__init__
(
self
,
pool
,
tid
):
# note: main thread context!
def
__init__
(
self
,
pool
,
tid
,
dbState
):
# note: main thread context!
self
.
curStep
=
-
1
self
.
pool
=
pool
self
.
tid
=
tid
self
.
dbState
=
dbState
# self.threadIdent = threading.get_ident()
self
.
thread
=
threading
.
Thread
(
target
=
runThread
,
args
=
(
self
,))
self
.
stepGate
=
threading
.
Event
()
# Let us have a DB connection of our own
self
.
_dbConn
=
DbConn
()
def
start
(
self
):
self
.
thread
.
start
()
# AFTER the thread is recorded
def
run
(
self
):
# initialization after thread starts, in the thread context
# self.isSleeping = False
self
.
_dbConn
.
open
()
while
self
.
curStep
<
self
.
pool
.
maxSteps
:
# stepNo = self.pool.waitForStep() # Step to run
self
.
crossStepGate
()
# self.curStep will get incremented
self
.
doWork
()
# clean up
self
.
_dbConn
.
close
()
def
verifyThreadSelf
(
self
):
# ensure we are called by this own thread
if
(
threading
.
get_ident
()
!=
self
.
thread
.
ident
):
raise
RuntimeError
(
"Unexpectly called from other threads"
)
...
...
@@ -136,7 +145,10 @@ class WorkerThread:
def
doWork
(
self
):
logger
.
info
(
" Step {}, thread {}: "
.
format
(
self
.
curStep
,
self
.
tid
))
self
.
pool
.
dispatcher
.
doWork
()
self
.
pool
.
dispatcher
.
doWork
(
self
)
def
execSql
(
self
,
sql
):
return
self
.
dbState
.
execSql
(
sql
)
# We define a class to run a number of threads in locking steps.
...
...
@@ -158,10 +170,9 @@ class SteppingThreadPool:
# self.mainGate = threading.Condition()
# starting to run all the threads, in locking steps
def
run
(
self
):
# Create the threads
for
tid
in
range
(
0
,
self
.
numThreads
):
workerThread
=
WorkerThread
(
self
,
tid
)
def
run
(
self
):
for
tid
in
range
(
0
,
self
.
numThreads
):
# Create the threads
workerThread
=
WorkerThread
(
self
,
tid
,
dbState
)
self
.
threadList
.
append
(
workerThread
)
workerThread
.
start
()
# start, but should block immediately before step 0
...
...
@@ -169,15 +180,10 @@ class SteppingThreadPool:
self
.
curStep
=
-
1
# not started yet
while
(
self
.
curStep
<
self
.
maxSteps
):
logger
.
debug
(
"Main thread going to sleep"
)
# self.mainGate.acquire()
# self.mainGate.wait() # start snoozing
# self.mainGate.release
self
.
crossPoolBarrier
()
self
.
barrier
.
reset
()
# Other worker threads should now be at the "gate"
self
.
barrier
.
reset
()
# Other worker threads should now be at the "gate"
logger
.
debug
(
"Main thread waking up, tapping worker threads"
.
format
(
self
.
curStep
))
# Now not all threads had time to go to sleep
# time.sleep(0.01) # This is like forever
self
.
tapAllThreads
()
# The threads will run through many steps
...
...
@@ -192,49 +198,8 @@ class SteppingThreadPool:
self
.
curStep
+=
1
# we are about to get into next step. TODO: race condition here!
logger
.
debug
(
" "
)
# line break
logger
.
debug
(
"--> Step {} starts with main thread waking up"
.
format
(
self
.
curStep
))
# Now not all threads had time to go to sleep
self
.
barrier
.
wait
()
# allThreadWaiting = False
# with self.lock:
# self.numWaitingThreads += 1
# if ( self.numWaitingThreads == self.numThreads ):
# allThreadWaiting = True
# if (allThreadWaiting): # aha, pass the baton to the main thread
# logger.debug("All threads are now waiting")
# self.numWaitingThreads = 0 # do this 1st to avoid race condition
# # time.sleep(0.001) # thread yield, so main thread can be ready
# self.mainGate.acquire()
# self.mainGate.notify() # main thread would now start to run
# self.mainGate.release()
# time.sleep(0) # yield, maybe main thread can run for just a bit
# def waitForStep(self):
# shouldWait = True;
# with self.lock:
# # if ( self.numWaitingThreads == 0 ): # first one here
# # self.stepGate.acquire() # acquire the underlying lock
# self.numWaitingThreads += 1
# # if ( self.numWaitingThreads < self.numThreads ):
# # do nothing, we should wait
# if ( self.numWaitingThreads == self.numThreads ):
# shouldWait = False # we should now wake up
# elif ( self.numWaitingThreads > self.numThreads ):
# raise RuntimeError("Corrupt state")
# self.stepGate.acquire()
# if (shouldWait):
# self.stepGate.wait()
# else:
# self.numWaitingThreads = 0 # fresh start
# self.curStep += 1 # do this before letting all threads loose
# print("--> Starting step {}".format(self.curStep), end="\r\n") # before notify_all
# # self.stepGate.notify_all()
# self.wakeUpAll()
# self.stepGate.release()
# return self.curStep
def
tapAllThreads
(
self
):
# in a deterministic manner
wakeSeq
=
[]
...
...
@@ -254,23 +219,32 @@ class LinearQueue():
def
__init__
(
self
):
self
.
firstIndex
=
1
# 1st ever element
self
.
lastIndex
=
0
self
.
lock
=
threading
.
RLock
()
# our functions may call each other
self
.
_
lock
=
threading
.
RLock
()
# our functions may call each other
self
.
inUse
=
set
()
# the indexes that are in use right now
def
push
(
self
):
# Push to the tail (largest)
with
self
.
lock
:
if
(
self
.
firstIndex
>
self
.
lastIndex
):
# impossible, meaning it's empty
self
.
lastIndex
=
self
.
firstIndex
return
self
.
firstIndex
def
toText
(
self
):
return
"[{}..{}], in use: {}"
.
format
(
self
.
firstIndex
,
self
.
lastIndex
,
self
.
inUse
)
# Push (add new element, largest) to the tail, and mark it in use
def
push
(
self
):
with
self
.
_lock
:
# if ( self.isEmpty() ):
# self.lastIndex = self.firstIndex
# return self.firstIndex
# Otherwise we have something
self
.
lastIndex
+=
1
self
.
allocate
(
self
.
lastIndex
)
# self.inUse.add(self.lastIndex) # mark it in use immediately
return
self
.
lastIndex
def
pop
(
self
):
with
self
.
lock
:
with
self
.
_
lock
:
if
(
self
.
isEmpty
()
):
raise
RuntimeError
(
"Cannot pop an empty queue"
)
index
=
self
.
firstIndex
if
(
index
in
self
.
inUse
):
self
.
inUse
.
remove
(
index
)
# TODO: what about discard?
self
.
firstIndex
+=
1
return
index
...
...
@@ -278,113 +252,155 @@ class LinearQueue():
return
self
.
firstIndex
>
self
.
lastIndex
def
popIfNotEmpty
(
self
):
with
self
.
lock
:
with
self
.
_
lock
:
if
(
self
.
isEmpty
()):
return
0
return
self
.
pop
()
def
allocate
(
self
,
i
):
with
self
.
lock
:
with
self
.
_
lock
:
if
(
i
in
self
.
inUse
):
raise
RuntimeError
(
"Cannot re-use same index in queue: {}"
.
format
(
i
))
self
.
inUse
.
add
(
i
)
def
release
(
self
,
i
):
with
self
.
lock
:
with
self
.
_
lock
:
self
.
inUse
.
remove
(
i
)
# KeyError possible
def
size
(
self
):
return
self
.
lastIndex
+
1
-
self
.
firstIndex
def
pickAndAllocate
(
self
):
with
self
.
lock
:
if
(
self
.
isEmpty
()
):
return
None
with
self
.
_lock
:
cnt
=
0
# counting the interations
while
True
:
cnt
+=
1
if
(
cnt
>
self
.
size
()
*
10
):
# 10x iteration already
raise
RuntimeError
(
"Failed to allocate LinearQueue element"
)
# raise RuntimeError("Failed to allocate LinearQueue element")
return
None
ret
=
Dice
.
throwRange
(
self
.
firstIndex
,
self
.
lastIndex
+
1
)
if
(
not
ret
in
self
.
inUse
):
return
self
.
allocate
(
ret
)
self
.
allocate
(
ret
)
return
ret
class
DbConn
:
def
__init__
(
self
):
self
.
isOpen
=
False
def
open
(
self
):
# Open connection
if
(
self
.
isOpen
):
raise
RuntimeError
(
"Cannot re-open an existing DB connection"
)
cfgPath
=
"../../build/test/cfg"
conn
=
taos
.
connect
(
host
=
"127.0.0.1"
,
config
=
cfgPath
)
# TODO: make configurable
self
.
_tdSql
=
TDSql
()
self
.
_tdSql
.
init
(
conn
.
cursor
())
self
.
isOpen
=
True
def
resetDb
(
self
):
# reset the whole database, etc.
if
(
not
self
.
isOpen
):
raise
RuntimeError
(
"Cannot reset database until connection is open"
)
self
.
_tdSql
.
prepare
()
# Recreate database, etc.
# tdSql.execute('show databases')
def
close
(
self
):
if
(
not
self
.
isOpen
):
raise
RuntimeError
(
"Cannot clean up database until connection is open"
)
self
.
_tdSql
.
close
()
self
.
isOpen
=
False
def
execSql
(
self
,
sql
):
return
self
.
_tdSql
.
execute
(
sql
)
# State of the database as we believe it to be
class
DbState
():
def
__init__
(
self
):
self
.
tableNumQueue
=
LinearQueue
()
self
.
tick
=
datetime
.
datetime
(
2019
,
1
,
1
)
# initial date time tick
self
.
int
=
0
# initial integer
self
.
openDbServerConnection
()
self
.
lock
=
threading
.
RLock
()
self
.
_lastTick
=
datetime
.
datetime
(
2019
,
1
,
1
)
# initial date time tick
self
.
_lastInt
=
0
# next one is initial integer
self
.
_lock
=
threading
.
RLock
()
# self.openDbServerConnection()
self
.
_dbConn
=
DbConn
()
self
.
_dbConn
.
open
()
self
.
_dbConn
.
resetDb
()
# drop and recreate DB
def
pickAndAllocateTable
(
self
):
# pick any table, and "use" it
return
self
.
tableNumQueue
.
pickAndAllocate
()
def
addTable
(
self
):
with
self
.
_lock
:
tIndex
=
self
.
tableNumQueue
.
push
()
return
tIndex
def
releaseTable
(
self
,
i
):
# return the table back, so others can use it
self
.
tableNumQueue
.
release
(
i
)
def
getNextTick
(
self
):
with
self
.
lock
:
# prevent duplicate tick
self
.
t
ick
+=
datetime
.
timedelta
(
0
,
1
)
# add one second to it
return
self
.
t
ick
with
self
.
_
lock
:
# prevent duplicate tick
self
.
_lastT
ick
+=
datetime
.
timedelta
(
0
,
1
)
# add one second to it
return
self
.
_lastT
ick
def
getNextInt
(
self
):
with
self
.
lock
:
self
.
int
+=
1
return
self
.
int
def
openDbServerConnection
(
self
):
cfgPath
=
"../../build/test/cfg"
# was: tdDnodes.getSimCfgPath()
conn
=
taos
.
connect
(
host
=
"127.0.0.1"
,
config
=
cfgPath
)
# TODO: make configurable
tdSql
.
init
(
conn
.
cursor
())
tdSql
.
prepare
()
# Recreate database, etc.
# tdSql.execute('show databases')
def
closeDbServerConnection
(
self
):
tdSql
.
close
()
tdLog
.
info
(
"Disconnecting from database server"
)
def
getTableNameToCreate
(
self
):
tblNum
=
self
.
tableNumQueue
.
push
()
return
"table_{}"
.
format
(
tblNum
)
with
self
.
_lock
:
self
.
_lastInt
+=
1
return
self
.
_lastInt
def
getTableNameToDelete
(
self
):
if
self
.
tableNumQueue
.
isEmpty
():
return
False
tblNum
=
self
.
tableNumQueue
.
pop
()
# TODO: race condition!
return
"table_{}"
.
format
(
tblNum
)
def
execSql
(
self
,
sql
):
# using the main DB connection
return
self
.
_dbConn
.
execSql
(
sql
)
def
cleanUp
(
self
):
self
.
_dbConn
.
close
()
class
Task
():
def
__init__
(
self
,
dbState
):
self
.
dbState
=
dbState
def
execute
(
self
):
def
execute
(
self
,
workerThread
):
raise
RuntimeError
(
"Must be overriden by child class"
)
def
execSql
(
self
,
sql
):
return
self
.
dbState
.
execute
(
sql
)
class
CreateTableTask
(
Task
):
def
execute
(
self
):
tableName
=
dbState
.
getTableNameToCreate
()
logger
.
info
(
" Creating a table {} ..."
.
format
(
tableName
))
tdSql
.
execute
(
"create table {} (ts timestamp, speed int)"
.
format
(
tableName
))
def
execute
(
self
,
wt
):
tIndex
=
dbState
.
addTable
()
logger
.
debug
(
" Creating a table {} ..."
.
format
(
tIndex
))
wt
.
execSql
(
"create table table_{} (ts timestamp, speed int)"
.
format
(
tIndex
))
logger
.
debug
(
" Table {} created."
.
format
(
tIndex
))
dbState
.
releaseTable
(
tIndex
)
class
DropTableTask
(
Task
):
def
execute
(
self
):
def
execute
(
self
,
wt
):
tableName
=
dbState
.
getTableNameToDelete
()
if
(
not
tableName
):
# May be "False"
logger
.
info
(
"Cannot generate a table to delete, skipping..."
)
logger
.
info
(
"
Cannot generate a table to delete, skipping..."
)
return
logger
.
info
(
" Dropping a table {} ..."
.
format
(
tableName
))
tdSql
.
execute
(
"drop table {}"
.
format
(
tableName
))
wt
.
execSql
(
"drop table {}"
.
format
(
tableName
))
class
AddDataTask
(
Task
):
def
execute
(
self
):
logger
.
info
(
" Adding some data..."
)
def
execute
(
self
,
wt
):
ds
=
self
.
dbState
tIndex
=
ds
.
pickTable
()
tdSql
.
execute
(
"insert into table_{} values ('{}', {});"
.
format
(
tIndex
,
ds
.
getNextTick
(),
ds
.
getNextInt
()))
ds
.
r
logger
.
info
(
" Adding some data... numQueue={}"
.
format
(
ds
.
tableNumQueue
.
toText
()))
tIndex
=
ds
.
pickAndAllocateTable
()
if
(
tIndex
==
None
):
logger
.
info
(
" No table found to add data, skipping..."
)
return
sql
=
"insert into table_{} values ('{}', {});"
.
format
(
tIndex
,
ds
.
getNextTick
(),
ds
.
getNextInt
())
logger
.
debug
(
" Executing SQL: {}"
.
format
(
sql
))
wt
.
execSql
(
sql
)
ds
.
releaseTable
(
tIndex
)
logger
.
debug
(
" Finished adding data"
)
# Deterministic random number generator
class
Dice
():
...
...
@@ -430,12 +446,15 @@ class WorkDispatcher():
]
def
throwDice
(
self
):
return
random
.
randint
(
0
,
len
(
self
.
tasks
)
-
1
)
max
=
len
(
self
.
tasks
)
-
1
dRes
=
random
.
randint
(
0
,
max
)
# logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
return
dRes
def
doWork
(
self
):
def
doWork
(
self
,
workerThread
):
dice
=
self
.
throwDice
()
task
=
self
.
tasks
[
dice
]
task
.
execute
()
task
.
execute
(
workerThread
)
if
__name__
==
"__main__"
:
logger
=
logging
.
getLogger
(
'myApp'
)
...
...
@@ -445,8 +464,8 @@ if __name__ == "__main__":
Dice
.
seed
(
0
)
# initial seeding of dice
dbState
=
DbState
()
threadPool
=
SteppingThreadPool
(
dbState
,
1
,
5
,
0
)
threadPool
=
SteppingThreadPool
(
dbState
,
3
,
5
,
0
)
threadPool
.
run
()
logger
.
info
(
"Finished running thread pool"
)
dbState
.
cl
oseDbServerConnection
()
dbState
.
cl
eanUp
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录