From 5d14a745283226b00da32558131349524fb652c4 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 4 May 2020 22:23:27 -0700 Subject: [PATCH] Refactored to use ThreadCoordinator and TaskExecutor, added command line parameters --- tests/pytest/crash_gen.py | 328 +++++++++++++++++++++----------------- 1 file changed, 178 insertions(+), 150 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index ee4a68eee5..a5baf8577b 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3 +#!/usr/bin/python3.7 ################################################################### # Copyright (c) 2016 by TAOS Technologies, Inc. # All rights reserved. @@ -11,7 +11,13 @@ ################################################################### # -*- coding: utf-8 -*- +from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel + import sys +# Require Python 3 +if sys.version_info[0] < 3: + raise Exception("Must be using Python 3") + import getopt import argparse @@ -25,78 +31,68 @@ from util.dnodes import * from util.cases import * from util.sql import * +import crash_gen import taos +# Global variables, tried to keep a small number. +gConfig = None # Command-line/Environment Configurations, will set a bit later +logger = None -# Command-line/Environment Configurations -gConfig = None # will set a bit later - -def runThread(workerThread): - workerThread.run() - -# Used by one process to block till another is ready -# class Baton: -# def __init__(self): -# self._lock = threading.Lock() # control access to object -# self._baton = threading.Condition() # let thread block -# self._hasGiver = False -# self._hasTaker = False - -# def give(self): -# with self._lock: -# if ( self._hasGiver ): # already? -# raise RuntimeError("Cannot double-give a baton") -# self._hasGiver = True - -# self._settle() # may block, OUTSIDE self lock - -# def take(self): -# with self._lock: -# if ( self._hasTaker): -# raise RuntimeError("Cannot double-take a baton") -# self._hasTaker = True - -# self._settle() - -# def _settle(self): - - +def runThread(wt: WorkerThread): + wt.run() class WorkerThread: - def __init__(self, pool, tid, dbState): # note: main thread context! - self._curStep = -1 + def __init__(self, pool: SteppingThreadPool, tid, dbState, + tc: ThreadCoordinator, + # te: TaskExecutor, + ): # note: main thread context! + # self._curStep = -1 self._pool = pool self._tid = tid self._dbState = dbState + self._tc = tc # self.threadIdent = threading.get_ident() self._thread = threading.Thread(target=runThread, args=(self,)) self._stepGate = threading.Event() # Let us have a DB connection of our own - if ( gConfig.per_thread_db_connection ): - self._dbConn = DbConn() + if ( gConfig.per_thread_db_connection ): # type: ignore + self._dbConn = DbConn() + + def getTaskExecutor(self): + return self._tc.getTaskExecutor() def start(self): self._thread.start() # AFTER the thread is recorded - def run(self): + def run(self): # initialization after thread starts, in the thread context # self.isSleeping = False logger.info("Starting to run thread: {}".format(self._tid)) - if ( gConfig.per_thread_db_connection ): + if ( gConfig.per_thread_db_connection ): # type: ignore self._dbConn.open() - # self._dbConn.resetDb() - - while self._curStep < self._pool.maxSteps: - # stepNo = self.pool.waitForStep() # Step to run - self.crossStepGate() # self.curStep will get incremented - self.doWork() + self._doTaskLoop() + # clean up - if ( gConfig.per_thread_db_connection ): + if ( gConfig.per_thread_db_connection ): # type: ignore self._dbConn.close() + def _doTaskLoop(self) : + # while self._curStep < self._pool.maxSteps: + # tc = ThreadCoordinator(None) + while True: + self._tc.crossStepBarrier() # shared barrier first, INCLUDING the last one + logger.debug("Thread task loop exited barrier...") + self.crossStepGate() # then per-thread gate, after being tapped + logger.debug("Thread task loop exited step gate...") + if not self._tc.isRunning(): + break + + task = self._tc.fetchTask() + task.execute(self) + def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): raise RuntimeError("Unexpectly called from other threads") @@ -109,53 +105,25 @@ class WorkerThread: if ( not self._thread.is_alive() ): raise RuntimeError("Unexpected dead thread") - # def verifyIsSleeping(self, isSleeping): - # if ( isSleeping != self.isSleeping ): - # raise RuntimeError("Unexpected thread sleep status") - # A gate is different from a barrier in that a thread needs to be "tapped" def crossStepGate(self): self.verifyThreadAlive() self.verifyThreadSelf() # only allowed by ourselves - # self.verifyIsSleeping(False) # has to be awake - - # logger.debug("Worker thread {} about to cross pool barrier".format(self._tid)) - # self.isSleeping = True # TODO: maybe too early? - self._pool.crossPoolBarrier() # wait for all other threads # Wait again at the "gate", waiting to be "tapped" # logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) self._stepGate.wait() self._stepGate.clear() - # logger.debug("Worker thread {} woke up".format(self._tid)) - # Someone will wake us up here - self._curStep += 1 # off to a new step... + # self._curStep += 1 # off to a new step... def tapStepGate(self): # give it a tap, release the thread waiting there self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - # self.verifyIsSleeping(True) # has to be sleeping - + logger.debug("Tapping worker thread {}".format(self._tid)) - # self.stepGate.acquire() - # logger.debug("Tapping worker thread {}, lock acquired".format(self.tid)) - self._stepGate.set() # wake up! - # logger.debug("Tapping worker thread {}, notified!".format(self.tid)) - # self.isSleeping = False # No race condition for sure - # self.stepGate.release() # this finishes before .wait() can return - # logger.debug("Tapping worker thread {}, lock released".format(self.tid)) - time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release - - def doWork(self): - self.logInfo("Thread starting an execution") - self._pool.dispatcher.doWork(self) - - def logInfo(self, msg): - logger.info(" T[{}.{}]: ".format(self._curStep, self._tid) + msg) - - def logDebug(self, msg): - logger.debug(" T[{}.{}]: ".format(self._curStep, self._tid) + msg) + self._stepGate.set() # wake up! + time.sleep(0) # let the released thread run a bit def execSql(self, sql): if ( gConfig.per_thread_db_connection ): @@ -163,64 +131,61 @@ class WorkerThread: else: return self._dbState.getDbConn().execSql(sql) +class ThreadCoordinator: + def __init__(self, pool, wd: WorkDispatcher): + self._curStep = -1 # first step is 0 + self._pool = pool + self._wd = wd + self._te = None # prepare for every new step -# We define a class to run a number of threads in locking steps. -class SteppingThreadPool: - def __init__(self, dbState, numThreads, maxSteps, funcSequencer): - self.numThreads = numThreads - self.maxSteps = maxSteps - self.funcSequencer = funcSequencer - # Internal class variables - self.dispatcher = WorkDispatcher(self, dbState) - self.curStep = 0 - self.threadList = [] - # self.stepGate = threading.Condition() # Gate to hold/sync all threads - # self.numWaitingThreads = 0 - - # Thread coordination - self._lock = threading.RLock() # lock to control access (e.g. even reading it is dangerous) - self._poolBarrier = threading.Barrier(numThreads + 1) # do nothing before crossing this, except main thread + self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads - # starting to run all the threads, in locking steps - def run(self): - for tid in range(0, self.numThreads): # Create the threads - workerThread = WorkerThread(self, tid, dbState) - self.threadList.append(workerThread) - workerThread.start() # start, but should block immediately before step 0 + def getTaskExecutor(self): + return self._te + + def crossStepBarrier(self): + self._stepBarrier.wait() + + def run(self, dbState): + self._pool.createAndStartThreads(dbState, self) # Coordinate all threads step by step - self.curStep = -1 # not started yet - while(self.curStep < self.maxSteps): + self._curStep = -1 # not started yet + maxSteps = gConfig.max_steps # type: ignore + while(self._curStep < maxSteps): print(".", end="", flush=True) logger.debug("Main thread going to sleep") # Now ready to enter a step - self.crossPoolBarrier() # let other threads go past the pool barrier, but wait at the thread gate - self._poolBarrier.reset() # Other worker threads should now be at the "gate" + self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate + self._stepBarrier.reset() # Other worker threads should now be at the "gate" + + # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" + logger.info("<-- Step {} finished".format(self._curStep)) + self._curStep += 1 # we are about to get into next step. TODO: race condition here! + logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep - # Rare chance, when all threads should be blocked at the "step gate" for each thread - logger.info("<-- Step {} finished".format(self.curStep)) - self.curStep += 1 # we are about to get into next step. TODO: race condition here! - logger.debug(" ") # line break - logger.debug("--> Step {} starts with main thread waking up".format(self.curStep)) # Now not all threads had time to go to sleep + self._te = TaskExecutor(self._curStep) - logger.debug("Main thread waking up at step {}, tapping worker threads".format(self.curStep)) # Now not all threads had time to go to sleep + logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep self.tapAllThreads() - # The threads will run through many steps - for workerThread in self.threadList: - workerThread._thread.join() # slight hack, accessing members + logger.debug("Main thread ready to finish up...") + self.crossStepBarrier() # Cross it one last time, after all threads finish + self._stepBarrier.reset() + logger.debug("Main thread in exclusive zone...") + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("Main thread joining all threads") + self._pool.joinAll() # Get all threads to finish logger.info("All threads finished") - print("") - print("Finished") - - def crossPoolBarrier(self): - self._poolBarrier.wait() + print("\r\nFinished") def tapAllThreads(self): # in a deterministic manner wakeSeq = [] - for i in range(self.numThreads): # generate a random sequence + for i in range(self._pool.numThreads): # generate a random sequence if Dice.throw(2) == 1 : wakeSeq.append(i) else: @@ -228,9 +193,43 @@ class SteppingThreadPool: logger.info("Waking up threads: {}".format(str(wakeSeq))) # TODO: set dice seed to a deterministic value for i in wakeSeq: - self.threadList[i].tapStepGate() + self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! time.sleep(0) # yield + def isRunning(self): + return self._te != None + + def fetchTask(self) -> Task : + if ( not self.isRunning() ): # no task + raise RuntimeError("Cannot fetch task when not running") + return self._wd.pickTask() + +# We define a class to run a number of threads in locking steps. +class SteppingThreadPool: + def __init__(self, dbState, numThreads, maxSteps, funcSequencer): + self.numThreads = numThreads + self.maxSteps = maxSteps + self.funcSequencer = funcSequencer + # Internal class variables + self.dispatcher = WorkDispatcher(dbState) + self.curStep = 0 + self.threadList = [] + # self.stepGate = threading.Condition() # Gate to hold/sync all threads + # self.numWaitingThreads = 0 + + + # starting to run all the threads, in locking steps + def createAndStartThreads(self, dbState, tc: ThreadCoordinator): + for tid in range(0, self.numThreads): # Create the threads + workerThread = WorkerThread(self, tid, dbState, tc) + self.threadList.append(workerThread) + workerThread.start() # start, but should block immediately before step 0 + + def joinAll(self): + for workerThread in self.threadList: + logger.debug("Joining thread...") + workerThread._thread.join() + # A queue of continguous POSITIVE integers class LinearQueue(): def __init__(self): @@ -404,51 +403,67 @@ class DbState(): def cleanUp(self): self._dbConn.close() -# A task is a long-living entity, carrying out short-lived "executions" for threads +class TaskExecutor(): + def __init__(self, curStep): + self._curStep = curStep + + def execute(self, task, wt: WorkerThread): # execute a task on a thread + task.execute(self, wt) + + def logInfo(self, msg): + logger.info(" T[{}.x]: ".format(self._curStep) + msg) + + def logDebug(self, msg): + logger.debug(" T[{}.x]: ".format(self._curStep) + msg) + + class Task(): def __init__(self, dbState): self.dbState = dbState - def _executeInternal(self, wt): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): raise RuntimeError("To be implemeted by child classes") - def execute(self, workerThread): - self._executeInternal(workerThread) # TODO: no return value? - workerThread.logDebug("[X] task execution completed") + def execute(self, wt: WorkerThread): + wt.verifyThreadSelf() + + te = wt.getTaskExecutor() + self._executeInternal(te, wt) # TODO: no return value? + te.logDebug("[X] task execution completed") def execSql(self, sql): return self.dbState.execute(sql) class CreateTableTask(Task): - def _executeInternal(self, wt): - tIndex = dbState.addTable() - wt.logDebug("Creating a table {} ...".format(tIndex)) + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tIndex = self.dbState.addTable() + te.logDebug("Creating a table {} ...".format(tIndex)) wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - wt.logDebug("Table {} created.".format(tIndex)) - dbState.releaseTable(tIndex) + te.logDebug("Table {} created.".format(tIndex)) + self.dbState.releaseTable(tIndex) class DropTableTask(Task): - def _executeInternal(self, wt): - tableName = dbState.getTableNameToDelete() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tableName = self.dbState.getTableNameToDelete() if ( not tableName ): # May be "False" - wt.logInfo("Cannot generate a table to delete, skipping...") + te.logInfo("Cannot generate a table to delete, skipping...") return - wt.logInfo("Dropping a table db.{} ...".format(tableName)) + te.logInfo("Dropping a table db.{} ...".format(tableName)) wt.execSql("drop table db.{}".format(tableName)) class AddDataTask(Task): - def _executeInternal(self, wt): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self.dbState - wt.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) + te.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) tIndex = ds.pickAndAllocateTable() if ( tIndex == None ): - wt.logInfo("No table found to add data, skipping...") + te.logInfo("No table found to add data, skipping...") return sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - wt.logDebug("Executing SQL: {}".format(sql)) + te.logDebug("Executing SQL: {}".format(sql)) wt.execSql(sql) ds.releaseTable(tIndex) - wt.logDebug("Finished adding data") + te.logDebug("Finished adding data") # Deterministic random number generator class Dice(): @@ -484,8 +499,7 @@ class Dice(): # Anyone needing to carry out work should simply come here class WorkDispatcher(): - def __init__(self, pool, dbState): - self.pool = pool + def __init__(self, dbState): # self.totalNumMethods = 2 self.tasks = [ CreateTableTask(dbState), @@ -499,31 +513,45 @@ class WorkDispatcher(): # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) return dRes - def doWork(self, workerThread): + def pickTask(self): dice = self.throwDice() - task = self.tasks[dice] + return self.tasks[dice] + + def doWork(self, workerThread): + task = self.pickTask() task.execute(workerThread) -if __name__ == "__main__": +def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator') parser.add_argument('-p', '--per-thread-db-connection', action='store_true', help='Use a single shared db connection (default: false)') parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, + help='Maximum number of steps to run (default: 100)') + parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, + help='Number of threads to run (default: 10)') + global gConfig gConfig = parser.parse_args() + global logger logger = logging.getLogger('myApp') if ( gConfig.debug ): logger.setLevel(logging.DEBUG) # default seems to be INFO ch = logging.StreamHandler() logger.addHandler(ch) - Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(dbState, 5, 500, 0) - threadPool.run() - logger.info("Finished running thread pool") + Dice.seed(0) # initial seeding of dice + tc = ThreadCoordinator( + SteppingThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), + WorkDispatcher(dbState) + ) + tc.run(dbState) dbState.cleanUp() - + logger.info("Finished running thread pool") + +if __name__ == "__main__": + main() -- GitLab