提交 2d76c467 编写于 作者: S Steven Li

Added per-thread-db-connection option, after getting Python argparse

上级 24ad275d
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys import sys
import getopt import getopt
import argparse
import threading import threading
import random import random
...@@ -26,11 +27,11 @@ from util.sql import * ...@@ -26,11 +27,11 @@ from util.sql import *
import taos import taos
# Constants
LOGGING_LEVEL = logging.DEBUG
def runThread(workerThread): # Command-line/Environment Configurations
logger.info("Running Thread: {}".format(workerThread.tid)) gConfig = None # will set a bit later
def runThread(workerThread):
workerThread.run() workerThread.run()
# Used by one process to block till another is ready # Used by one process to block till another is ready
...@@ -63,36 +64,42 @@ def runThread(workerThread): ...@@ -63,36 +64,42 @@ def runThread(workerThread):
class WorkerThread: class WorkerThread:
def __init__(self, pool, tid, dbState): # note: main thread context! def __init__(self, pool, tid, dbState): # note: main thread context!
self.curStep = -1 self._curStep = -1
self.pool = pool self._pool = pool
self.tid = tid self._tid = tid
self.dbState = dbState self._dbState = dbState
# self.threadIdent = threading.get_ident() # self.threadIdent = threading.get_ident()
self.thread = threading.Thread(target=runThread, args=(self,)) self._thread = threading.Thread(target=runThread, args=(self,))
self.stepGate = threading.Event() self._stepGate = threading.Event()
# Let us have a DB connection of our own # Let us have a DB connection of our own
self._dbConn = DbConn() if ( gConfig.per_thread_db_connection ):
self._dbConn = DbConn()
def start(self): def start(self):
self.thread.start() # AFTER the thread is recorded self._thread.start() # AFTER the thread is recorded
def run(self): def run(self):
# initialization after thread starts, in the thread context # initialization after thread starts, in the thread context
# self.isSleeping = False # self.isSleeping = False
self._dbConn.open() logger.info("Starting to run thread: {}".format(self._tid))
if ( gConfig.per_thread_db_connection ):
self._dbConn.open()
# self._dbConn.resetDb()
while self.curStep < self.pool.maxSteps: while self._curStep < self._pool.maxSteps:
# stepNo = self.pool.waitForStep() # Step to run # stepNo = self.pool.waitForStep() # Step to run
self.crossStepGate() # self.curStep will get incremented self.crossStepGate() # self.curStep will get incremented
self.doWork() self.doWork()
# clean up # clean up
self._dbConn.close() if ( gConfig.per_thread_db_connection ):
self._dbConn.close()
def verifyThreadSelf(self): # ensure we are called by this own thread def verifyThreadSelf(self): # ensure we are called by this own thread
if ( threading.get_ident() != self.thread.ident ): if ( threading.get_ident() != self._thread.ident ):
raise RuntimeError("Unexpectly called from other threads") raise RuntimeError("Unexpectly called from other threads")
def verifyThreadMain(self): # ensure we are called by the main thread def verifyThreadMain(self): # ensure we are called by the main thread
...@@ -100,7 +107,7 @@ class WorkerThread: ...@@ -100,7 +107,7 @@ class WorkerThread:
raise RuntimeError("Unexpectly called from other threads") raise RuntimeError("Unexpectly called from other threads")
def verifyThreadAlive(self): def verifyThreadAlive(self):
if ( not self.thread.is_alive() ): if ( not self._thread.is_alive() ):
raise RuntimeError("Unexpected dead thread") raise RuntimeError("Unexpected dead thread")
# def verifyIsSleeping(self, isSleeping): # def verifyIsSleeping(self, isSleeping):
...@@ -113,30 +120,30 @@ class WorkerThread: ...@@ -113,30 +120,30 @@ class WorkerThread:
self.verifyThreadSelf() # only allowed by ourselves self.verifyThreadSelf() # only allowed by ourselves
# self.verifyIsSleeping(False) # has to be awake # self.verifyIsSleeping(False) # has to be awake
logger.debug("Worker thread {} about to cross pool barrier".format(self.tid)) logger.debug("Worker thread {} about to cross pool barrier".format(self._tid))
# self.isSleeping = True # TODO: maybe too early? # self.isSleeping = True # TODO: maybe too early?
self.pool.crossPoolBarrier() # wait for all other threads self._pool.crossPoolBarrier() # wait for all other threads
# Wait again at the "gate", waiting to be "tapped" # Wait again at the "gate", waiting to be "tapped"
logger.debug("Worker thread {} about to cross the step gate".format(self.tid)) logger.debug("Worker thread {} about to cross the step gate".format(self._tid))
# self.stepGate.acquire() # acquire lock immediately # self.stepGate.acquire() # acquire lock immediately
self.stepGate.wait() self._stepGate.wait()
self.stepGate.clear() self._stepGate.clear()
# self.stepGate.release() # release # self.stepGate.release() # release
logger.debug("Worker thread {} woke up".format(self.tid)) logger.debug("Worker thread {} woke up".format(self._tid))
# Someone will wake us up here # Someone will wake us up here
self.curStep += 1 # off to a new step... self._curStep += 1 # off to a new step...
def tapStepGate(self): # give it a tap, release the thread waiting there def tapStepGate(self): # give it a tap, release the thread waiting there
self.verifyThreadAlive() self.verifyThreadAlive()
self.verifyThreadMain() # only allowed for main thread self.verifyThreadMain() # only allowed for main thread
# self.verifyIsSleeping(True) # has to be sleeping # self.verifyIsSleeping(True) # has to be sleeping
logger.debug("Tapping worker thread {}".format(self.tid)) logger.debug("Tapping worker thread {}".format(self._tid))
# self.stepGate.acquire() # self.stepGate.acquire()
# logger.debug("Tapping worker thread {}, lock acquired".format(self.tid)) # logger.debug("Tapping worker thread {}, lock acquired".format(self.tid))
self.stepGate.set() # wake up! self._stepGate.set() # wake up!
# logger.debug("Tapping worker thread {}, notified!".format(self.tid)) # logger.debug("Tapping worker thread {}, notified!".format(self.tid))
# self.isSleeping = False # No race condition for sure # self.isSleeping = False # No race condition for sure
# self.stepGate.release() # this finishes before .wait() can return # self.stepGate.release() # this finishes before .wait() can return
...@@ -144,12 +151,15 @@ class WorkerThread: ...@@ -144,12 +151,15 @@ class WorkerThread:
time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release
def doWork(self): def doWork(self):
logger.info(" Step {}, thread {}: ".format(self.curStep, self.tid)) logger.info(" Step {}, thread {}: ".format(self._curStep, self._tid))
self.pool.dispatcher.doWork(self) self._pool.dispatcher.doWork(self)
def execSql(self, sql): def execSql(self, sql):
return self.dbState.execSql(sql) if ( gConfig.per_thread_db_connection ):
return self._dbConn.execSql(sql)
else:
return self._dbState.getDbConn().execSql(sql)
# We define a class to run a number of threads in locking steps. # We define a class to run a number of threads in locking steps.
class SteppingThreadPool: class SteppingThreadPool:
...@@ -188,7 +198,7 @@ class SteppingThreadPool: ...@@ -188,7 +198,7 @@ class SteppingThreadPool:
# The threads will run through many steps # The threads will run through many steps
for workerThread in self.threadList: for workerThread in self.threadList:
workerThread.thread.join() # slight hack, accessing members workerThread._thread.join() # slight hack, accessing members
logger.info("All threads finished") logger.info("All threads finished")
...@@ -240,10 +250,15 @@ class LinearQueue(): ...@@ -240,10 +250,15 @@ class LinearQueue():
def pop(self): def pop(self):
with self._lock: with self._lock:
if ( self.isEmpty() ): if ( self.isEmpty() ):
raise RuntimeError("Cannot pop an empty queue") # raise RuntimeError("Cannot pop an empty queue")
return False # TODO: None?
index = self.firstIndex index = self.firstIndex
if ( index in self.inUse ): if ( index in self.inUse ):
self.inUse.remove(index) # TODO: what about discard? return False
# if ( index in self.inUse ):
# self.inUse.remove(index) # TODO: what about discard?
self.firstIndex += 1 self.firstIndex += 1
return index return index
...@@ -259,13 +274,15 @@ class LinearQueue(): ...@@ -259,13 +274,15 @@ class LinearQueue():
def allocate(self, i): def allocate(self, i):
with self._lock: with self._lock:
# logger.debug("LQ allocating item {}".format(i))
if ( i in self.inUse ): if ( i in self.inUse ):
raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
self.inUse.add(i) self.inUse.add(i)
def release(self, i): def release(self, i):
with self._lock: with self._lock:
self.inUse.remove(i) # KeyError possible # logger.debug("LQ releasing item {}".format(i))
self.inUse.remove(i) # KeyError possible, TODO: why?
def size(self): def size(self):
return self.lastIndex + 1 - self.firstIndex return self.lastIndex + 1 - self.firstIndex
...@@ -287,6 +304,8 @@ class LinearQueue(): ...@@ -287,6 +304,8 @@ class LinearQueue():
class DbConn: class DbConn:
def __init__(self): def __init__(self):
self._conn = None
self._cursor = None
self.isOpen = False self.isOpen = False
def open(self): # Open connection def open(self): # Open connection
...@@ -294,16 +313,27 @@ class DbConn: ...@@ -294,16 +313,27 @@ class DbConn:
raise RuntimeError("Cannot re-open an existing DB connection") raise RuntimeError("Cannot re-open an existing DB connection")
cfgPath = "../../build/test/cfg" cfgPath = "../../build/test/cfg"
conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
# Get the connection/cursor ready
self._cursor.execute('reset query cache')
# self._cursor.execute('use db')
# Open connection
self._tdSql = TDSql() self._tdSql = TDSql()
self._tdSql.init(conn.cursor()) self._tdSql.init(self._cursor)
self.isOpen = True self.isOpen = True
def resetDb(self): # reset the whole database, etc. def resetDb(self): # reset the whole database, etc.
if ( not self.isOpen ): if ( not self.isOpen ):
raise RuntimeError("Cannot reset database until connection is open") raise RuntimeError("Cannot reset database until connection is open")
self._tdSql.prepare() # Recreate database, etc. # self._tdSql.prepare() # Recreate database, etc.
self._cursor.execute('drop database if exists db')
self._cursor.execute('create database db')
# self._cursor.execute('use db')
# tdSql.execute('show databases') # tdSql.execute('show databases')
def close(self): def close(self):
...@@ -312,7 +342,9 @@ class DbConn: ...@@ -312,7 +342,9 @@ class DbConn:
self._tdSql.close() self._tdSql.close()
self.isOpen = False self.isOpen = False
def execSql(self, sql): def execSql(self, sql):
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
return self._tdSql.execute(sql) return self._tdSql.execute(sql)
# State of the database as we believe it to be # State of the database as we believe it to be
...@@ -328,6 +360,9 @@ class DbState(): ...@@ -328,6 +360,9 @@ class DbState():
self._dbConn.open() self._dbConn.open()
self._dbConn.resetDb() # drop and recreate DB self._dbConn.resetDb() # drop and recreate DB
def getDbConn(self):
return self._dbConn
def pickAndAllocateTable(self): # pick any table, and "use" it def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate() return self.tableNumQueue.pickAndAllocate()
...@@ -350,9 +385,10 @@ class DbState(): ...@@ -350,9 +385,10 @@ class DbState():
return self._lastInt return self._lastInt
def getTableNameToDelete(self): def getTableNameToDelete(self):
if self.tableNumQueue.isEmpty():
return False
tblNum = self.tableNumQueue.pop() # TODO: race condition! tblNum = self.tableNumQueue.pop() # TODO: race condition!
if ( not tblNum ): # maybe false
return False
return "table_{}".format(tblNum) return "table_{}".format(tblNum)
def execSql(self, sql): # using the main DB connection def execSql(self, sql): # using the main DB connection
...@@ -375,7 +411,7 @@ class CreateTableTask(Task): ...@@ -375,7 +411,7 @@ class CreateTableTask(Task):
def execute(self, wt): def execute(self, wt):
tIndex = dbState.addTable() tIndex = dbState.addTable()
logger.debug(" Creating a table {} ...".format(tIndex)) logger.debug(" Creating a table {} ...".format(tIndex))
wt.execSql("create table table_{} (ts timestamp, speed int)".format(tIndex)) wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
logger.debug(" Table {} created.".format(tIndex)) logger.debug(" Table {} created.".format(tIndex))
dbState.releaseTable(tIndex) dbState.releaseTable(tIndex)
...@@ -385,8 +421,8 @@ class DropTableTask(Task): ...@@ -385,8 +421,8 @@ class DropTableTask(Task):
if ( not tableName ): # May be "False" if ( not tableName ): # May be "False"
logger.info(" Cannot generate a table to delete, skipping...") logger.info(" Cannot generate a table to delete, skipping...")
return return
logger.info(" Dropping a table {} ...".format(tableName)) logger.info(" Dropping a table db.{} ...".format(tableName))
wt.execSql("drop table {}".format(tableName)) wt.execSql("drop table db.{}".format(tableName))
class AddDataTask(Task): class AddDataTask(Task):
def execute(self, wt): def execute(self, wt):
...@@ -396,7 +432,7 @@ class AddDataTask(Task): ...@@ -396,7 +432,7 @@ class AddDataTask(Task):
if ( tIndex == None ): if ( tIndex == None ):
logger.info(" No table found to add data, skipping...") logger.info(" No table found to add data, skipping...")
return return
sql = "insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
logger.debug(" Executing SQL: {}".format(sql)) logger.debug(" Executing SQL: {}".format(sql))
wt.execSql(sql) wt.execSql(sql)
ds.releaseTable(tIndex) ds.releaseTable(tIndex)
...@@ -441,7 +477,7 @@ class WorkDispatcher(): ...@@ -441,7 +477,7 @@ class WorkDispatcher():
# self.totalNumMethods = 2 # self.totalNumMethods = 2
self.tasks = [ self.tasks = [
CreateTableTask(dbState), CreateTableTask(dbState),
# DropTableTask(dbState), DropTableTask(dbState),
AddDataTask(dbState), AddDataTask(dbState),
] ]
...@@ -457,14 +493,24 @@ class WorkDispatcher(): ...@@ -457,14 +493,24 @@ class WorkDispatcher():
task.execute(workerThread) task.execute(workerThread)
if __name__ == "__main__": if __name__ == "__main__":
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator')
parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
help='Use a single shared db connection (default: false)')
parser.add_argument('-d', '--debug', action='store_true',
help='Turn on DEBUG mode for more logging (default: false)')
gConfig = parser.parse_args()
logger = logging.getLogger('myApp') logger = logging.getLogger('myApp')
logger.setLevel(LOGGING_LEVEL) if ( gConfig.debug ):
logger.setLevel(logging.DEBUG) # default seems to be INFO
ch = logging.StreamHandler() ch = logging.StreamHandler()
logger.addHandler(ch) logger.addHandler(ch)
Dice.seed(0) # initial seeding of dice Dice.seed(0) # initial seeding of dice
dbState = DbState() dbState = DbState()
threadPool = SteppingThreadPool(dbState, 3, 5, 0) threadPool = SteppingThreadPool(dbState, 5, 10, 0)
threadPool.run() threadPool.run()
logger.info("Finished running thread pool") logger.info("Finished running thread pool")
dbState.cleanUp() dbState.cleanUp()
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg # 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg
# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg # 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg
# 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above # 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above
# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils
# #
# RUNNING THIS SCRIPT # RUNNING THIS SCRIPT
# #
...@@ -36,5 +37,5 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 ...@@ -36,5 +37,5 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
# Then let us set up the library path so that our compiled SO file can be loaded by Python # Then let us set up the library path so that our compiled SO file can be loaded by Python
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib
# Now we are all let, and let's see if we can find a crash. # Now we are all let, and let's see if we can find a crash. Note we pass all params
./crash_gen.py ./crash_gen.py $@
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册