diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 5125d28ec02564abc24ccf8190d805863dedabf5..c525a59d7e1af8f4f02b185aa0f903e828c21d60 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -13,6 +13,7 @@ # -*- coding: utf-8 -*- import sys import getopt +import argparse import threading import random @@ -26,11 +27,11 @@ from util.sql import * import taos -# Constants -LOGGING_LEVEL = logging.DEBUG -def runThread(workerThread): - logger.info("Running Thread: {}".format(workerThread.tid)) +# Command-line/Environment Configurations +gConfig = None # will set a bit later + +def runThread(workerThread): workerThread.run() # Used by one process to block till another is ready @@ -63,36 +64,42 @@ def runThread(workerThread): class WorkerThread: def __init__(self, pool, tid, dbState): # note: main thread context! - self.curStep = -1 - self.pool = pool - self.tid = tid - self.dbState = dbState + self._curStep = -1 + self._pool = pool + self._tid = tid + self._dbState = dbState # self.threadIdent = threading.get_ident() - self.thread = threading.Thread(target=runThread, args=(self,)) - self.stepGate = threading.Event() + self._thread = threading.Thread(target=runThread, args=(self,)) + self._stepGate = threading.Event() # Let us have a DB connection of our own - self._dbConn = DbConn() + if ( gConfig.per_thread_db_connection ): + self._dbConn = DbConn() def start(self): - self.thread.start() # AFTER the thread is recorded + self._thread.start() # AFTER the thread is recorded def run(self): # initialization after thread starts, in the thread context # self.isSleeping = False - self._dbConn.open() + logger.info("Starting to run thread: {}".format(self._tid)) + + if ( gConfig.per_thread_db_connection ): + self._dbConn.open() + # self._dbConn.resetDb() - while self.curStep < self.pool.maxSteps: + while self._curStep < self._pool.maxSteps: # stepNo = self.pool.waitForStep() # Step to run self.crossStepGate() # self.curStep will get incremented self.doWork() # clean up - self._dbConn.close() + if ( gConfig.per_thread_db_connection ): + self._dbConn.close() def verifyThreadSelf(self): # ensure we are called by this own thread - if ( threading.get_ident() != self.thread.ident ): + if ( threading.get_ident() != self._thread.ident ): raise RuntimeError("Unexpectly called from other threads") def verifyThreadMain(self): # ensure we are called by the main thread @@ -100,7 +107,7 @@ class WorkerThread: raise RuntimeError("Unexpectly called from other threads") def verifyThreadAlive(self): - if ( not self.thread.is_alive() ): + if ( not self._thread.is_alive() ): raise RuntimeError("Unexpected dead thread") # def verifyIsSleeping(self, isSleeping): @@ -113,30 +120,30 @@ class WorkerThread: self.verifyThreadSelf() # only allowed by ourselves # self.verifyIsSleeping(False) # has to be awake - logger.debug("Worker thread {} about to cross pool barrier".format(self.tid)) + logger.debug("Worker thread {} about to cross pool barrier".format(self._tid)) # self.isSleeping = True # TODO: maybe too early? - self.pool.crossPoolBarrier() # wait for all other threads + self._pool.crossPoolBarrier() # wait for all other threads # Wait again at the "gate", waiting to be "tapped" - logger.debug("Worker thread {} about to cross the step gate".format(self.tid)) + logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) # self.stepGate.acquire() # acquire lock immediately - self.stepGate.wait() - self.stepGate.clear() + self._stepGate.wait() + self._stepGate.clear() # self.stepGate.release() # release - logger.debug("Worker thread {} woke up".format(self.tid)) + logger.debug("Worker thread {} woke up".format(self._tid)) # Someone will wake us up here - self.curStep += 1 # off to a new step... + self._curStep += 1 # off to a new step... def tapStepGate(self): # give it a tap, release the thread waiting there self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread # self.verifyIsSleeping(True) # has to be sleeping - logger.debug("Tapping worker thread {}".format(self.tid)) + logger.debug("Tapping worker thread {}".format(self._tid)) # self.stepGate.acquire() # logger.debug("Tapping worker thread {}, lock acquired".format(self.tid)) - self.stepGate.set() # wake up! + self._stepGate.set() # wake up! # logger.debug("Tapping worker thread {}, notified!".format(self.tid)) # self.isSleeping = False # No race condition for sure # self.stepGate.release() # this finishes before .wait() can return @@ -144,12 +151,15 @@ class WorkerThread: time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release def doWork(self): - logger.info(" Step {}, thread {}: ".format(self.curStep, self.tid)) - self.pool.dispatcher.doWork(self) + logger.info(" Step {}, thread {}: ".format(self._curStep, self._tid)) + self._pool.dispatcher.doWork(self) def execSql(self, sql): - return self.dbState.execSql(sql) - + if ( gConfig.per_thread_db_connection ): + return self._dbConn.execSql(sql) + else: + return self._dbState.getDbConn().execSql(sql) + # We define a class to run a number of threads in locking steps. class SteppingThreadPool: @@ -188,7 +198,7 @@ class SteppingThreadPool: # The threads will run through many steps for workerThread in self.threadList: - workerThread.thread.join() # slight hack, accessing members + workerThread._thread.join() # slight hack, accessing members logger.info("All threads finished") @@ -240,10 +250,15 @@ class LinearQueue(): def pop(self): with self._lock: if ( self.isEmpty() ): - raise RuntimeError("Cannot pop an empty queue") + # raise RuntimeError("Cannot pop an empty queue") + return False # TODO: None? + index = self.firstIndex if ( index in self.inUse ): - self.inUse.remove(index) # TODO: what about discard? + return False + + # if ( index in self.inUse ): + # self.inUse.remove(index) # TODO: what about discard? self.firstIndex += 1 return index @@ -259,13 +274,15 @@ class LinearQueue(): def allocate(self, i): with self._lock: + # logger.debug("LQ allocating item {}".format(i)) if ( i in self.inUse ): raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) self.inUse.add(i) def release(self, i): with self._lock: - self.inUse.remove(i) # KeyError possible + # logger.debug("LQ releasing item {}".format(i)) + self.inUse.remove(i) # KeyError possible, TODO: why? def size(self): return self.lastIndex + 1 - self.firstIndex @@ -287,6 +304,8 @@ class LinearQueue(): class DbConn: def __init__(self): + self._conn = None + self._cursor = None self.isOpen = False def open(self): # Open connection @@ -294,16 +313,27 @@ class DbConn: raise RuntimeError("Cannot re-open an existing DB connection") cfgPath = "../../build/test/cfg" - conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + self._cursor = self._conn.cursor() + # Get the connection/cursor ready + self._cursor.execute('reset query cache') + # self._cursor.execute('use db') + + # Open connection self._tdSql = TDSql() - self._tdSql.init(conn.cursor()) + self._tdSql.init(self._cursor) self.isOpen = True def resetDb(self): # reset the whole database, etc. if ( not self.isOpen ): raise RuntimeError("Cannot reset database until connection is open") - self._tdSql.prepare() # Recreate database, etc. + # self._tdSql.prepare() # Recreate database, etc. + + self._cursor.execute('drop database if exists db') + self._cursor.execute('create database db') + # self._cursor.execute('use db') + # tdSql.execute('show databases') def close(self): @@ -312,7 +342,9 @@ class DbConn: self._tdSql.close() self.isOpen = False - def execSql(self, sql): + def execSql(self, sql): + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") return self._tdSql.execute(sql) # State of the database as we believe it to be @@ -328,6 +360,9 @@ class DbState(): self._dbConn.open() self._dbConn.resetDb() # drop and recreate DB + def getDbConn(self): + return self._dbConn + def pickAndAllocateTable(self): # pick any table, and "use" it return self.tableNumQueue.pickAndAllocate() @@ -350,9 +385,10 @@ class DbState(): return self._lastInt def getTableNameToDelete(self): - if self.tableNumQueue.isEmpty(): - return False tblNum = self.tableNumQueue.pop() # TODO: race condition! + if ( not tblNum ): # maybe false + return False + return "table_{}".format(tblNum) def execSql(self, sql): # using the main DB connection @@ -375,7 +411,7 @@ class CreateTableTask(Task): def execute(self, wt): tIndex = dbState.addTable() logger.debug(" Creating a table {} ...".format(tIndex)) - wt.execSql("create table table_{} (ts timestamp, speed int)".format(tIndex)) + wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) logger.debug(" Table {} created.".format(tIndex)) dbState.releaseTable(tIndex) @@ -385,8 +421,8 @@ class DropTableTask(Task): if ( not tableName ): # May be "False" logger.info(" Cannot generate a table to delete, skipping...") return - logger.info(" Dropping a table {} ...".format(tableName)) - wt.execSql("drop table {}".format(tableName)) + logger.info(" Dropping a table db.{} ...".format(tableName)) + wt.execSql("drop table db.{}".format(tableName)) class AddDataTask(Task): def execute(self, wt): @@ -396,7 +432,7 @@ class AddDataTask(Task): if ( tIndex == None ): logger.info(" No table found to add data, skipping...") return - sql = "insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) + sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) logger.debug(" Executing SQL: {}".format(sql)) wt.execSql(sql) ds.releaseTable(tIndex) @@ -441,7 +477,7 @@ class WorkDispatcher(): # self.totalNumMethods = 2 self.tasks = [ CreateTableTask(dbState), - # DropTableTask(dbState), + DropTableTask(dbState), AddDataTask(dbState), ] @@ -457,14 +493,24 @@ class WorkDispatcher(): task.execute(workerThread) if __name__ == "__main__": + # Super cool Python argument library: https://docs.python.org/3/library/argparse.html + parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator') + parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + help='Use a single shared db connection (default: false)') + parser.add_argument('-d', '--debug', action='store_true', + help='Turn on DEBUG mode for more logging (default: false)') + + gConfig = parser.parse_args() + logger = logging.getLogger('myApp') - logger.setLevel(LOGGING_LEVEL) + if ( gConfig.debug ): + logger.setLevel(logging.DEBUG) # default seems to be INFO ch = logging.StreamHandler() logger.addHandler(ch) Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(dbState, 3, 5, 0) + threadPool = SteppingThreadPool(dbState, 5, 10, 0) threadPool.run() logger.info("Finished running thread pool") dbState.cleanUp() diff --git a/tests/pytest/crash_gen.sh b/tests/pytest/crash_gen.sh index 4db751f9977d54055ac4416494a9b6382726474b..c845b3976473bedc0fef5684c29b06f866df0840 100755 --- a/tests/pytest/crash_gen.sh +++ b/tests/pytest/crash_gen.sh @@ -9,6 +9,7 @@ # 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg # 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg # 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above +# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils # # RUNNING THIS SCRIPT # @@ -36,5 +37,5 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 # Then let us set up the library path so that our compiled SO file can be loaded by Python export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib -# Now we are all let, and let's see if we can find a crash. -./crash_gen.py +# Now we are all let, and let's see if we can find a crash. Note we pass all params +./crash_gen.py $@