提交 e48d057f 编写于 作者: W wenzhouwww@live.cn

update

上级 119c4c7c
...@@ -420,10 +420,12 @@ class ThreadCoordinator: ...@@ -420,10 +420,12 @@ class ThreadCoordinator:
except threading.BrokenBarrierError as err: except threading.BrokenBarrierError as err:
self._execStats.registerFailure("Aborted due to worker thread timeout") self._execStats.registerFailure("Aborted due to worker thread timeout")
Logging.error("\n") Logging.error("\n")
Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format( Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
ThreadCoordinator.WORKER_THREAD_TIMEOUT)) ThreadCoordinator.WORKER_THREAD_TIMEOUT))
Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):") Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
ts = ThreadStacks() ts = ThreadStacks()
ts.record_current_time(time.time()) # record thread exit time at current moment
ts.print(filterInternal=True) ts.print(filterInternal=True)
workerTimeout = True workerTimeout = True
...@@ -678,7 +680,11 @@ class AnyState: ...@@ -678,7 +680,11 @@ class AnyState:
CAN_CREATE_STREAM = 3 # super table must exists CAN_CREATE_STREAM = 3 # super table must exists
CAN_CREATE_TOPIC = 3 # super table must exists CAN_CREATE_TOPIC = 3 # super table must exists
CAN_CREATE_CONSUMERS = 3 CAN_CREATE_CONSUMERS = 3
CAN_CREATE_SMA = 3
CAN_DROP_SMA = 3
CAN_DROP_FIXED_SUPER_TABLE = 4 CAN_DROP_FIXED_SUPER_TABLE = 4
CAN_DROP_TOPIC = 4
CAN_DROP_STREAM = 4
CAN_ADD_DATA = 5 CAN_ADD_DATA = 5
CAN_READ_DATA = 6 CAN_READ_DATA = 6
CAN_DELETE_DATA = 6 CAN_DELETE_DATA = 6
...@@ -735,12 +741,24 @@ class AnyState: ...@@ -735,12 +741,24 @@ class AnyState:
def canCreateTopic(self): def canCreateTopic(self):
return self._info[self.CAN_CREATE_TOPIC] return self._info[self.CAN_CREATE_TOPIC]
def canDropTopic(self):
return self._info[self.CAN_DROP_TOPIC]
def canCreateConsumers(self): def canCreateConsumers(self):
return self._info[self.CAN_CREATE_CONSUMERS] return self._info[self.CAN_CREATE_CONSUMERS]
def canCreateStream(self): def canCreateSma(self):
return self._info[self.CAN_CREATE_SMA]
def canDropSma(self):
return self._info[self.CAN_DROP_SMA]
def canCreateStreams(self):
return self._info[self.CAN_CREATE_STREAM] return self._info[self.CAN_CREATE_STREAM]
def canDropStream(self):
return self._info[self.CAN_DROP_STREAM]
def canAddData(self): def canAddData(self):
return self._info[self.CAN_ADD_DATA] return self._info[self.CAN_ADD_DATA]
...@@ -919,7 +937,7 @@ class StateHasData(AnyState): ...@@ -919,7 +937,7 @@ class StateHasData(AnyState):
): # only if we didn't create one ): # only if we didn't create one
# we shouldn't have dropped it # we shouldn't have dropped it
self.assertNoTask(tasks, TaskDropDb) self.assertNoTask(tasks, TaskDropDb)
if (not self.hasTask(tasks, TaskCreateSuperTable) if not( self.hasTask(tasks, TaskCreateSuperTable)
): # if we didn't create the table ): # if we didn't create the table
# we should not have a task that drops it # we should not have a task that drops it
self.assertNoTask(tasks, TaskDropSuperTable) self.assertNoTask(tasks, TaskDropSuperTable)
...@@ -1376,6 +1394,7 @@ class Task(): ...@@ -1376,6 +1394,7 @@ class Task():
0x396, # Database in creating status 0x396, # Database in creating status
0x386, # Database in droping status 0x386, # Database in droping status
0x03E1, # failed on tmq_subscribe ,topic not exist 0x03E1, # failed on tmq_subscribe ,topic not exist
0x03ed , # Topic must be dropped first, SQL: drop database db_0
...@@ -1694,19 +1713,20 @@ class TaskDropDb(StateTransitionTask): ...@@ -1694,19 +1713,20 @@ class TaskDropDb(StateTransitionTask):
# drop topics before drop db # drop topics before drop db
if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()): # if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None) # self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),self._db.getFixedSuperTableName) # self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),self._db.getFixedSuperTableName)
self.execWtSql(wt, "drop database {}".format(self._db.getName())) self.queryWtSql(wt, "drop database {}".format(self._db.getName())) # drop database maybe failed ,because topic exists
Logging.debug("[OPS] database dropped at {}".format(time.time())) Logging.debug("[OPS] database dropped at {}".format(time.time()))
'''
# Streams will generator TD-20237 (it will crash taosd , start this task when this issue fixed ) # Streams will generator TD-20237 (it will crash taosd , start this task when this issue fixed )
class TaskCreateStream(StateTransitionTask): class TaskCreateStream(StateTransitionTask):
@classmethod @classmethod
...@@ -1715,7 +1735,7 @@ class TaskCreateStream(StateTransitionTask): ...@@ -1715,7 +1735,7 @@ class TaskCreateStream(StateTransitionTask):
@classmethod @classmethod
def canBeginFrom(cls, state: AnyState): def canBeginFrom(cls, state: AnyState):
return state.canCreateStream() return state.canCreateStreams()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
dbname = self._db.getName() dbname = self._db.getName()
...@@ -1784,7 +1804,6 @@ class TaskCreateStream(StateTransitionTask): ...@@ -1784,7 +1804,6 @@ class TaskCreateStream(StateTransitionTask):
if errno in [0x03f0]: # stream already exists if errno in [0x03f0]: # stream already exists
# stream need drop before drop table # stream need drop before drop table
pass pass
'''
class TaskCreateTopic(StateTransitionTask): class TaskCreateTopic(StateTransitionTask):
...@@ -1856,7 +1875,7 @@ class TaskCreateTopic(StateTransitionTask): ...@@ -1856,7 +1875,7 @@ class TaskCreateTopic(StateTransitionTask):
Logging.debug("[OPS] topic is creating at {}".format(time.time())) Logging.debug("[OPS] topic is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # topic already exists if errno in [0x03f0 ]: # topic already exists
# topic need drop before drop table # topic need drop before drop table
pass pass
...@@ -1878,7 +1897,7 @@ class TaskCreateTopic(StateTransitionTask): ...@@ -1878,7 +1897,7 @@ class TaskCreateTopic(StateTransitionTask):
topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic,dbname,stbname) topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic,dbname,stbname)
try: try:
self.execWtSql(wt, "use {}".format(dbname)) self.execWtSql(wt, "use {}".format(dbname))
self.execWtSql(wt, topic_sql) self.queryWtSql(wt, topic_sql)
Logging.debug("[OPS] stable topic is creating at {}".format(time.time())) Logging.debug("[OPS] stable topic is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
...@@ -1899,6 +1918,104 @@ class TaskCreateTopic(StateTransitionTask): ...@@ -1899,6 +1918,104 @@ class TaskCreateTopic(StateTransitionTask):
else: else:
pass pass
class TaskDropTopics(StateTransitionTask):
@classmethod
def getEndState(cls):
return StateHasData()
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canDropTopic()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
tblName = sTable.getName()
if sTable.hasTopics(wt.getDbConn()):
sTable.dropTopics(wt.getDbConn(),dbname,None) # drop topics of database
sTable.dropTopics(wt.getDbConn(),dbname,tblName) # drop topics of stable
class TaskCreateSma(StateTransitionTask):
@classmethod
def getEndState(cls):
return StateHasData()
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canCreateSma()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
# tblName = sTable.getName()
if sTable.hasStreams(wt.getDbConn()):
sTable.dropStreams(wt.getDbConn()) # drop stream of database
# sTable.dropStreamTables(wt.getDbConn()) # drop streamtables of stable
class TaskDropStreams(StateTransitionTask):
@classmethod
def getEndState(cls):
return StateHasData()
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canDropStream()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
# tblName = sTable.getName()
if sTable.hasStreams(wt.getDbConn()):
sTable.dropStreams(wt.getDbConn()) # drop stream of database
# sTable.dropStreamTables(wt.getDbConn()) # drop streamtables of stable
class TaskDropStreamTables(StateTransitionTask):
@classmethod
def getEndState(cls):
return StateHasData()
@classmethod
def canBeginFrom(cls, state: AnyState):
return state.canDropStream()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
# tblName = sTable.getName()
if sTable.hasStreamTables(wt.getDbConn()):
# sTable.dropStreams(wt.getDbConn())
sTable.dropStreamTables(wt.getDbConn()) # drop stream tables
class TaskCreateConsumers(StateTransitionTask): class TaskCreateConsumers(StateTransitionTask):
...@@ -1919,9 +2036,10 @@ class TaskCreateConsumers(StateTransitionTask): ...@@ -1919,9 +2036,10 @@ class TaskCreateConsumers(StateTransitionTask):
# wt.execSql("use db") # should always be in place # wt.execSql("use db") # should always be in place
# create Consumers # create Consumers
if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task # if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
if sTable.hasTopics(wt.getDbConn()): if sTable.hasTopics(wt.getDbConn()):
sTable.createConsumer(wt.getDbConn(),random.randint(1,10)) sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
pass
else: else:
print(" restful not support tmq consumers") print(" restful not support tmq consumers")
return return
...@@ -1969,17 +2087,17 @@ class TdSuperTable: ...@@ -1969,17 +2087,17 @@ class TdSuperTable:
dbName = self._dbName dbName = self._dbName
if self.exists(dbc) : # if myself exists if self.exists(dbc) : # if myself exists
fullTableName = dbName + '.' + self._stName fullTableName = dbName + '.' + self._stName
if self.hasStreams(dbc): # if self.hasStreams(dbc):
self.dropStreams(dbc) # self.dropStreams(dbc)
self.dropStreamTables(dbc) # self.dropStreamTables(dbc)
if self.hasTopics(dbc): # if self.hasTopics(dbc):
self.dropTopics(dbName,None) # self.dropTopics(dbName,None)
self.dropTopics(dbName,self._stName) # self.dropTopics(dbName,self._stName)
try: try:
dbc.execute("DROP TABLE {}".format(fullTableName)) dbc.execute("DROP TABLE {}".format(fullTableName))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist # Stream must be dropped first, SQL: DROP TABLE db_0.fs_table
pass pass
# # stream need drop before drop table # # stream need drop before drop table
# for stream in self.getStreamName(): # for stream in self.getStreamName():
...@@ -2012,17 +2130,21 @@ class TdSuperTable: ...@@ -2012,17 +2130,21 @@ class TdSuperTable:
if dbc.existsSuperTable(self._stName): if dbc.existsSuperTable(self._stName):
if dropIfExists: if dropIfExists:
if self.hasStreams(dbc): # if self.hasStreams(dbc):
self.dropStreams(dbc) # self.dropStreams(dbc)
self.dropStreamTables(dbc) # self.dropStreamTables(dbc)
# drop topics before drop stables
if self.hasTopics(dbc):
self.dropTopics(dbc,self._dbName,None)
self.dropTopics(dbc,self._dbName,self._stName )
# # drop topics before drop stables
# if self.hasTopics(dbc):
# self.dropTopics(dbc,self._dbName,None)
# self.dropTopics(dbc,self._dbName,self._stName )
try:
dbc.execute("DROP TABLE {}".format(fullTableName)) dbc.execute("DROP TABLE {}".format(fullTableName))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist # Stream must be dropped first, SQL: DROP TABLE db_0.fs_table
pass
pass pass
# dbc.execute("DROP TABLE {}".format(fullTableName)) # dbc.execute("DROP TABLE {}".format(fullTableName))
...@@ -2125,6 +2247,7 @@ class TdSuperTable: ...@@ -2125,6 +2247,7 @@ class TdSuperTable:
if dbname in topic[0] and topic[0].startswith("database"): if dbname in topic[0] and topic[0].startswith("database"):
try: try:
dbc.execute('drop topic {}'.format(topic[0])) dbc.execute('drop topic {}'.format(topic[0]))
Logging.debug("[OPS] topic {} is droping at {}".format(topic,time.time()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [0x03EB]: # Topic subscribed cannot be dropped if errno in [0x03EB]: # Topic subscribed cannot be dropped
...@@ -2140,6 +2263,7 @@ class TdSuperTable: ...@@ -2140,6 +2263,7 @@ class TdSuperTable:
for topic in topics: for topic in topics:
if topic[0].startswith(self._dbName) and topic[0].endswith('topic'): if topic[0].startswith(self._dbName) and topic[0].endswith('topic'):
dbc.execute('drop topic {}'.format(topic[0])) dbc.execute('drop topic {}'.format(topic[0]))
Logging.debug("[OPS] topic {} is droping at {}".format(topic,time.time()))
return True return True
else: else:
return True return True
...@@ -2455,15 +2579,16 @@ class TaskDropSuperTable(StateTransitionTask): ...@@ -2455,15 +2579,16 @@ class TaskDropSuperTable(StateTransitionTask):
# Drop the super table itself # Drop the super table itself
tblName = self._db.getFixedSuperTableName() tblName = self._db.getFixedSuperTableName()
# drop streams before drop stables # # drop streams before drop stables
if self._db.getFixedSuperTable().hasStreams(wt.getDbConn()): # if self._db.getFixedSuperTable().hasStreams(wt.getDbConn()):
self._db.getFixedSuperTable().dropStreams(wt.getDbConn()) # self._db.getFixedSuperTable().dropStreams(wt.getDbConn())
self._db.getFixedSuperTable().dropStreamTables(wt.getDbConn()) # self._db.getFixedSuperTable().dropStreamTables(wt.getDbConn())
# # drop topics before drop stables
# if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),tblName)
# drop topics before drop stables
if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),tblName)
try: try:
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName)) self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
...@@ -2949,6 +3074,9 @@ class ThreadStacks: # stack info for all threads ...@@ -2949,6 +3074,9 @@ class ThreadStacks: # stack info for all threads
shortTid = th.native_id % 10000 #type: ignore shortTid = th.native_id % 10000 #type: ignore
self._allStacks[shortTid] = stack # Was using th.native_id self._allStacks[shortTid] = stack # Was using th.native_id
def record_current_time(self,current_time):
self.current_time = current_time
def print(self, filteredEndName = None, filterInternal = False): def print(self, filteredEndName = None, filterInternal = False):
for shortTid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom for shortTid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
lastFrame = stack[-1] lastFrame = stack[-1]
...@@ -2963,9 +3091,11 @@ class ThreadStacks: # stack info for all threads ...@@ -2963,9 +3091,11 @@ class ThreadStacks: # stack info for all threads
continue # ignore continue # ignore
# Now print # Now print
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(shortTid)) print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(shortTid))
lastSqlForThread = DbConn.fetchSqlForThread(shortTid) lastSqlForThread = DbConn.fetchSqlForThread(shortTid)
time_cost = DbConn.get_time_cost() last_sql_commit_time = DbConn.get_save_sql_time(shortTid)
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, time_cost ,lastSqlForThread)) # time_cost = DbConn.get_time_cost()
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, self.current_time-last_sql_commit_time ,lastSqlForThread))
stackFrame = 0 stackFrame = 0
for frame in stack: # was using: reversed(stack) for frame in stack: # was using: reversed(stack)
# print(frame) # print(frame)
......
...@@ -32,7 +32,7 @@ class DbConn: ...@@ -32,7 +32,7 @@ class DbConn:
# class variables # class variables
lastSqlFromThreads : dict[int, str] = {} # stored by thread id, obtained from threading.current_thread().ident%10000 lastSqlFromThreads : dict[int, str] = {} # stored by thread id, obtained from threading.current_thread().ident%10000
spendThreads : dict[int, float] = {} # stored by thread id, obtained from threading.current_thread().ident%10000 spendThreads : dict[int, float] = {} # stored by thread id, obtained from threading.current_thread().ident%10000
current_time : dict[int, float] = {} # save current time
@classmethod @classmethod
def saveSqlForCurrentThread(cls, sql: str): def saveSqlForCurrentThread(cls, sql: str):
''' '''
...@@ -44,6 +44,7 @@ class DbConn: ...@@ -44,6 +44,7 @@ class DbConn:
th = threading.current_thread() th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore shortTid = th.native_id % 10000 #type: ignore
cls.lastSqlFromThreads[shortTid] = sql # Save this for later cls.lastSqlFromThreads[shortTid] = sql # Save this for later
cls.record_save_sql_time()
@classmethod @classmethod
def fetchSqlForThread(cls, shortTid : int) -> str : def fetchSqlForThread(cls, shortTid : int) -> str :
...@@ -53,6 +54,25 @@ class DbConn: ...@@ -53,6 +54,25 @@ class DbConn:
raise CrashGenError("No last-attempted-SQL found for thread id: {}".format(shortTid)) raise CrashGenError("No last-attempted-SQL found for thread id: {}".format(shortTid))
return cls.lastSqlFromThreads[shortTid] return cls.lastSqlFromThreads[shortTid]
@classmethod
def get_save_sql_time(cls, shortTid : int):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
return cls.current_time[shortTid]
@classmethod
def record_save_sql_time(cls):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore
cls.current_time[shortTid] = float(time.time()) # Save this for later
@classmethod @classmethod
def sql_exec_spend(cls, cost: float): def sql_exec_spend(cls, cost: float):
...@@ -461,7 +481,6 @@ class DbConnNative(DbConn): ...@@ -461,7 +481,6 @@ class DbConnNative(DbConn):
time_cost = time.time() - time_start time_cost = time.time() - time_start
self.sql_exec_spend(time_cost) self.sql_exec_spend(time_cost)
cls = self.__class__ cls = self.__class__
cls.totalRequests += 1 cls.totalRequests += 1
Logging.debug( Logging.debug(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册