From e48d057f7048c28659b2cb26be91965b73987f16 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Thu, 10 Nov 2022 18:36:45 +0800 Subject: [PATCH] update --- tests/pytest/crash_gen/crash_gen_main.py | 212 ++++++++++++++++++----- tests/pytest/crash_gen/shared/db.py | 23 ++- 2 files changed, 192 insertions(+), 43 deletions(-) diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 2a9966eaf8..ce0a306a9b 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -420,10 +420,12 @@ class ThreadCoordinator: except threading.BrokenBarrierError as err: self._execStats.registerFailure("Aborted due to worker thread timeout") Logging.error("\n") + Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format( ThreadCoordinator.WORKER_THREAD_TIMEOUT)) Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):") ts = ThreadStacks() + ts.record_current_time(time.time()) # record thread exit time at current moment ts.print(filterInternal=True) workerTimeout = True @@ -678,7 +680,11 @@ class AnyState: CAN_CREATE_STREAM = 3 # super table must exists CAN_CREATE_TOPIC = 3 # super table must exists CAN_CREATE_CONSUMERS = 3 + CAN_CREATE_SMA = 3 + CAN_DROP_SMA = 3 CAN_DROP_FIXED_SUPER_TABLE = 4 + CAN_DROP_TOPIC = 4 + CAN_DROP_STREAM = 4 CAN_ADD_DATA = 5 CAN_READ_DATA = 6 CAN_DELETE_DATA = 6 @@ -734,13 +740,25 @@ class AnyState: def canCreateTopic(self): return self._info[self.CAN_CREATE_TOPIC] + + def canDropTopic(self): + return self._info[self.CAN_DROP_TOPIC] def canCreateConsumers(self): return self._info[self.CAN_CREATE_CONSUMERS] + + def canCreateSma(self): + return self._info[self.CAN_CREATE_SMA] + + def canDropSma(self): + return self._info[self.CAN_DROP_SMA] - def canCreateStream(self): + def canCreateStreams(self): return self._info[self.CAN_CREATE_STREAM] + def canDropStream(self): + return self._info[self.CAN_DROP_STREAM] + def canAddData(self): return self._info[self.CAN_ADD_DATA] @@ -919,7 +937,7 @@ class StateHasData(AnyState): ): # only if we didn't create one # we shouldn't have dropped it self.assertNoTask(tasks, TaskDropDb) - if (not self.hasTask(tasks, TaskCreateSuperTable) + if not( self.hasTask(tasks, TaskCreateSuperTable) ): # if we didn't create the table # we should not have a task that drops it self.assertNoTask(tasks, TaskDropSuperTable) @@ -1376,6 +1394,7 @@ class Task(): 0x396, # Database in creating status 0x386, # Database in droping status 0x03E1, # failed on tmq_subscribe ,topic not exist + 0x03ed , # Topic must be dropped first, SQL: drop database db_0 @@ -1694,19 +1713,20 @@ class TaskDropDb(StateTransitionTask): # drop topics before drop db - if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()): + # if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()): - self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None) - self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),self._db.getFixedSuperTableName) + # self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None) + # self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),self._db.getFixedSuperTableName) - self.execWtSql(wt, "drop database {}".format(self._db.getName())) + self.queryWtSql(wt, "drop database {}".format(self._db.getName())) # drop database maybe failed ,because topic exists Logging.debug("[OPS] database dropped at {}".format(time.time())) -''' + # Streams will generator TD-20237 (it will crash taosd , start this task when this issue fixed ) + class TaskCreateStream(StateTransitionTask): @classmethod @@ -1715,7 +1735,7 @@ class TaskCreateStream(StateTransitionTask): @classmethod def canBeginFrom(cls, state: AnyState): - return state.canCreateStream() + return state.canCreateStreams() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): dbname = self._db.getName() @@ -1784,7 +1804,6 @@ class TaskCreateStream(StateTransitionTask): if errno in [0x03f0]: # stream already exists # stream need drop before drop table pass -''' class TaskCreateTopic(StateTransitionTask): @@ -1856,7 +1875,7 @@ class TaskCreateTopic(StateTransitionTask): Logging.debug("[OPS] topic is creating at {}".format(time.time())) except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) - if errno in [0x03f0]: # topic already exists + if errno in [0x03f0 ]: # topic already exists # topic need drop before drop table pass @@ -1878,7 +1897,7 @@ class TaskCreateTopic(StateTransitionTask): topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic,dbname,stbname) try: self.execWtSql(wt, "use {}".format(dbname)) - self.execWtSql(wt, topic_sql) + self.queryWtSql(wt, topic_sql) Logging.debug("[OPS] stable topic is creating at {}".format(time.time())) except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) @@ -1898,7 +1917,105 @@ class TaskCreateTopic(StateTransitionTask): pass else: pass - + +class TaskDropTopics(StateTransitionTask): + + @classmethod + def getEndState(cls): + return StateHasData() + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropTopic() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + dbname = self._db.getName() + + + if not self._db.exists(wt.getDbConn()): + Logging.debug("Skipping task, no DB yet") + return + + sTable = self._db.getFixedSuperTable() # type: TdSuperTable + # wt.execSql("use db") # should always be in place + tblName = sTable.getName() + if sTable.hasTopics(wt.getDbConn()): + sTable.dropTopics(wt.getDbConn(),dbname,None) # drop topics of database + sTable.dropTopics(wt.getDbConn(),dbname,tblName) # drop topics of stable + +class TaskCreateSma(StateTransitionTask): + + @classmethod + def getEndState(cls): + return StateHasData() + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canCreateSma() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + # dbname = self._db.getName() + + if not self._db.exists(wt.getDbConn()): + Logging.debug("Skipping task, no DB yet") + return + + sTable = self._db.getFixedSuperTable() # type: TdSuperTable + # wt.execSql("use db") # should always be in place + # tblName = sTable.getName() + if sTable.hasStreams(wt.getDbConn()): + sTable.dropStreams(wt.getDbConn()) # drop stream of database + # sTable.dropStreamTables(wt.getDbConn()) # drop streamtables of stable + +class TaskDropStreams(StateTransitionTask): + + @classmethod + def getEndState(cls): + return StateHasData() + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropStream() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + # dbname = self._db.getName() + + + if not self._db.exists(wt.getDbConn()): + Logging.debug("Skipping task, no DB yet") + return + + sTable = self._db.getFixedSuperTable() # type: TdSuperTable + # wt.execSql("use db") # should always be in place + # tblName = sTable.getName() + if sTable.hasStreams(wt.getDbConn()): + sTable.dropStreams(wt.getDbConn()) # drop stream of database + # sTable.dropStreamTables(wt.getDbConn()) # drop streamtables of stable + +class TaskDropStreamTables(StateTransitionTask): + + @classmethod + def getEndState(cls): + return StateHasData() + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropStream() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + # dbname = self._db.getName() + + + if not self._db.exists(wt.getDbConn()): + Logging.debug("Skipping task, no DB yet") + return + + sTable = self._db.getFixedSuperTable() # type: TdSuperTable + # wt.execSql("use db") # should always be in place + # tblName = sTable.getName() + if sTable.hasStreamTables(wt.getDbConn()): + # sTable.dropStreams(wt.getDbConn()) + sTable.dropStreamTables(wt.getDbConn()) # drop stream tables class TaskCreateConsumers(StateTransitionTask): @@ -1919,9 +2036,10 @@ class TaskCreateConsumers(StateTransitionTask): # wt.execSql("use db") # should always be in place # create Consumers - if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task - if sTable.hasTopics(wt.getDbConn()): - sTable.createConsumer(wt.getDbConn(),random.randint(1,10)) + # if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task + if sTable.hasTopics(wt.getDbConn()): + sTable.createConsumer(wt.getDbConn(),random.randint(1,10)) + pass else: print(" restful not support tmq consumers") return @@ -1969,17 +2087,17 @@ class TdSuperTable: dbName = self._dbName if self.exists(dbc) : # if myself exists fullTableName = dbName + '.' + self._stName - if self.hasStreams(dbc): - self.dropStreams(dbc) - self.dropStreamTables(dbc) - if self.hasTopics(dbc): - self.dropTopics(dbName,None) - self.dropTopics(dbName,self._stName) + # if self.hasStreams(dbc): + # self.dropStreams(dbc) + # self.dropStreamTables(dbc) + # if self.hasTopics(dbc): + # self.dropTopics(dbName,None) + # self.dropTopics(dbName,self._stName) try: dbc.execute("DROP TABLE {}".format(fullTableName)) except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) - if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist + if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist # Stream must be dropped first, SQL: DROP TABLE db_0.fs_table pass # # stream need drop before drop table # for stream in self.getStreamName(): @@ -2012,17 +2130,21 @@ class TdSuperTable: if dbc.existsSuperTable(self._stName): if dropIfExists: - if self.hasStreams(dbc): - self.dropStreams(dbc) - self.dropStreamTables(dbc) + # if self.hasStreams(dbc): + # self.dropStreams(dbc) + # self.dropStreamTables(dbc) - # drop topics before drop stables - if self.hasTopics(dbc): - self.dropTopics(dbc,self._dbName,None) - self.dropTopics(dbc,self._dbName,self._stName ) + # # drop topics before drop stables + # if self.hasTopics(dbc): + # self.dropTopics(dbc,self._dbName,None) + # self.dropTopics(dbc,self._dbName,self._stName ) - - dbc.execute("DROP TABLE {}".format(fullTableName)) + try: + dbc.execute("DROP TABLE {}".format(fullTableName)) + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist # Stream must be dropped first, SQL: DROP TABLE db_0.fs_table + pass pass # dbc.execute("DROP TABLE {}".format(fullTableName)) @@ -2125,6 +2247,7 @@ class TdSuperTable: if dbname in topic[0] and topic[0].startswith("database"): try: dbc.execute('drop topic {}'.format(topic[0])) + Logging.debug("[OPS] topic {} is droping at {}".format(topic,time.time())) except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) if errno in [0x03EB]: # Topic subscribed cannot be dropped @@ -2140,6 +2263,7 @@ class TdSuperTable: for topic in topics: if topic[0].startswith(self._dbName) and topic[0].endswith('topic'): dbc.execute('drop topic {}'.format(topic[0])) + Logging.debug("[OPS] topic {} is droping at {}".format(topic,time.time())) return True else: return True @@ -2455,15 +2579,16 @@ class TaskDropSuperTable(StateTransitionTask): # Drop the super table itself tblName = self._db.getFixedSuperTableName() - # drop streams before drop stables - if self._db.getFixedSuperTable().hasStreams(wt.getDbConn()): - self._db.getFixedSuperTable().dropStreams(wt.getDbConn()) - self._db.getFixedSuperTable().dropStreamTables(wt.getDbConn()) + # # drop streams before drop stables + # if self._db.getFixedSuperTable().hasStreams(wt.getDbConn()): + # self._db.getFixedSuperTable().dropStreams(wt.getDbConn()) + # self._db.getFixedSuperTable().dropStreamTables(wt.getDbConn()) + + # # drop topics before drop stables + # if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()): + # self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None) + # self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),tblName) - # drop topics before drop stables - if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()): - self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None) - self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),tblName) try: self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName)) @@ -2949,6 +3074,9 @@ class ThreadStacks: # stack info for all threads shortTid = th.native_id % 10000 #type: ignore self._allStacks[shortTid] = stack # Was using th.native_id + def record_current_time(self,current_time): + self.current_time = current_time + def print(self, filteredEndName = None, filterInternal = False): for shortTid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom lastFrame = stack[-1] @@ -2963,9 +3091,11 @@ class ThreadStacks: # stack info for all threads continue # ignore # Now print print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(shortTid)) + lastSqlForThread = DbConn.fetchSqlForThread(shortTid) - time_cost = DbConn.get_time_cost() - print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, time_cost ,lastSqlForThread)) + last_sql_commit_time = DbConn.get_save_sql_time(shortTid) + # time_cost = DbConn.get_time_cost() + print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, self.current_time-last_sql_commit_time ,lastSqlForThread)) stackFrame = 0 for frame in stack: # was using: reversed(stack) # print(frame) diff --git a/tests/pytest/crash_gen/shared/db.py b/tests/pytest/crash_gen/shared/db.py index 35ec9a5da6..05711efbc6 100644 --- a/tests/pytest/crash_gen/shared/db.py +++ b/tests/pytest/crash_gen/shared/db.py @@ -32,7 +32,7 @@ class DbConn: # class variables lastSqlFromThreads : dict[int, str] = {} # stored by thread id, obtained from threading.current_thread().ident%10000 spendThreads : dict[int, float] = {} # stored by thread id, obtained from threading.current_thread().ident%10000 - + current_time : dict[int, float] = {} # save current time @classmethod def saveSqlForCurrentThread(cls, sql: str): ''' @@ -44,6 +44,7 @@ class DbConn: th = threading.current_thread() shortTid = th.native_id % 10000 #type: ignore cls.lastSqlFromThreads[shortTid] = sql # Save this for later + cls.record_save_sql_time() @classmethod def fetchSqlForThread(cls, shortTid : int) -> str : @@ -53,6 +54,25 @@ class DbConn: raise CrashGenError("No last-attempted-SQL found for thread id: {}".format(shortTid)) return cls.lastSqlFromThreads[shortTid] + @classmethod + def get_save_sql_time(cls, shortTid : int): + ''' + Let us save the last SQL statement on a per-thread basis, so that when later we + run into a dead-lock situation, we can pick out the deadlocked thread, and use + that information to find what what SQL statement is stuck. + ''' + return cls.current_time[shortTid] + + @classmethod + def record_save_sql_time(cls): + ''' + Let us save the last SQL statement on a per-thread basis, so that when later we + run into a dead-lock situation, we can pick out the deadlocked thread, and use + that information to find what what SQL statement is stuck. + ''' + th = threading.current_thread() + shortTid = th.native_id % 10000 #type: ignore + cls.current_time[shortTid] = float(time.time()) # Save this for later @classmethod def sql_exec_spend(cls, cost: float): @@ -460,7 +480,6 @@ class DbConnNative(DbConn): finally: time_cost = time.time() - time_start self.sql_exec_spend(time_cost) - cls = self.__class__ cls.totalRequests += 1 -- GitLab