diff --git a/.gitignore b/.gitignore index 806663987a1b99fd9e10a58c06f57c7425dc0920..9772284ef1c29fe3005195ec394afef3aa8992e8 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ debs/ rpms/ mac/ *.pyc +.mypy_cache *.tmp *.swp src/connector/nodejs/node_modules/ diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index ccc90708d329dcd8a02107b57219d3a29a41da47..431de7c56ab6bd607055137f50cc22ab34be450b 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -61,6 +61,13 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): # type: ignore self._dbConn = DbConn() + def logDebug(self, msg): + logger.info(" t[{}] {}".format(self._tid, msg)) + + def logInfo(self, msg): + logger.info(" t[{}] {}".format(self._tid, msg)) + + def getTaskExecutor(self): return self._tc.getTaskExecutor() @@ -172,6 +179,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + self.resetExecutedTasks() # clear the tasks after we are done # Get ready for next step logger.info("<-- Step {} finished".format(self._curStep)) @@ -221,7 +229,11 @@ class ThreadCoordinator: dbState = self.getDbState() tasks = dbState.getTasksAtState() i = Dice.throw(len(tasks)) - return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. + # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. + return tasks[i].clone() + + def resetExecutedTasks(self): + self._executedTasks = [] # should be under single thread def saveExecutedTask(self, task): with self._lock: @@ -512,6 +524,7 @@ class DbState(): if not isinstance(task, cls): continue if task.isSuccess(): + task.logDebug("Task success found") sCnt += 1 if ( sCnt >= 2 ): raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) @@ -551,43 +564,70 @@ class TaskExecutor(): def __init__(self, curStep): self._curStep = curStep + def getCurStep(self): + return self._curStep + def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread task.execute(wt) - def logInfo(self, msg): - logger.info(" T[{}.x]: ".format(self._curStep) + msg) + # def logInfo(self, msg): + # logger.info(" T[{}.x]: ".format(self._curStep) + msg) - def logDebug(self, msg): - logger.debug(" T[{}.x]: ".format(self._curStep) + msg) + # def logDebug(self, msg): + # logger.debug(" T[{}.x]: ".format(self._curStep) + msg) class Task(): + taskSn = 100 + + @classmethod + def allocTaskNum(cls): + cls.taskSn += 1 + return cls.taskSn + def __init__(self, dbState: DbState): self._dbState = dbState + self._workerThread = None self._err = None + self._curStep = None + + # Assign an incremental task serial number + self._taskNum = self.allocTaskNum() def isSuccess(self): return self._err == None + def clone(self): + newTask = self.__class__(self._dbState) + return newTask + + def logDebug(self, msg): + self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + + def logInfo(self, msg): + self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) def execute(self, wt: WorkerThread): wt.verifyThreadSelf() + self._workerThread = wt # type: ignore te = wt.getTaskExecutor() - te.logDebug("[-] executing task {}...".format(self.__class__.__name__)) + self._curStep = te.getCurStep() + self.logDebug("[-] executing task {}...".format(self.__class__.__name__)) self._err = None try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - te.logDebug("[=]Taos Execution exception: {0}".format(err)) + self.logDebug("[=]Taos Execution exception: {0}".format(err)) self._err = err except: - te.logDebug("[=]Unexpected exception") + self.logDebug("[=]Unexpected exception") raise - te.logDebug("[X] task execution completed, status: {}".format(self.isSuccess())) + self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) def execSql(self, sql): return self._dbState.execute(sql) @@ -603,9 +643,9 @@ class DropDbTask(Task): class CreateTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tIndex = self._dbState.addTable() - te.logDebug("Creating a table {} ...".format(tIndex)) + self.logDebug("Creating a table {} ...".format(tIndex)) wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - te.logDebug("Table {} created.".format(tIndex)) + self.logDebug("Table {} created.".format(tIndex)) self._dbState.releaseTable(tIndex) class CreateFixedTableTask(Task): @@ -617,9 +657,9 @@ class DropTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tableName = self._dbState.getTableNameToDelete() if ( not tableName ): # May be "False" - te.logInfo("Cannot generate a table to delete, skipping...") + self.logInfo("Cannot generate a table to delete, skipping...") return - te.logInfo("Dropping a table db.{} ...".format(tableName)) + self.logInfo("Dropping a table db.{} ...".format(tableName)) wt.execSql("drop table db.{}".format(tableName)) class DropFixedTableTask(Task): @@ -630,16 +670,16 @@ class DropFixedTableTask(Task): class AddDataTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState - te.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) + self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) tIndex = ds.pickAndAllocateTable() if ( tIndex == None ): - te.logInfo("No table found to add data, skipping...") + self.logInfo("No table found to add data, skipping...") return sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - te.logDebug("Executing SQL: {}".format(sql)) + self.logDebug("Executing SQL: {}".format(sql)) wt.execSql(sql) ds.releaseTable(tIndex) - te.logDebug("Finished adding data") + self.logDebug("Finished adding data") class AddFixedDataTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):