提交 5b4e7a6d 编写于 作者: S Steven Li

refactoring, added exec stats, re-orged task/state relationships

上级 24164655
...@@ -24,11 +24,13 @@ import copy ...@@ -24,11 +24,13 @@ import copy
import threading import threading
import random import random
import time
import logging import logging
import datetime import datetime
import textwrap import textwrap
from typing import List from typing import List
from typing import Dict
from util.log import * from util.log import *
from util.dnodes import * from util.dnodes import *
...@@ -158,16 +160,17 @@ class WorkerThread: ...@@ -158,16 +160,17 @@ class WorkerThread:
return self._tc.getDbState().getDbConn().query(sql) return self._tc.getDbState().getDbConn().query(sql)
class ThreadCoordinator: class ThreadCoordinator:
def __init__(self, pool, wd: WorkDispatcher, dbState): def __init__(self, pool, dbState):
self._curStep = -1 # first step is 0 self._curStep = -1 # first step is 0
self._pool = pool self._pool = pool
self._wd = wd # self._wd = wd
self._te = None # prepare for every new step self._te = None # prepare for every new step
self._dbState = dbState self._dbState = dbState
self._executedTasks: List[Task] = [] # in a given step self._executedTasks: List[Task] = [] # in a given step
self._lock = threading.RLock() # sync access for a few things self._lock = threading.RLock() # sync access for a few things
self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
self._execStats = ExecutionStats()
def getTaskExecutor(self): def getTaskExecutor(self):
return self._te return self._te
...@@ -184,7 +187,8 @@ class ThreadCoordinator: ...@@ -184,7 +187,8 @@ class ThreadCoordinator:
# Coordinate all threads step by step # Coordinate all threads step by step
self._curStep = -1 # not started yet self._curStep = -1 # not started yet
maxSteps = gConfig.max_steps # type: ignore maxSteps = gConfig.max_steps # type: ignore
while(self._curStep < maxSteps): startTime = time.time()
while(self._curStep < maxSteps-1): # maxStep==10, last curStep should be 9
print(".", end="", flush=True) print(".", end="", flush=True)
logger.debug("Main thread going to sleep") logger.debug("Main thread going to sleep")
...@@ -218,6 +222,8 @@ class ThreadCoordinator: ...@@ -218,6 +222,8 @@ class ThreadCoordinator:
self._pool.joinAll() # Get all threads to finish self._pool.joinAll() # Get all threads to finish
logger.info("All threads finished") logger.info("All threads finished")
self._execStats.logStats()
logger.info("Total Execution Time (task busy time, plus Python overhead): {:.2f} seconds".format(time.time() - startTime))
print("\r\nFinished") print("\r\nFinished")
def tapAllThreads(self): # in a deterministic manner def tapAllThreads(self): # in a deterministic manner
...@@ -241,11 +247,15 @@ class ThreadCoordinator: ...@@ -241,11 +247,15 @@ class ThreadCoordinator:
raise RuntimeError("Cannot fetch task when not running") raise RuntimeError("Cannot fetch task when not running")
# return self._wd.pickTask() # return self._wd.pickTask()
# Alternatively, let's ask the DbState for the appropriate task # Alternatively, let's ask the DbState for the appropriate task
dbState = self.getDbState() # dbState = self.getDbState()
tasks = dbState.getTasksAtState() # tasks = dbState.getTasksAtState() # TODO: create every time?
i = Dice.throw(len(tasks)) # nTasks = len(tasks)
# return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # i = Dice.throw(nTasks)
return tasks[i].clone() # logger.debug(" (dice:{}/{}) ".format(i, nTasks))
# # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
# return tasks[i].clone() # TODO: still necessary?
taskType = self.getDbState().pickTaskType() # pick a task type for current state
return taskType(self.getDbState(), self._execStats) # create a task from it
def resetExecutedTasks(self): def resetExecutedTasks(self):
self._executedTasks = [] # should be under single thread self._executedTasks = [] # should be under single thread
...@@ -261,7 +271,7 @@ class ThreadPool: ...@@ -261,7 +271,7 @@ class ThreadPool:
self.maxSteps = maxSteps self.maxSteps = maxSteps
self.funcSequencer = funcSequencer self.funcSequencer = funcSequencer
# Internal class variables # Internal class variables
self.dispatcher = WorkDispatcher(dbState) # self.dispatcher = WorkDispatcher(dbState) # Obsolete?
self.curStep = 0 self.curStep = 0
self.threadList = [] self.threadList = []
# self.stepGate = threading.Condition() # Gate to hold/sync all threads # self.stepGate = threading.Condition() # Gate to hold/sync all threads
...@@ -409,17 +419,19 @@ class DbConn: ...@@ -409,17 +419,19 @@ class DbConn:
# State of the database as we believe it to be # State of the database as we believe it to be
class DbState(): class DbState():
STATE_INVALID = -1 STATE_INVALID = -1
STATE_EMPTY = 1 # nothing there, no even a DB STATE_EMPTY = 0 # nothing there, no even a DB
STATE_DB_ONLY = 2 # we have a DB, but nothing else STATE_DB_ONLY = 1 # we have a DB, but nothing else
STATE_TABLE_ONLY = 3 # we have a table, but totally empty STATE_TABLE_ONLY = 2 # we have a table, but totally empty
STATE_HAS_DATA = 4 # we have some data in the table STATE_HAS_DATA = 3 # we have some data in the table
def __init__(self): def __init__(self):
self.tableNumQueue = LinearQueue() self.tableNumQueue = LinearQueue()
self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
self._lastInt = 0 # next one is initial integer self._lastInt = 0 # next one is initial integer
self._lock = threading.RLock() self._lock = threading.RLock()
self._state = self.STATE_INVALID self._state = self.STATE_INVALID
self._stateWeights = [1,3,5,10]
# self.openDbServerConnection() # self.openDbServerConnection()
self._dbConn = DbConn() self._dbConn = DbConn()
...@@ -478,25 +490,56 @@ class DbState(): ...@@ -478,25 +490,56 @@ class DbState():
def cleanUp(self): def cleanUp(self):
self._dbConn.close() self._dbConn.close()
def getTasksAtState(self): def getTaskTypesAtState(self):
tasks = [] allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
tasks.append(ReadFixedDataTask(self)) # always for everybody taskTypes = []
if ( self._state == self.STATE_EMPTY ): for tc in allTaskClasses:
tasks.append(CreateDbTask(self)) # t = tc(self) # create task object
tasks.append(CreateFixedTableTask(self)) if tc.canBeginFrom(self._state):
elif ( self._state == self.STATE_DB_ONLY ): taskTypes.append(tc)
tasks.append(DropDbTask(self)) if len(taskTypes) <= 0:
tasks.append(CreateFixedTableTask(self)) raise RuntimeError("No suitable task types found for state: {}".format(self._state))
tasks.append(AddFixedDataTask(self)) return taskTypes
elif ( self._state == self.STATE_TABLE_ONLY ):
tasks.append(DropFixedTableTask(self)) # tasks.append(ReadFixedDataTask(self)) # always for everybody
tasks.append(AddFixedDataTask(self)) # if ( self._state == self.STATE_EMPTY ):
elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust # tasks.append(CreateDbTask(self))
tasks.append(DropFixedTableTask(self)) # tasks.append(CreateFixedTableTask(self))
tasks.append(AddFixedDataTask(self)) # elif ( self._state == self.STATE_DB_ONLY ):
# tasks.append(DropDbTask(self))
# tasks.append(CreateFixedTableTask(self))
# tasks.append(AddFixedDataTask(self))
# elif ( self._state == self.STATE_TABLE_ONLY ):
# tasks.append(DropFixedTableTask(self))
# tasks.append(AddFixedDataTask(self))
# elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
# tasks.append(DropFixedTableTask(self))
# tasks.append(AddFixedDataTask(self))
# else:
# raise RuntimeError("Unexpected DbState state: {}".format(self._state))
# return tasks
def pickTaskType(self):
taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state
weights = []
for tt in taskTypes:
endState = tt.getEndState()
if endState != None :
weights.append(self._stateWeights[endState]) # TODO: change to a method
else: else:
raise RuntimeError("Unexpected DbState state: {}".format(self._state)) weights.append(10) # read data task, default to 10: TODO: change to a constant
return tasks i = self._weighted_choice_sub(weights)
logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
return taskTypes[i]
def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
for i, w in enumerate(weights):
rnd -= w
if rnd < 0:
return i
def transition(self, tasks): def transition(self, tasks):
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
...@@ -646,7 +689,7 @@ class Task(): ...@@ -646,7 +689,7 @@ class Task():
cls.taskSn += 1 cls.taskSn += 1
return cls.taskSn return cls.taskSn
def __init__(self, dbState: DbState): def __init__(self, dbState: DbState, execStats: ExecutionStats):
self._dbState = dbState self._dbState = dbState
self._workerThread = None self._workerThread = None
self._err = None self._err = None
...@@ -656,11 +699,13 @@ class Task(): ...@@ -656,11 +699,13 @@ class Task():
# Assign an incremental task serial number # Assign an incremental task serial number
self._taskNum = self.allocTaskNum() self._taskNum = self.allocTaskNum()
self._execStats = execStats
def isSuccess(self): def isSuccess(self):
return self._err == None return self._err == None
def clone(self): def clone(self): # TODO: why do we need this again?
newTask = self.__class__(self._dbState) newTask = self.__class__(self._dbState, self._execStats)
return newTask return newTask
def logDebug(self, msg): def logDebug(self, msg):
...@@ -681,6 +726,7 @@ class Task(): ...@@ -681,6 +726,7 @@ class Task():
self.logDebug("[-] executing task {}...".format(self.__class__.__name__)) self.logDebug("[-] executing task {}...".format(self.__class__.__name__))
self._err = None self._err = None
self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
try: try:
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
...@@ -689,39 +735,164 @@ class Task(): ...@@ -689,39 +735,164 @@ class Task():
except: except:
self.logDebug("[=]Unexpected exception") self.logDebug("[=]Unexpected exception")
raise raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
def execSql(self, sql): def execSql(self, sql):
return self._dbState.execute(sql) return self._dbState.execute(sql)
class CreateDbTask(Task):
class ExecutionStats:
def __init__(self):
self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task
self._tasksInProgress = 0
self._lock = threading.Lock()
self._firstTaskStartTime = None
self._accRunTime = 0.0 # accumulated run time
def incExecCount(self, klassName, isSuccess): # TODO: add a lock here
if klassName not in self._execTimes:
self._execTimes[klassName] = [0, 0]
t = self._execTimes[klassName] # tuple for the data
t[0] += 1 # index 0 has the "total" execution times
if isSuccess:
t[1] += 1 # index 1 has the "success" execution times
def beginTaskType(self, klassName):
with self._lock:
if self._tasksInProgress == 0 : # starting a new round
self._firstTaskStartTime = time.time() # I am now the first task
self._tasksInProgress += 1
def endTaskType(self, klassName, isSuccess):
with self._lock:
self._tasksInProgress -= 1
if self._tasksInProgress == 0 : # all tasks have stopped
self._accRunTime += (time.time() - self._firstTaskStartTime)
self._firstTaskStartTime = None
def logStats(self):
logger.info("Logging task execution stats (success/total times)...")
execTimesAny = 0
for k, n in self._execTimes.items():
execTimesAny += n[1]
logger.info(" {0:<24}: {1}/{2}".format(k,n[1],n[0]))
logger.info("Total Tasks Executed (success or not): {} ".format(execTimesAny))
logger.info("Total Tasks In Progress at End: {}".format(self._tasksInProgress))
logger.info("Total Task Busy Time (elapsed time when any task is in progress): {:.2f} seconds".format(self._accRunTime))
class StateTransitionTask(Task):
# @classmethod
# def getAllTaskClasses(cls): # static
# return cls.__subclasses__()
@classmethod
def getInfo(cls): # each sub class should supply their own information
raise RuntimeError("Overriding method expected")
@classmethod
def getBeginStates(cls):
return cls.getInfo()[0]
@classmethod
def getEndState(cls):
return cls.getInfo()[1]
@classmethod
def canBeginFrom(cls, state):
return state in cls.getBeginStates()
def execute(self, wt: WorkerThread):
super().execute(wt)
class CreateDbTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
[DbState.STATE_EMPTY], # can begin from
DbState.STATE_DB_ONLY # end state
]
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("create database db") wt.execSql("create database db")
class DropDbTask(Task): class DropDbTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
[DbState.STATE_DB_ONLY, DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA],
DbState.STATE_EMPTY
]
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("drop database db") wt.execSql("drop database db")
class CreateTableTask(Task): class CreateFixedTableTask(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): @classmethod
tIndex = self._dbState.addTable() def getInfo(cls):
self.logDebug("Creating a table {} ...".format(tIndex)) return [
wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) [DbState.STATE_DB_ONLY],
self.logDebug("Table {} created.".format(tIndex)) DbState.STATE_TABLE_ONLY
self._dbState.releaseTable(tIndex) ]
class CreateFixedTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName() tblName = self._dbState.getFixedTableName()
wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName)) wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
class ReadFixedDataTask(Task): class ReadFixedDataTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
[DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA],
None # meaning doesn't affect state
]
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName() tblName = self._dbState.getFixedTableName()
self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later
# tdSql.query(" cars where tbname in ('carzero', 'carone')") # tdSql.query(" cars where tbname in ('carzero', 'carone')")
class DropFixedTableTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
[DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA],
DbState.STATE_DB_ONLY # meaning doesn't affect state
]
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
wt.execSql("drop table db.{}".format(tblName))
class AddFixedDataTask(StateTransitionTask):
@classmethod
def getInfo(cls):
return [
[DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA],
DbState.STATE_HAS_DATA
]
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState
sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
wt.execSql(sql)
#---------- Non State-Transition Related Tasks ----------#
class CreateTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tIndex = self._dbState.addTable()
self.logDebug("Creating a table {} ...".format(tIndex))
wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
self.logDebug("Table {} created.".format(tIndex))
self._dbState.releaseTable(tIndex)
class DropTableTask(Task): class DropTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tableName = self._dbState.getTableNameToDelete() tableName = self._dbState.getTableNameToDelete()
...@@ -731,10 +902,7 @@ class DropTableTask(Task): ...@@ -731,10 +902,7 @@ class DropTableTask(Task):
self.logInfo("Dropping a table db.{} ...".format(tableName)) self.logInfo("Dropping a table db.{} ...".format(tableName))
wt.execSql("drop table db.{}".format(tableName)) wt.execSql("drop table db.{}".format(tableName))
class DropFixedTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
wt.execSql("drop table db.{}".format(tblName))
class AddDataTask(Task): class AddDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
...@@ -750,11 +918,6 @@ class AddDataTask(Task): ...@@ -750,11 +918,6 @@ class AddDataTask(Task):
ds.releaseTable(tIndex) ds.releaseTable(tIndex)
self.logDebug("Finished adding data") self.logDebug("Finished adding data")
class AddFixedDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState
sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
wt.execSql(sql)
# Deterministic random number generator # Deterministic random number generator
class Dice(): class Dice():
...@@ -789,28 +952,28 @@ class Dice(): ...@@ -789,28 +952,28 @@ class Dice():
# Anyone needing to carry out work should simply come here # Anyone needing to carry out work should simply come here
class WorkDispatcher(): # class WorkDispatcher():
def __init__(self, dbState): # def __init__(self, dbState):
# self.totalNumMethods = 2 # # self.totalNumMethods = 2
self.tasks = [ # self.tasks = [
CreateTableTask(dbState), # # CreateTableTask(dbState), # Obsolete
DropTableTask(dbState), # # DropTableTask(dbState),
AddDataTask(dbState), # # AddDataTask(dbState),
] # ]
def throwDice(self): # def throwDice(self):
max = len(self.tasks) - 1 # max = len(self.tasks) - 1
dRes = random.randint(0, max) # dRes = random.randint(0, max)
# logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) # # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
return dRes # return dRes
def pickTask(self): # def pickTask(self):
dice = self.throwDice() # dice = self.throwDice()
return self.tasks[dice] # return self.tasks[dice]
def doWork(self, workerThread): # def doWork(self, workerThread):
task = self.pickTask() # task = self.pickTask()
task.execute(workerThread) # task.execute(workerThread)
def main(): def main():
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
...@@ -839,7 +1002,7 @@ def main(): ...@@ -839,7 +1002,7 @@ def main():
sys.exit() sys.exit()
global logger global logger
logger = logging.getLogger('myApp') logger = logging.getLogger('CrashGen')
if ( gConfig.debug ): if ( gConfig.debug ):
logger.setLevel(logging.DEBUG) # default seems to be INFO logger.setLevel(logging.DEBUG) # default seems to be INFO
ch = logging.StreamHandler() ch = logging.StreamHandler()
...@@ -849,7 +1012,7 @@ def main(): ...@@ -849,7 +1012,7 @@ def main():
Dice.seed(0) # initial seeding of dice Dice.seed(0) # initial seeding of dice
tc = ThreadCoordinator( tc = ThreadCoordinator(
ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
WorkDispatcher(dbState), # WorkDispatcher(dbState), # Obsolete?
dbState dbState
) )
tc.run() tc.run()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册