提交 6623cb63 编写于 作者: S Steven Li

Discovered TD-255 crash with params: -p -s 50 -t 10

上级 55a418ca
...@@ -11,6 +11,7 @@ debs/ ...@@ -11,6 +11,7 @@ debs/
rpms/ rpms/
mac/ mac/
*.pyc *.pyc
.mypy_cache
*.tmp *.tmp
*.swp *.swp
src/connector/nodejs/node_modules/ src/connector/nodejs/node_modules/
......
...@@ -61,6 +61,13 @@ class WorkerThread: ...@@ -61,6 +61,13 @@ class WorkerThread:
if ( gConfig.per_thread_db_connection ): # type: ignore if ( gConfig.per_thread_db_connection ): # type: ignore
self._dbConn = DbConn() self._dbConn = DbConn()
def logDebug(self, msg):
logger.info(" t[{}] {}".format(self._tid, msg))
def logInfo(self, msg):
logger.info(" t[{}] {}".format(self._tid, msg))
def getTaskExecutor(self): def getTaskExecutor(self):
return self._tc.getTaskExecutor() return self._tc.getTaskExecutor()
...@@ -172,6 +179,7 @@ class ThreadCoordinator: ...@@ -172,6 +179,7 @@ class ThreadCoordinator:
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step # Get ready for next step
logger.info("<-- Step {} finished".format(self._curStep)) logger.info("<-- Step {} finished".format(self._curStep))
...@@ -221,7 +229,11 @@ class ThreadCoordinator: ...@@ -221,7 +229,11 @@ class ThreadCoordinator:
dbState = self.getDbState() dbState = self.getDbState()
tasks = dbState.getTasksAtState() tasks = dbState.getTasksAtState()
i = Dice.throw(len(tasks)) i = Dice.throw(len(tasks))
return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
return tasks[i].clone()
def resetExecutedTasks(self):
self._executedTasks = [] # should be under single thread
def saveExecutedTask(self, task): def saveExecutedTask(self, task):
with self._lock: with self._lock:
...@@ -512,6 +524,7 @@ class DbState(): ...@@ -512,6 +524,7 @@ class DbState():
if not isinstance(task, cls): if not isinstance(task, cls):
continue continue
if task.isSuccess(): if task.isSuccess():
task.logDebug("Task success found")
sCnt += 1 sCnt += 1
if ( sCnt >= 2 ): if ( sCnt >= 2 ):
raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))
...@@ -551,43 +564,70 @@ class TaskExecutor(): ...@@ -551,43 +564,70 @@ class TaskExecutor():
def __init__(self, curStep): def __init__(self, curStep):
self._curStep = curStep self._curStep = curStep
def getCurStep(self):
return self._curStep
def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
task.execute(wt) task.execute(wt)
def logInfo(self, msg): # def logInfo(self, msg):
logger.info(" T[{}.x]: ".format(self._curStep) + msg) # logger.info(" T[{}.x]: ".format(self._curStep) + msg)
def logDebug(self, msg): # def logDebug(self, msg):
logger.debug(" T[{}.x]: ".format(self._curStep) + msg) # logger.debug(" T[{}.x]: ".format(self._curStep) + msg)
class Task(): class Task():
taskSn = 100
@classmethod
def allocTaskNum(cls):
cls.taskSn += 1
return cls.taskSn
def __init__(self, dbState: DbState): def __init__(self, dbState: DbState):
self._dbState = dbState self._dbState = dbState
self._workerThread = None
self._err = None self._err = None
self._curStep = None
# Assign an incremental task serial number
self._taskNum = self.allocTaskNum()
def isSuccess(self): def isSuccess(self):
return self._err == None return self._err == None
def clone(self):
newTask = self.__class__(self._dbState)
return newTask
def logDebug(self, msg):
self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))
def logInfo(self, msg):
self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))
def execute(self, wt: WorkerThread): def execute(self, wt: WorkerThread):
wt.verifyThreadSelf() wt.verifyThreadSelf()
self._workerThread = wt # type: ignore
te = wt.getTaskExecutor() te = wt.getTaskExecutor()
te.logDebug("[-] executing task {}...".format(self.__class__.__name__)) self._curStep = te.getCurStep()
self.logDebug("[-] executing task {}...".format(self.__class__.__name__))
self._err = None self._err = None
try: try:
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
te.logDebug("[=]Taos Execution exception: {0}".format(err)) self.logDebug("[=]Taos Execution exception: {0}".format(err))
self._err = err self._err = err
except: except:
te.logDebug("[=]Unexpected exception") self.logDebug("[=]Unexpected exception")
raise raise
te.logDebug("[X] task execution completed, status: {}".format(self.isSuccess())) self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
def execSql(self, sql): def execSql(self, sql):
return self._dbState.execute(sql) return self._dbState.execute(sql)
...@@ -603,9 +643,9 @@ class DropDbTask(Task): ...@@ -603,9 +643,9 @@ class DropDbTask(Task):
class CreateTableTask(Task): class CreateTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tIndex = self._dbState.addTable() tIndex = self._dbState.addTable()
te.logDebug("Creating a table {} ...".format(tIndex)) self.logDebug("Creating a table {} ...".format(tIndex))
wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
te.logDebug("Table {} created.".format(tIndex)) self.logDebug("Table {} created.".format(tIndex))
self._dbState.releaseTable(tIndex) self._dbState.releaseTable(tIndex)
class CreateFixedTableTask(Task): class CreateFixedTableTask(Task):
...@@ -617,9 +657,9 @@ class DropTableTask(Task): ...@@ -617,9 +657,9 @@ class DropTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tableName = self._dbState.getTableNameToDelete() tableName = self._dbState.getTableNameToDelete()
if ( not tableName ): # May be "False" if ( not tableName ): # May be "False"
te.logInfo("Cannot generate a table to delete, skipping...") self.logInfo("Cannot generate a table to delete, skipping...")
return return
te.logInfo("Dropping a table db.{} ...".format(tableName)) self.logInfo("Dropping a table db.{} ...".format(tableName))
wt.execSql("drop table db.{}".format(tableName)) wt.execSql("drop table db.{}".format(tableName))
class DropFixedTableTask(Task): class DropFixedTableTask(Task):
...@@ -630,16 +670,16 @@ class DropFixedTableTask(Task): ...@@ -630,16 +670,16 @@ class DropFixedTableTask(Task):
class AddDataTask(Task): class AddDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState ds = self._dbState
te.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
tIndex = ds.pickAndAllocateTable() tIndex = ds.pickAndAllocateTable()
if ( tIndex == None ): if ( tIndex == None ):
te.logInfo("No table found to add data, skipping...") self.logInfo("No table found to add data, skipping...")
return return
sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
te.logDebug("Executing SQL: {}".format(sql)) self.logDebug("Executing SQL: {}".format(sql))
wt.execSql(sql) wt.execSql(sql)
ds.releaseTable(tIndex) ds.releaseTable(tIndex)
te.logDebug("Finished adding data") self.logDebug("Finished adding data")
class AddFixedDataTask(Task): class AddFixedDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册