From 73bdaae15ca2459241dee16dc9ce31c84ad437d7 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 27 Apr 2020 01:52:23 -0700 Subject: [PATCH] Now using Python barriers and events to sync threads --- tests/pytest/random_walk.py | 236 ++++++++++++++++++++++++++---------- 1 file changed, 171 insertions(+), 65 deletions(-) diff --git a/tests/pytest/random_walk.py b/tests/pytest/random_walk.py index 5866351a99..146066431d 100755 --- a/tests/pytest/random_walk.py +++ b/tests/pytest/random_walk.py @@ -17,6 +17,7 @@ import getopt import threading import random import logging +import datetime from util.log import * from util.dnodes import * @@ -32,6 +33,34 @@ def runThread(workerThread): logger.info("Running Thread: {}".format(workerThread.tid)) workerThread.run() +# Used by one process to block till another is ready +# class Baton: +# def __init__(self): +# self._lock = threading.Lock() # control access to object +# self._baton = threading.Condition() # let thread block +# self._hasGiver = False +# self._hasTaker = False + +# def give(self): +# with self._lock: +# if ( self._hasGiver ): # already? +# raise RuntimeError("Cannot double-give a baton") +# self._hasGiver = True + +# self._settle() # may block, OUTSIDE self lock + +# def take(self): +# with self._lock: +# if ( self._hasTaker): +# raise RuntimeError("Cannot double-take a baton") +# self._hasTaker = True + +# self._settle() + +# def _settle(self): + + + class WorkerThread: def __init__(self, pool, tid): # note: main thread context! self.curStep = -1 @@ -39,14 +68,14 @@ class WorkerThread: self.tid = tid # self.threadIdent = threading.get_ident() self.thread = threading.Thread(target=runThread, args=(self,)) - self.stepGate = threading.Condition() + self.stepGate = threading.Event() def start(self): self.thread.start() # AFTER the thread is recorded def run(self): # initialization after thread starts, in the thread context - self.isSleeping = False + # self.isSleeping = False while self.curStep < self.pool.maxSteps: # stepNo = self.pool.waitForStep() # Step to run @@ -65,23 +94,26 @@ class WorkerThread: if ( not self.thread.is_alive() ): raise RuntimeError("Unexpected dead thread") - def verifyIsSleeping(self, isSleeping): - if ( isSleeping != self.isSleeping ): - raise RuntimeError("Unexpected thread sleep status") + # def verifyIsSleeping(self, isSleeping): + # if ( isSleeping != self.isSleeping ): + # raise RuntimeError("Unexpected thread sleep status") + # A gate is different from a barrier in that a thread needs to be "tapped" def crossStepGate(self): self.verifyThreadAlive() self.verifyThreadSelf() # only allowed by ourselves - self.verifyIsSleeping(False) # has to be awake + # self.verifyIsSleeping(False) # has to be awake - logger.debug("Worker thread {} going to sleep".format(self.tid)) - self.isSleeping = True # TODO: maybe too early? - self.pool.reportThreadWaiting() # TODO: this triggers the main thread, TOO early + logger.debug("Worker thread {} about to cross pool barrier".format(self.tid)) + # self.isSleeping = True # TODO: maybe too early? + self.pool.crossPoolBarrier() # wait for all other threads - # Actually going to sleep - self.stepGate.acquire() # acquire lock immediately - self.stepGate.wait() # release and then acquire - self.stepGate.release() # release + # Wait again at the "gate", waiting to be "tapped" + logger.debug("Worker thread {} about to cross the step gate".format(self.tid)) + # self.stepGate.acquire() # acquire lock immediately + self.stepGate.wait() + self.stepGate.clear() + # self.stepGate.release() # release logger.debug("Worker thread {} woke up".format(self.tid)) # Someone will wake us up here @@ -90,15 +122,15 @@ class WorkerThread: def tapStepGate(self): # give it a tap, release the thread waiting there self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - self.verifyIsSleeping(True) # has to be sleeping + # self.verifyIsSleeping(True) # has to be sleeping logger.debug("Tapping worker thread {}".format(self.tid)) - self.stepGate.acquire() + # self.stepGate.acquire() # logger.debug("Tapping worker thread {}, lock acquired".format(self.tid)) - self.stepGate.notify() # wake up! + self.stepGate.set() # wake up! # logger.debug("Tapping worker thread {}, notified!".format(self.tid)) - self.isSleeping = False # No race condition for sure - self.stepGate.release() # this finishes before .wait() can return + # self.isSleeping = False # No race condition for sure + # self.stepGate.release() # this finishes before .wait() can return # logger.debug("Tapping worker thread {}, lock released".format(self.tid)) time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release @@ -109,20 +141,21 @@ class WorkerThread: # We define a class to run a number of threads in locking steps. class SteppingThreadPool: - def __init__(self, numThreads, maxSteps, funcSequencer): + def __init__(self, dbState, numThreads, maxSteps, funcSequencer): self.numThreads = numThreads self.maxSteps = maxSteps self.funcSequencer = funcSequencer # Internal class variables - self.dispatcher = WorkDispatcher(self) + self.dispatcher = WorkDispatcher(self, dbState) self.curStep = 0 self.threadList = [] # self.stepGate = threading.Condition() # Gate to hold/sync all threads - self.numWaitingThreads = 0 - + # self.numWaitingThreads = 0 + # Thread coordination - self.lock = threading.Lock() # for critical section execution - self.mainGate = threading.Condition() + self.barrier = threading.Barrier(numThreads + 1) # plus main thread + # self.lock = threading.Lock() # for critical section execution + # self.mainGate = threading.Condition() # starting to run all the threads, in locking steps def run(self): @@ -136,13 +169,15 @@ class SteppingThreadPool: self.curStep = -1 # not started yet while(self.curStep < self.maxSteps): logger.debug("Main thread going to sleep") - self.mainGate.acquire() - self.mainGate.wait() # start snoozing - self.mainGate.release - logger.debug("Main thread woke up") # Now not all threads had time to go to sleep - time.sleep(0.01) # This is like forever + # self.mainGate.acquire() + # self.mainGate.wait() # start snoozing + # self.mainGate.release + self.crossPoolBarrier() + self.barrier.reset() # Other worker threads should now be at the "gate" + + logger.debug("Main thread waking up, tapping worker threads".format(self.curStep)) # Now not all threads had time to go to sleep + # time.sleep(0.01) # This is like forever - self.curStep += 1 # starts with 0 self.tapAllThreads() # The threads will run through many steps @@ -151,21 +186,29 @@ class SteppingThreadPool: logger.info("All threads finished") - def reportThreadWaiting(self): - allThreadWaiting = False - with self.lock: - self.numWaitingThreads += 1 - if ( self.numWaitingThreads == self.numThreads ): - allThreadWaiting = True - - if (allThreadWaiting): # aha, pass the baton to the main thread - logger.debug("All threads are now waiting") - self.numWaitingThreads = 0 # do this 1st to avoid race condition - # time.sleep(0.001) # thread yield, so main thread can be ready - self.mainGate.acquire() - self.mainGate.notify() # main thread would now start to run - self.mainGate.release() - time.sleep(0) # yield, maybe main thread can run for just a bit + def crossPoolBarrier(self): + if ( self.barrier.n_waiting == self.numThreads ): # everyone else is waiting, inc main thread + logger.info("<-- Step {} finished".format(self.curStep)) + self.curStep += 1 # we are about to get into next step. TODO: race condition here! + logger.debug(" ") # line break + logger.debug("--> Step {} starts with main thread waking up".format(self.curStep)) # Now not all threads had time to go to sleep + + + self.barrier.wait() + # allThreadWaiting = False + # with self.lock: + # self.numWaitingThreads += 1 + # if ( self.numWaitingThreads == self.numThreads ): + # allThreadWaiting = True + + # if (allThreadWaiting): # aha, pass the baton to the main thread + # logger.debug("All threads are now waiting") + # self.numWaitingThreads = 0 # do this 1st to avoid race condition + # # time.sleep(0.001) # thread yield, so main thread can be ready + # self.mainGate.acquire() + # self.mainGate.notify() # main thread would now start to run + # self.mainGate.release() + # time.sleep(0) # yield, maybe main thread can run for just a bit # def waitForStep(self): # shouldWait = True; @@ -201,6 +244,7 @@ class SteppingThreadPool: else: wakeSeq.insert(0, i) logger.info("Waking up threads: {}".format(str(wakeSeq))) + # TODO: set dice seed to a deterministic value for i in wakeSeq: self.threadList[i].tapStepGate() time.sleep(0) # yield @@ -208,30 +252,86 @@ class SteppingThreadPool: # A queue of continguous POSITIVE integers class LinearQueue(): def __init__(self): - self.firstIndex = 1 + self.firstIndex = 1 # 1st ever element self.lastIndex = 0 + self.lock = threading.RLock() # our functions may call each other + self.inUse = set() # the indexes that are in use right now def push(self): # Push to the tail (largest) - if ( self.firstIndex > self.lastIndex ): # impossible, meaning it's empty - self.lastIndex = self.firstIndex - return self.firstIndex - # Otherwise we have something - self.lastIndex += 1 - return self.lastIndex + with self.lock: + if ( self.firstIndex > self.lastIndex ): # impossible, meaning it's empty + self.lastIndex = self.firstIndex + return self.firstIndex + # Otherwise we have something + self.lastIndex += 1 + return self.lastIndex def pop(self): - if ( self.firstIndex > self.lastIndex ): # empty - return 0 - index = self.firstIndex - self.firstIndex += 1 - return index + with self.lock: + if ( self.isEmpty() ): + raise RuntimeError("Cannot pop an empty queue") + index = self.firstIndex + self.firstIndex += 1 + return index + + def isEmpty(self): + return self.firstIndex > self.lastIndex + + def popIfNotEmpty(self): + with self.lock: + if (self.isEmpty()): + return 0 + return self.pop() + + def use(self, i): + with self.lock: + if ( i in self.inUse ): + raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) + self.inUse.add(i) + + def unUse(self, i): + with self.lock: + self.inUse.remove(i) # KeyError possible + + def size(self): + return self.lastIndex + 1 - self.firstIndex + + def allocate(self): + with self.lock: + cnt = 0 # counting the interations + while True: + cnt += 1 + if ( cnt > self.size()*10 ): # 10x iteration already + raise RuntimeError("Failed to allocate LinearQueue element") + ret = Dice.throwRange(self.firstIndex, self.lastIndex+1) + if ( not ret in self.inUse ): + return self.use(ret) # State of the database as we believe it to be class DbState(): def __init__(self): self.tableNumQueue = LinearQueue() + self.tick = datetime.datetime(2019, 1, 1) # initial date time tick + self.int = 0 # initial integer self.openDbServerConnection() + self.lock = threading.RLock() + + def pickTable(self): # pick any table, and "use" it + return self.tableNumQueue.allocate() + + def getNextTick(self): + with self.lock: # prevent duplicate tick + self.tick += datetime.timedelta(0, 1) # add one second to it + return self.tick + + def getNextInt(self): + with self.lock: + self.int += 1 + return self.int + + def unuseTable(self, i): # return the table back, so others can use it + self.tableNumQueue.unUse(i) def openDbServerConnection(self): cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath() @@ -250,12 +350,15 @@ class DbState(): return "table_{}".format(tblNum) def getTableNameToDelete(self): - tblNum = self.tableNumQueue.pop() - if( tblNum==0 ) : + if self.tableNumQueue.isEmpty: return False + tblNum = self.tableNumQueue.pop() # TODO: race condition! return "table_{}".format(tblNum) class Task(): + def __init__(self, dbState): + self.dbState = dbState + def execute(self): raise RuntimeError("Must be overriden by child class") @@ -277,6 +380,9 @@ class DropTableTask(Task): class AddDataTask(Task): def execute(self): logger.info(" Adding some data...") + # ds = self.dbState + # tIndex = self.dbState.pickTable() + # tdSql.execute("insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())) # Deterministic random number generator class Dice(): @@ -312,13 +418,13 @@ class Dice(): # Anyone needing to carry out work should simply come here class WorkDispatcher(): - def __init__(self, pool): + def __init__(self, pool, dbState): self.pool = pool - self.totalNumMethods = 2 + # self.totalNumMethods = 2 self.tasks = [ - CreateTableTask(), - DropTableTask(), - AddDataTask(), + CreateTableTask(dbState), + DropTableTask(dbState), + # AddDataTask(dbState), ] def throwDice(self): @@ -337,7 +443,7 @@ if __name__ == "__main__": Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(3, 5, 0) + threadPool = SteppingThreadPool(dbState, 3, 5, 0) threadPool.run() logger.info("Finished running thread pool") dbState.closeDbServerConnection() -- GitLab