提交 a79c5b12 编写于 作者: S Steven Li

Minor tweaks for crash_gen tool, re-demonstrating TD-997

上级 b115abf1
...@@ -136,7 +136,10 @@ class WorkerThread: ...@@ -136,7 +136,10 @@ class WorkerThread:
# clean up # clean up
if (gConfig.per_thread_db_connection): # type: ignore if (gConfig.per_thread_db_connection): # type: ignore
if self._dbConn.isOpen: #sometimes it is not open
self._dbConn.close() self._dbConn.close()
else:
logger.warning("Cleaning up worker thread, dbConn already closed")
def _doTaskLoop(self): def _doTaskLoop(self):
# while self._curStep < self._pool.maxSteps: # while self._curStep < self._pool.maxSteps:
...@@ -146,6 +149,7 @@ class WorkerThread: ...@@ -146,6 +149,7 @@ class WorkerThread:
try: try:
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
except threading.BrokenBarrierError as err: # main thread timed out except threading.BrokenBarrierError as err: # main thread timed out
print("_bto", end="")
logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out") logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
break break
...@@ -153,6 +157,7 @@ class WorkerThread: ...@@ -153,6 +157,7 @@ class WorkerThread:
self.crossStepGate() # then per-thread gate, after being tapped self.crossStepGate() # then per-thread gate, after being tapped
logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
if not self._tc.isRunning(): if not self._tc.isRunning():
print("_wts", end="")
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
...@@ -169,6 +174,7 @@ class WorkerThread: ...@@ -169,6 +174,7 @@ class WorkerThread:
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
self._dbInUse = False # there may be changes between steps self._dbInUse = False # there may be changes between steps
# print("_wtd", end=None) # worker thread died
def verifyThreadSelf(self): # ensure we are called by this own thread def verifyThreadSelf(self): # ensure we are called by this own thread
if (threading.get_ident() != self._thread.ident): if (threading.get_ident() != self._thread.ident):
...@@ -197,12 +203,15 @@ class WorkerThread: ...@@ -197,12 +203,15 @@ class WorkerThread:
# self._curStep += 1 # off to a new step... # self._curStep += 1 # off to a new step...
def tapStepGate(self): # give it a tap, release the thread waiting there def tapStepGate(self): # give it a tap, release the thread waiting there
self.verifyThreadAlive() # self.verifyThreadAlive()
self.verifyThreadMain() # only allowed for main thread self.verifyThreadMain() # only allowed for main thread
if self._thread.is_alive():
logger.debug("[TRD] Tapping worker thread {}".format(self._tid)) logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
self._stepGate.set() # wake up! self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit time.sleep(0) # let the released thread run a bit
else:
print("_tad", end="") # Thread already dead
def execSql(self, sql): # TODO: expose DbConn directly def execSql(self, sql): # TODO: expose DbConn directly
return self.getDbConn().execute(sql) return self.getDbConn().execute(sql)
...@@ -332,6 +341,7 @@ class ThreadCoordinator: ...@@ -332,6 +341,7 @@ class ThreadCoordinator:
# end, and maybe signal them to stop # end, and maybe signal them to stop
else: else:
raise raise
return transitionFailed
self.resetExecutedTasks() # clear the tasks after we are done self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step # Get ready for next step
...@@ -373,7 +383,13 @@ class ThreadCoordinator: ...@@ -373,7 +383,13 @@ class ThreadCoordinator:
break # do transition only if tasks are error free break # do transition only if tasks are error free
# Ending previous step # Ending previous step
try:
transitionFailed = self._doTransition() # To start, we end step -1 first transitionFailed = self._doTransition() # To start, we end step -1 first
except taos.error.ProgrammingError as err:
transitionFailed = True
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
logger.info("Transition failed: errno=0x{:X}, msg: {}".format(errno2, err))
# Then we move on to the next step # Then we move on to the next step
self._releaseAllWorkerThreads(transitionFailed) self._releaseAllWorkerThreads(transitionFailed)
...@@ -788,12 +804,15 @@ class DbConnNative(DbConn): ...@@ -788,12 +804,15 @@ class DbConnNative(DbConn):
else: else:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
buildPath = None
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
break break
if buildPath == None:
raise RuntimeError("Failed to determine buildPath, selfPath={}".format(self_path))
return buildPath return buildPath
...@@ -1015,29 +1034,11 @@ class StateDbOnly(AnyState): ...@@ -1015,29 +1034,11 @@ class StateDbOnly(AnyState):
if (not self.hasTask(tasks, TaskCreateDb)): if (not self.hasTask(tasks, TaskCreateDb)):
# only if we don't create any more # only if we don't create any more
self.assertAtMostOneSuccess(tasks, TaskDropDb) self.assertAtMostOneSuccess(tasks, TaskDropDb)
self.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parallel cases # TODO: restore the below, the problem exists, although unlikely in real-world
# Nothing to be said about adding data task # if (gSvcMgr!=None) and gSvcMgr.isRestarting():
# if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, DropDbTask)
# self._state = self.STATE_EMPTY
# if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
# # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
# if ( not self.hasTask(tasks, TaskDropSuperTable) ):
# self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
# self.assertNoTask(tasks, DropDbTask) # should have have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail
# self._state = self.STATE_TABLE_ONLY
# else:
# self._state = self.STATE_HAS_DATA
# What about AddFixedData?
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ):
# self._state = self.STATE_HAS_DATA
# else: # no success in dropping db tasks, no success in create fixed table? read data should also fail
# # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks,
# self._state = self.STATE_DB_ONLY # no change
class StateSuperTableOnly(AnyState): class StateSuperTableOnly(AnyState):
...@@ -1192,7 +1193,7 @@ class StateMechine: ...@@ -1192,7 +1193,7 @@ class StateMechine:
# case of multiple creation and drops # case of multiple creation and drops
if self._curState.canDropDb(): if self._curState.canDropDb():
if gSvcMgr == None: # only if we are not restarting service if gSvcMgr == None: # only if we are running as client-only
self._curState.assertIfExistThenSuccess(tasks, TaskDropDb) self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in
# case of drop-create-drop # case of drop-create-drop
...@@ -1522,8 +1523,7 @@ class Task(): ...@@ -1522,8 +1523,7 @@ class Task():
try: try:
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = err.errno if ( errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
err.errno > 0) else 0x80000000 + err.errno # correct error scheme
if (gConfig.continue_on_exception): # user choose to continue if (gConfig.continue_on_exception): # user choose to continue
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql())) errno2, err, wt.getDbConn().getLastSql()))
...@@ -1961,6 +1961,7 @@ class TaskRestartService(StateTransitionTask): ...@@ -1961,6 +1961,7 @@ class TaskRestartService(StateTransitionTask):
return state.canDropFixedSuperTable() # Basically when we have the super table return state.canDropFixedSuperTable() # Basically when we have the super table
return False # don't run this otherwise return False # don't run this otherwise
CHANCE_TO_RESTART_SERVICE = 100
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not gConfig.auto_start_service: # only execute when we are in -a mode if not gConfig.auto_start_service: # only execute when we are in -a mode
print("_a", end="", flush=True) print("_a", end="", flush=True)
...@@ -1972,7 +1973,7 @@ class TaskRestartService(StateTransitionTask): ...@@ -1972,7 +1973,7 @@ class TaskRestartService(StateTransitionTask):
return return
self._isRunning = True self._isRunning = True
if Dice.throw(50) == 0: # 1 in N chance if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
dbc = wt.getDbConn() dbc = wt.getDbConn()
dbc.execute("show databases") # simple delay, align timing with other workers dbc.execute("show databases") # simple delay, align timing with other workers
gSvcMgr.restart() gSvcMgr.restart()
...@@ -2460,10 +2461,11 @@ class TdeSubProcess: ...@@ -2460,10 +2461,11 @@ class TdeSubProcess:
# filePath = os.path.join(logPath, f) # filePath = os.path.join(logPath, f)
# print("Removing log file: {}".format(filePath)) # print("Removing log file: {}".format(filePath))
# os.remove(filePath) # os.remove(filePath)
if os.path.exists(logPath):
logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S') logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
logger.info("Saving old log files to: {}".format(logPathSaved)) logger.info("Saving old log files to: {}".format(logPathSaved))
os.rename(logPath, logPathSaved) os.rename(logPath, logPathSaved)
os.mkdir(logPath) # recreate # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms
svcCmd = [taosdPath, '-c', cfgPath] svcCmd = [taosdPath, '-c', cfgPath]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册