提交 806a9ae5 编写于 作者: S Steven Li

Commit to replicate TD-430

上级 86b36782
...@@ -74,10 +74,10 @@ class WorkerThread: ...@@ -74,10 +74,10 @@ class WorkerThread:
self._dbConn = DbConn() self._dbConn = DbConn()
def logDebug(self, msg): def logDebug(self, msg):
logger.info(" t[{}] {}".format(self._tid, msg)) logger.info(" TRD[{}] {}".format(self._tid, msg))
def logInfo(self, msg): def logInfo(self, msg):
logger.info(" t[{}] {}".format(self._tid, msg)) logger.info(" TRD[{}] {}".format(self._tid, msg))
def getTaskExecutor(self): def getTaskExecutor(self):
...@@ -106,16 +106,19 @@ class WorkerThread: ...@@ -106,16 +106,19 @@ class WorkerThread:
while True: while True:
tc = self._tc # Thread Coordinator, the overall master tc = self._tc # Thread Coordinator, the overall master
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
# logger.debug("Thread task loop exited barrier...") logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
self.crossStepGate() # then per-thread gate, after being tapped self.crossStepGate() # then per-thread gate, after being tapped
# logger.debug("Thread task loop exited step gate...") logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
if not self._tc.isRunning(): if not self._tc.isRunning():
logger.debug("Thread Coordinator not running any more, worker thread now stopping...") logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask() task = tc.fetchTask()
logger.debug("[TRD] Worker thread [{}] about to execute task".format(self._tid))
task.execute(self) task.execute(self)
tc.saveExecutedTask(task) tc.saveExecutedTask(task)
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
def verifyThreadSelf(self): # ensure we are called by this own thread def verifyThreadSelf(self): # ensure we are called by this own thread
if ( threading.get_ident() != self._thread.ident ): if ( threading.get_ident() != self._thread.ident ):
...@@ -135,7 +138,7 @@ class WorkerThread: ...@@ -135,7 +138,7 @@ class WorkerThread:
self.verifyThreadSelf() # only allowed by ourselves self.verifyThreadSelf() # only allowed by ourselves
# Wait again at the "gate", waiting to be "tapped" # Wait again at the "gate", waiting to be "tapped"
# logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid))
self._stepGate.wait() self._stepGate.wait()
self._stepGate.clear() self._stepGate.clear()
...@@ -145,7 +148,7 @@ class WorkerThread: ...@@ -145,7 +148,7 @@ class WorkerThread:
self.verifyThreadAlive() self.verifyThreadAlive()
self.verifyThreadMain() # only allowed for main thread self.verifyThreadMain() # only allowed for main thread
# logger.debug("Tapping worker thread {}".format(self._tid)) logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
self._stepGate.set() # wake up! self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit time.sleep(0) # let the released thread run a bit
...@@ -192,8 +195,9 @@ class ThreadCoordinator: ...@@ -192,8 +195,9 @@ class ThreadCoordinator:
self._execStats.startExec() # start the stop watch self._execStats.startExec() # start the stop watch
failed = False failed = False
while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9
print(".", end="", flush=True) if not gConfig.debug:
logger.debug("Main thread going to sleep") print(".", end="", flush=True) # print this only if we are not in debug mode
logger.debug("[TRD] Main thread going to sleep")
# Now ready to enter a step # Now ready to enter a step
self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
...@@ -226,7 +230,7 @@ class ThreadCoordinator: ...@@ -226,7 +230,7 @@ class ThreadCoordinator:
if not failed: # only if not failed if not failed: # only if not failed
self._te = TaskExecutor(self._curStep) self._te = TaskExecutor(self._curStep)
logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
self.tapAllThreads() self.tapAllThreads()
logger.debug("Main thread ready to finish up...") logger.debug("Main thread ready to finish up...")
...@@ -253,7 +257,7 @@ class ThreadCoordinator: ...@@ -253,7 +257,7 @@ class ThreadCoordinator:
wakeSeq.append(i) wakeSeq.append(i)
else: else:
wakeSeq.insert(0, i) wakeSeq.insert(0, i)
logger.info("Waking up threads: {}".format(str(wakeSeq))) logger.info("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq)))
# TODO: set dice seed to a deterministic value # TODO: set dice seed to a deterministic value
for i in wakeSeq: for i in wakeSeq:
self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
...@@ -473,7 +477,7 @@ class AnyState: ...@@ -473,7 +477,7 @@ class AnyState:
self._info = self.getInfo() self._info = self.getInfo()
def __str__(self): def __str__(self):
return self._stateNames[self._info[self.STATE_VAL_IDX] - 1] # -1 hack to accomodate the STATE_INVALID case return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] # -1 hack to accomodate the STATE_INVALID case
def getInfo(self): def getInfo(self):
raise RuntimeError("Must be overriden by child classes") raise RuntimeError("Must be overriden by child classes")
...@@ -481,6 +485,9 @@ class AnyState: ...@@ -481,6 +485,9 @@ class AnyState:
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
raise RuntimeError("Must be overriden by child classes") raise RuntimeError("Must be overriden by child classes")
def getValIndex(self):
return self._info[self.STATE_VAL_IDX]
def getValue(self): def getValue(self):
return self._info[self.STATE_VAL_IDX] return self._info[self.STATE_VAL_IDX]
def canCreateDb(self): def canCreateDb(self):
...@@ -502,7 +509,7 @@ class AnyState: ...@@ -502,7 +509,7 @@ class AnyState:
if not isinstance(task, cls): if not isinstance(task, cls):
continue continue
if task.isSuccess(): if task.isSuccess():
task.logDebug("Task success found") # task.logDebug("Task success found")
sCnt += 1 sCnt += 1
if ( sCnt >= 2 ): if ( sCnt >= 2 ):
raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))
...@@ -669,7 +676,7 @@ class DbState(): ...@@ -669,7 +676,7 @@ class DbState():
else: else:
raise raise
except: except:
print("[=]Unexpected exception") print("[=] Unexpected exception")
raise raise
self._dbConn.resetDb() # drop and recreate DB self._dbConn.resetDb() # drop and recreate DB
self._state = StateEmpty() # initial state, the result of above self._state = StateEmpty() # initial state, the result of above
...@@ -711,15 +718,26 @@ class DbState(): ...@@ -711,15 +718,26 @@ class DbState():
def cleanUp(self): def cleanUp(self):
self._dbConn.close() self._dbConn.close()
def getTaskTypesAtState(self): # May be slow, use cautionsly...
def getTaskTypesAtState(self):
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
taskTypes = [] firstTaskTypes = []
for tc in allTaskClasses: for tc in allTaskClasses:
# t = tc(self) # create task object # t = tc(self) # create task object
if tc.canBeginFrom(self._state): if tc.canBeginFrom(self._state):
taskTypes.append(tc) firstTaskTypes.append(tc)
# now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones
taskTypes = firstTaskTypes.copy() # have to have these
for task1 in firstTaskTypes: # each task type gathered so far
endState = task1.getEndState() # figure the end state
if endState == None:
continue
for tc in allTaskClasses: # what task can further begin from there?
if tc.canBeginFrom(endState) and (endState not in firstTaskTypes):
taskTypes.append(tc) # gather it
if len(taskTypes) <= 0: if len(taskTypes) <= 0:
raise RuntimeError("No suitable task types found for state: {}".format(self._state)) raise RuntimeError("No suitable task types found for state: {}".format(self._state))
return taskTypes return taskTypes
# tasks.append(ReadFixedDataTask(self)) # always for everybody # tasks.append(ReadFixedDataTask(self)) # always for everybody
...@@ -746,7 +764,7 @@ class DbState(): ...@@ -746,7 +764,7 @@ class DbState():
for tt in taskTypes: for tt in taskTypes:
endState = tt.getEndState() endState = tt.getEndState()
if endState != None : if endState != None :
weights.append(self._stateWeights[endState]) # TODO: change to a method weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method
else: else:
weights.append(10) # read data task, default to 10: TODO: change to a constant weights.append(10) # read data task, default to 10: TODO: change to a constant
i = self._weighted_choice_sub(weights) i = self._weighted_choice_sub(weights)
...@@ -763,13 +781,17 @@ class DbState(): ...@@ -763,13 +781,17 @@ class DbState():
def _findCurrentState(self): def _findCurrentState(self):
dbc = self._dbConn dbc = self._dbConn
if dbc.query("show databases") == 0 : # no database?! if dbc.query("show databases") == 0 : # no database?!
# logger.debug("Found EMPTY state")
return StateEmpty() return StateEmpty()
dbc.execute("use db") # did not do this when openning connection dbc.execute("use db") # did not do this when openning connection
if dbc.query("show tables") == 0 : # no tables if dbc.query("show tables") == 0 : # no tables
# logger.debug("Found DB ONLY state")
return StateDbOnly() return StateDbOnly()
if dbc.query("SELECT * FROM {}".format(self.getFixedTableName()) ) == 0 : # no data if dbc.query("SELECT * FROM db.{}".format(self.getFixedTableName()) ) == 0 : # no data
# logger.debug("Found TABLE_ONLY state")
return StateTableOnly() return StateTableOnly()
else: else:
# logger.debug("Found HAS_DATA state")
return StateHasData() return StateHasData()
def transition(self, tasks): def transition(self, tasks):
...@@ -828,10 +850,11 @@ class Task(): ...@@ -828,10 +850,11 @@ class Task():
@classmethod @classmethod
def allocTaskNum(cls): def allocTaskNum(cls):
cls.taskSn += 1 Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
return cls.taskSn # logger.debug("Allocating taskSN: {}".format(Task.taskSn))
return Task.taskSn
def __init__(self, dbState: DbState, execStats: ExecutionStats): def __init__(self, dbState: DbState, execStats: ExecutionStats):
self._dbState = dbState self._dbState = dbState
self._workerThread = None self._workerThread = None
self._err = None self._err = None
...@@ -840,6 +863,7 @@ class Task(): ...@@ -840,6 +863,7 @@ class Task():
# Assign an incremental task serial number # Assign an incremental task serial number
self._taskNum = self.allocTaskNum() self._taskNum = self.allocTaskNum()
# logger.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats self._execStats = execStats
...@@ -851,10 +875,10 @@ class Task(): ...@@ -851,10 +875,10 @@ class Task():
return newTask return newTask
def logDebug(self, msg): def logDebug(self, msg):
self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
def logInfo(self, msg): def logInfo(self, msg):
self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))
...@@ -872,10 +896,10 @@ class Task(): ...@@ -872,10 +896,10 @@ class Task():
try: try:
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
self.logDebug("[=]Taos Execution exception: {0}".format(err)) self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err))
self._err = err self._err = err
except: except:
self.logDebug("[=]Unexpected exception") self.logDebug("[=] Unexpected exception")
raise raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
...@@ -962,7 +986,7 @@ class StateTransitionTask(Task): ...@@ -962,7 +986,7 @@ class StateTransitionTask(Task):
# return cls.getInfo()[0] # return cls.getInfo()[0]
@classmethod @classmethod
def getEndState(cls): def getEndState(cls): # returning the class name
return cls.getInfo()[0] return cls.getInfo()[0]
@classmethod @classmethod
...@@ -980,7 +1004,7 @@ class CreateDbTask(StateTransitionTask): ...@@ -980,7 +1004,7 @@ class CreateDbTask(StateTransitionTask):
def getInfo(cls): def getInfo(cls):
return [ return [
# [AnyState.STATE_EMPTY], # can begin from # [AnyState.STATE_EMPTY], # can begin from
AnyState.STATE_DB_ONLY # end state StateDbOnly() # end state
] ]
@classmethod @classmethod
...@@ -995,7 +1019,7 @@ class DropDbTask(StateTransitionTask): ...@@ -995,7 +1019,7 @@ class DropDbTask(StateTransitionTask):
def getInfo(cls): def getInfo(cls):
return [ return [
# [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
AnyState.STATE_EMPTY StateEmpty()
] ]
@classmethod @classmethod
...@@ -1010,7 +1034,7 @@ class CreateFixedTableTask(StateTransitionTask): ...@@ -1010,7 +1034,7 @@ class CreateFixedTableTask(StateTransitionTask):
def getInfo(cls): def getInfo(cls):
return [ return [
# [AnyState.STATE_DB_ONLY], # [AnyState.STATE_DB_ONLY],
AnyState.STATE_TABLE_ONLY StateTableOnly()
] ]
@classmethod @classmethod
...@@ -1043,7 +1067,7 @@ class DropFixedTableTask(StateTransitionTask): ...@@ -1043,7 +1067,7 @@ class DropFixedTableTask(StateTransitionTask):
def getInfo(cls): def getInfo(cls):
return [ return [
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
AnyState.STATE_DB_ONLY # meaning doesn't affect state StateDbOnly() # meaning doesn't affect state
] ]
@classmethod @classmethod
...@@ -1059,7 +1083,7 @@ class AddFixedDataTask(StateTransitionTask): ...@@ -1059,7 +1083,7 @@ class AddFixedDataTask(StateTransitionTask):
def getInfo(cls): def getInfo(cls):
return [ return [
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
AnyState.STATE_HAS_DATA StateHasData()
] ]
@classmethod @classmethod
...@@ -1164,6 +1188,17 @@ class Dice(): ...@@ -1164,6 +1188,17 @@ class Dice():
# task = self.pickTask() # task = self.pickTask()
# task.execute(workerThread) # task.execute(workerThread)
class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord):
msg = record.msg
# print("type = {}, value={}".format(type(msg), msg))
# sys.exit()
if msg.startswith("[TRD]"):
return False
return True
def main(): def main():
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
...@@ -1192,6 +1227,7 @@ def main(): ...@@ -1192,6 +1227,7 @@ def main():
global logger global logger
logger = logging.getLogger('CrashGen') logger = logging.getLogger('CrashGen')
logger.addFilter(LoggingFilter())
if ( gConfig.debug ): if ( gConfig.debug ):
logger.setLevel(logging.DEBUG) # default seems to be INFO logger.setLevel(logging.DEBUG) # default seems to be INFO
ch = logging.StreamHandler() ch = logging.StreamHandler()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册