diff --git a/tests/pytest/crash_gen.sh b/tests/pytest/crash_gen.sh index 539314dea46d7901a4e6d58f8aed0f7aeef603e1..cc2941a52a3fabfd5db9bfff30123d796c767369 100755 --- a/tests/pytest/crash_gen.sh +++ b/tests/pytest/crash_gen.sh @@ -45,7 +45,7 @@ fi # Now getting ready to execute Python # The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk -PYTHON_EXEC=python3.8 +PYTHON_EXEC=python3 # First we need to set up a path for Python to find our own TAOS modules, so that "import" can work. # export PYTHONPATH=$(pwd)/../../src/connector/python:$(pwd) diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 600c64b8e6ac0a521d3c736c3256c79dfcefbf8e..d8946780a2562afc4af998487cc410800e4ca1e0 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -254,7 +254,7 @@ class WorkerThread: class ThreadCoordinator: - WORKER_THREAD_TIMEOUT = 120 # Normal: 120 + WORKER_THREAD_TIMEOUT = 1200 # Normal: 120 def __init__(self, pool: ThreadPool, dbManager: DbManager): self._curStep = -1 # first step is 0 @@ -674,9 +674,12 @@ class AnyState: # only "under normal circumstances", as we may override it with the -b option CAN_DROP_DB = 2 CAN_CREATE_FIXED_SUPER_TABLE = 3 + CAN_CREATE_STREAM = 3 # super table must exists CAN_DROP_FIXED_SUPER_TABLE = 4 CAN_ADD_DATA = 5 + CAN_DROP_STREAM = 5 CAN_READ_DATA = 6 + CAN_DELETE_DATA = 6 def __init__(self): self._info = self.getInfo() @@ -727,12 +730,18 @@ class AnyState: return False return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] + def canCreateStream(self): + return self._info[self.CAN_CREATE_STREAM] + def canAddData(self): return self._info[self.CAN_ADD_DATA] def canReadData(self): return self._info[self.CAN_READ_DATA] + def canDeleteData(self): + return self._info[self.CAN_DELETE_DATA] + def assertAtMostOneSuccess(self, tasks, cls): sCnt = 0 for task in tasks: @@ -921,7 +930,7 @@ class StateMechine: except taos.error.ProgrammingError as err: Logging.error("Failed to initialized state machine, cannot find current state: {}".format(err)) traceback.print_stack() - raise # re-throw + pass # re-throw # TODO: seems no lnoger used, remove? def getCurrentState(self): @@ -974,14 +983,21 @@ class StateMechine: # did not do this when openning connection, and this is NOT the worker # thread, which does this on their own dbc.use(dbName) + if not dbc.hasTables(): # no tables + Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) return StateDbOnly() # For sure we have tables, which means we must have the super table. # TODO: are we sure? + sTable = self._db.getFixedSuperTable() - if sTable.hasRegTables(dbc): # no regular tables + + + if sTable.hasRegTables(dbc): # no regular tables + # print("debug=====*\n"*100) Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) + return StateSuperTableOnly() else: # has actual tables Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) @@ -1109,6 +1125,7 @@ class Database: return "fs_table" def getFixedSuperTable(self) -> TdSuperTable: + return TdSuperTable(self.getFixedSuperTableName(), self.getName()) # We aim to create a starting time tick, such that, whenever we run our test here once @@ -1342,6 +1359,11 @@ class Task(): 0x2603, # Table does not exist, replaced by 2662 below 0x260d, # Tags number not matched 0x2662, # Table does not exist #TODO: what about 2603 above? + 0x032C, # Object is creating + 0x032D, # Object is dropping + 0x03D3, # Conflict transaction not completed + 0x0707, # Query not ready , it always occur at replica 3 + 0x707, # Query not ready @@ -1638,9 +1660,12 @@ class TaskCreateDb(StateTransitionTask): # numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N numReplica = Config.getConfig().num_replicas # fixed, always repStr = "replica {}".format(numReplica) - updatePostfix = "update 1" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active + updatePostfix = "" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active , 3.0 version default is update 1 + vg_nums = random.randint(1,8) + cache_model = Dice.choice(['none' , 'last_row' , 'last_value' , 'both']) + buffer = random.randint(3,128) dbName = self._db.getName() - self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) ) + self.execWtSql(wt, "create database {} {} {} vgroups {} cachemodel '{}' buffer {} ".format(dbName, repStr, updatePostfix, vg_nums, cache_model,buffer ) ) if dbName == "db_0" and Config.getConfig().use_shadow_db: self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) ) @@ -1657,6 +1682,69 @@ class TaskDropDb(StateTransitionTask): self.execWtSql(wt, "drop database {}".format(self._db.getName())) Logging.debug("[OPS] database dropped at {}".format(time.time())) +class TaskCreateStream(StateTransitionTask): + + @classmethod + def getEndState(cls): + return StateHasData() + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canCreateStream() + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + dbname = self._db.getName() + + sub_stream_name = dbname+ '_sub_stream' + sub_stream_tb_name = 'avg_sub' + super_stream_name = dbname+ '_super_stream' + super_stream_tb_name = 'avg_super' + if not self._db.exists(wt.getDbConn()): + Logging.debug("Skipping task, no DB yet") + return + + sTable = self._db.getFixedSuperTable() # type: TdSuperTable + # wt.execSql("use db") # should always be in place + + # create stream + ''' + CREATE STREAM avg_vol_s INTO avg_vol AS SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s); + ''' + stbname =sTable.getName() + sub_tables = sTable.getRegTables(wt.getDbConn()) + if sub_tables: + + if sub_tables: # if not empty + sub_tbname = sub_tables[0] + + # create stream with query above sub_table + stream_sql = 'create stream {} into {}.{} as select count(*), avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(sub_stream_name,dbname,sub_stream_tb_name ,dbname,sub_tbname) + try: + self.execWtSql(wt, stream_sql) + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno in [0x03f0]: # stream already exists + # stream need drop before drop table + pass + + sTable.setStreamName(sub_stream_name) + else: + pass + + else: + stream_sql = 'create stream {} into {}.{} as select count(*), avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(super_stream_name,dbname,super_stream_tb_name,dbname,stbname) + + try: + self.execWtSql(wt, stream_sql) + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno in [0x03f0]: # stream already exists + # stream need drop before drop table + pass + + + + class TaskCreateSuperTable(StateTransitionTask): @classmethod def getEndState(cls): @@ -1688,15 +1776,40 @@ class TdSuperTable: def __init__(self, stName, dbName): self._stName = stName self._dbName = dbName + self._streamName = [] def getName(self): return self._stName + def setStreamName(self,name): + self._streamName.append(name) + + def getStreamName(self): + return self._streamName + def drop(self, dbc, skipCheck = False): dbName = self._dbName if self.exists(dbc) : # if myself exists - fullTableName = dbName + '.' + self._stName - dbc.execute("DROP TABLE {}".format(fullTableName)) + fullTableName = dbName + '.' + self._stName + try: + dbc.execute("DROP TABLE {}".format(fullTableName)) + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist + pass + # # stream need drop before drop table + # for stream in self.getStreamName(): + # drop_stream_sql = 'drop stream {}'.format(stream) + # try: + # dbc.execute(drop_stream_sql) + # except taos.error.ProgrammingError as err: + # # correcting for strange error number scheme + # errno3 = Helper.convertErrno(err.errno) + # if errno3 in [1011,0x3F3,0x03f3,0x2662,0x03f1]: # stream not exists + # pass + # dbc.execute("DROP TABLE {}".format(fullTableName)) + # pass + else: if not skipCheck: raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName)) @@ -1711,10 +1824,17 @@ class TdSuperTable: dbName = self._dbName dbc.execute("USE " + dbName) - fullTableName = dbName + '.' + self._stName + fullTableName = dbName + '.' + self._stName + if dbc.existsSuperTable(self._stName): if dropIfExists: + dbc.execute("DROP TABLE {}".format(fullTableName)) + + pass + + + # dbc.execute("DROP TABLE {}".format(fullTableName)) else: # error raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName)) @@ -1733,7 +1853,7 @@ class TdSuperTable: def getRegTables(self, dbc: DbConn): dbName = self._dbName try: - dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later + dbc.query("select distinct TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later except taos.error.ProgrammingError as err: errno2 = Helper.convertErrno(err.errno) Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) @@ -1743,7 +1863,44 @@ class TdSuperTable: return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation def hasRegTables(self, dbc: DbConn): - return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0 + # print(self._stName) + # dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) + # print(dbc.getQueryResult()) + if self.hasStreamTables(dbc) or self.hasStreams(dbc): + if self.dropStreams(dbc): + self.dropStreamTables(dbc) + if dbc.existsSuperTable(self._stName): + + return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0 + else: + return False + + def hasStreamTables(self,dbc: DbConn): + + return dbc.query("show {}.stables like 'avg%'".format(self._dbName)) > 0 + + def hasStreams(self,dbc: DbConn): + return dbc.query("show streams") > 0 + + def dropStreams(self,dbc:DbConn): + dbc.query("show streams ") + Streams = dbc.getQueryResult() + for Stream in Streams: + if Stream[0].startswith(self._dbName): + dbc.execute('drop stream {}'.format(Stream[0])) + + return not dbc.query("show streams ") > 0 + + def dropStreamTables(self, dbc: DbConn): + dbc.query("show {}.stables like 'avg%'".format(self._dbName)) + + StreamTables = dbc.getQueryResult() + + for StreamTable in StreamTables: + if self.dropStreams: + dbc.execute('drop table {}.{}'.format(self._dbName,StreamTable[0])) + + return not dbc.query("show {}.stables like 'avg%'".format(self._dbName)) def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str): ''' @@ -1838,10 +1995,46 @@ class TdSuperTable: # Run the query against the regular table first doAggr = (Dice.throw(2) == 0) # 1 in 2 chance if not doAggr: # don't do aggregate query, just simple one + commonExpr = Dice.choice([ + '*', + # 'abs(speed)', + # 'acos(speed)', + # 'asin(speed)', + # 'atan(speed)', + # 'ceil(speed)', + # 'cos(speed)', + # 'cos(speed)', + # 'floor(speed)', + # 'log(speed,2)', + # 'pow(speed,2)', + # 'round(speed)', + # 'sin(speed)', + # 'sqrt(speed)', + # 'char_length(color)', + # 'concat(color,color)', + # 'concat_ws(" ", color,color," ")', + # 'length(color)', + # 'lower(color)', + # 'ltrim(color)', + # 'substr(color , 2)', + # 'upper(color)', + # 'cast(speed as double)', + # 'cast(ts as bigint)', + # # 'TO_ISO8601(color)', + # # 'TO_UNIXTIMESTAMP(ts)', + # 'now()', + # 'timediff(ts,now)', + # 'timezone()', + # 'TIMETRUNCATE(ts,1s)', + # 'TIMEZONE()', + # 'TODAY()', + # 'distinct(color)' + ] + ) ret.append(SqlQuery( # reg table - "select {} from {}.{}".format('*', self._dbName, rTbName))) + "select {} from {}.{}".format(commonExpr, self._dbName, rTbName))) ret.append(SqlQuery( # super table - "select {} from {}.{}".format('*', self._dbName, self.getName()))) + "select {} from {}.{}".format(commonExpr, self._dbName, self.getName()))) else: # Aggregate query aggExpr = Dice.choice([ 'count(*)', @@ -1857,17 +2050,38 @@ class TdSuperTable: 'top(speed, 50)', # TODO: not supported? 'bottom(speed, 50)', # TODO: not supported? 'apercentile(speed, 10)', # TODO: TD-1316 - # 'last_row(speed)', # TODO: commented out per TD-3231, we should re-create + 'last_row(*)', # TODO: commented out per TD-3231, we should re-create # Transformation Functions # 'diff(speed)', # TODO: no supported?! - 'spread(speed)' + 'spread(speed)', + # 'elapsed(ts)', + # 'mode(speed)', + # 'bottom(speed,1)', + # 'top(speed,1)', + # 'tail(speed,1)', + # 'unique(color)', + # 'csum(speed)', + # 'DERIVATIVE(speed,1s,1)', + # 'diff(speed,1)', + # 'irate(speed)', + # 'mavg(speed,3)', + # 'sample(speed,5)', + # 'STATECOUNT(speed,"LT",1)', + # 'STATEDURATION(speed,"LT",1)', + # 'twa(speed)' + + + + + ]) # TODO: add more from 'top' # if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049) sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName()) if Dice.throw(3) == 0: # 1 in X chance - sql = sql + ' GROUP BY color' + partion_expr = Dice.choice(['color','tbname']) + sql = sql + ' partition BY ' + partion_expr + ' order by ' + partion_expr Progress.emit(Progress.QUERY_GROUP_BY) # Logging.info("Executing GROUP-BY query: " + sql) ret.append(SqlQuery(sql)) @@ -1964,16 +2178,17 @@ class TaskDropSuperTable(StateTransitionTask): isSuccess = True for i in tblSeq: regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) + streams_prix = self._db.getName() try: self.execWtSql(wt, "drop table {}.{}". format(self._db.getName(), regTableName)) # nRows always 0, like MySQL except taos.error.ProgrammingError as err: - # correcting for strange error number scheme - errno2 = Helper.convertErrno(err.errno) - if (errno2 in [0x362]): # mnode invalid table name - isSuccess = False - Logging.debug("[DB] Acceptable error when dropping a table") - continue # try to delete next regular table + pass + + + + + if (not tickOutput): tickOutput = True # Print only one time @@ -1984,7 +2199,17 @@ class TaskDropSuperTable(StateTransitionTask): # Drop the super table itself tblName = self._db.getFixedSuperTableName() - self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName)) + try: + self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName)) + except taos.error.ProgrammingError as err: + # correcting for strange error number scheme + errno2 = Helper.convertErrno(err.errno) + if (errno2 in [0x362]): # mnode invalid table name + isSuccess = False + Logging.debug("[DB] Acceptable error when dropping a table") + elif errno2 in [1011,0x3F3,0x03f3]: # table doesn't exist + + pass class TaskAlterTags(StateTransitionTask): @@ -2234,6 +2459,212 @@ class TaskAddData(StateTransitionTask): self.activeTable.discard(i) # not raising an error, unlike remove +class TaskDeleteData(StateTransitionTask): + # Track which table is being actively worked on + activeTable: Set[int] = set() + + # We use these two files to record operations to DB, useful for power-off tests + fAddLogReady = None # type: Optional[io.TextIOWrapper] + fAddLogDone = None # type: Optional[io.TextIOWrapper] + + @classmethod + def prepToRecordOps(cls): + if Config.getConfig().record_ops: + if (cls.fAddLogReady is None): + Logging.info( + "Recording in a file operations to be performed...") + cls.fAddLogReady = open("add_log_ready.txt", "w") + if (cls.fAddLogDone is None): + Logging.info("Recording in a file operations completed...") + cls.fAddLogDone = open("add_log_done.txt", "w") + + @classmethod + def getEndState(cls): + return StateHasData() + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDeleteData() + + def _lockTableIfNeeded(self, fullTableName, extraMsg = ''): + if Config.getConfig().verify_data: + # Logging.info("Locking table: {}".format(fullTableName)) + self.lockTable(fullTableName) + # Logging.info("Table locked {}: {}".format(extraMsg, fullTableName)) + # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written + else: + # Logging.info("Skipping locking table") + pass + + def _unlockTableIfNeeded(self, fullTableName): + if Config.getConfig().verify_data: + # Logging.info("Unlocking table: {}".format(fullTableName)) + self.unlockTable(fullTableName) + # Logging.info("Table unlocked: {}".format(fullTableName)) + else: + pass + # Logging.info("Skipping unlocking table") + + def _deleteData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches + numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS + del_Records = int(numRecords/5) + if Dice.throw(2) == 0: + for j in range(del_Records): # number of records per table + intToWrite = db.getNextInt() + nextTick = db.getNextTick() + # nextColor = db.getNextColor() + if Config.getConfig().record_ops: + self.prepToRecordOps() + if self.fAddLogReady is None: + raise CrashGenError("Unexpected empty fAddLogReady") + self.fAddLogReady.write("Ready to delete {} to {}\n".format(intToWrite, regTableName)) + self.fAddLogReady.flush() + os.fsync(self.fAddLogReady.fileno()) + + # TODO: too ugly trying to lock the table reliably, refactor... + fullTableName = db.getName() + '.' + regTableName + self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock + + try: + sql = "delete from {} where ts = '{}' ;".format( # removed: tags ('{}', {}) + fullTableName, + # ds.getFixedSuperTableName(), + # ds.getNextBinary(), ds.getNextFloat(), + nextTick) + + # print(sql) + # Logging.info("Adding data: {}".format(sql)) + dbc.execute(sql) + # Logging.info("Data added: {}".format(sql)) + intWrote = intToWrite + + # Quick hack, attach an update statement here. TODO: create an "update" task + if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB + intToUpdate = db.getNextInt() # Updated, but should not succeed + # nextColor = db.getNextColor() + sql = "delete from {} where ts = '{}' ;".format( # "INSERt" means "update" here + fullTableName, + nextTick) + # sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format( + # fullTableName, db.getNextInt(), db.getNextColor(), nextTick) + dbc.execute(sql) + intWrote = intToUpdate # We updated, seems TDengine non-cluster accepts this. + + except: # Any exception at all + self._unlockTableIfNeeded(fullTableName) + raise + + # Now read it back and verify, we might encounter an error if table is dropped + if Config.getConfig().verify_data: # only if command line asks for it + try: + readBack = dbc.queryScalar("SELECT * from {}.{} WHERE ts='{}'". + format(db.getName(), regTableName, nextTick)) + if readBack == None : + pass + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result + print("D1",end="") # D1 means delete data success and only 1 record + + elif errno in [0x218, 0x362]: # table doesn't exist + # do nothing + pass + else: + # Re-throw otherwise + raise + finally: + self._unlockTableIfNeeded(fullTableName) # Quite ugly, refactor lock/unlock + # Done with read-back verification, unlock the table now + # Successfully wrote the data into the DB, let's record it somehow + te.recordDataMark(intWrote) + else: + + # delete all datas and verify datas ,expected table is empty + if Config.getConfig().record_ops: + self.prepToRecordOps() + if self.fAddLogReady is None: + raise CrashGenError("Unexpected empty fAddLogReady") + self.fAddLogReady.write("Ready to delete {} to {}\n".format(intToWrite, regTableName)) + self.fAddLogReady.flush() + os.fsync(self.fAddLogReady.fileno()) + + # TODO: too ugly trying to lock the table reliably, refactor... + fullTableName = db.getName() + '.' + regTableName + self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock + + try: + sql = "delete from {} ;".format( # removed: tags ('{}', {}) + fullTableName) + # Logging.info("Adding data: {}".format(sql)) + dbc.execute(sql) + # Logging.info("Data added: {}".format(sql)) + + # Quick hack, attach an update statement here. TODO: create an "update" task + if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB + sql = "delete from {} ;".format( # "INSERt" means "update" here + fullTableName) + dbc.execute(sql) + + except: # Any exception at all + self._unlockTableIfNeeded(fullTableName) + raise + + # Now read it back and verify, we might encounter an error if table is dropped + if Config.getConfig().verify_data: # only if command line asks for it + try: + readBack = dbc.queryScalar("SELECT * from {}.{} ". + format(db.getName(), regTableName)) + if readBack == None : + pass + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result + print("Da",end="") # Da means delete data success and for all datas + + elif errno in [0x218, 0x362]: # table doesn't exist + # do nothing + pass + else: + # Re-throw otherwise + raise + finally: + self._unlockTableIfNeeded(fullTableName) # Quite ugly, refactor lock/unlock + # Done with read-back verification, unlock the table now + + if Config.getConfig().record_ops: + if self.fAddLogDone is None: + raise CrashGenError("Unexpected empty fAddLogDone") + self.fAddLogDone.write("Wrote {} to {}\n".format(intWrote, regTableName)) + self.fAddLogDone.flush() + os.fsync(self.fAddLogDone.fileno()) + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access + db = self._db + dbc = wt.getDbConn() + numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES + numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS + tblSeq = list(range(numTables )) + random.shuffle(tblSeq) # now we have random sequence + for i in tblSeq: + if (i in self.activeTable): # wow already active + # print("x", end="", flush=True) # concurrent insertion + Progress.emit(Progress.CONCURRENT_INSERTION) + else: + self.activeTable.add(i) # marking it active + + dbName = db.getName() + sTable = db.getFixedSuperTable() + regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) + fullTableName = dbName + '.' + regTableName + # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked" + sTable.ensureRegTable(self, wt.getDbConn(), regTableName) # Ensure the table exists + # self._unlockTable(fullTableName) + + self._deleteData(db, dbc, regTableName, te) + + self.activeTable.discard(i) # not raising an error, unlike remove + class ThreadStacks: # stack info for all threads def __init__(self): @@ -2259,7 +2690,8 @@ class ThreadStacks: # stack info for all threads # Now print print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(shortTid)) lastSqlForThread = DbConn.fetchSqlForThread(shortTid) - print("Last SQL statement attempted from thread {} is: {}".format(shortTid, lastSqlForThread)) + time_cost = DbConn.get_time_cost() + print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, time_cost ,lastSqlForThread)) stackFrame = 0 for frame in stack: # was using: reversed(stack) # print(frame)