diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen.py index 8d2b0080bc69f5ca84852448dc5ccb197044b319..9bc701aa8b871a65ed2fc78fca3a3ae8c5883831 100755 --- a/tests/pytest/crash_gen/crash_gen.py +++ b/tests/pytest/crash_gen/crash_gen.py @@ -243,7 +243,7 @@ class WorkerThread: class ThreadCoordinator: - WORKER_THREAD_TIMEOUT = 180 # one minute + WORKER_THREAD_TIMEOUT = 120 # Normal: 120 def __init__(self, pool: ThreadPool, dbManager: DbManager): self._curStep = -1 # first step is 0 @@ -1177,6 +1177,8 @@ class Task(): instead. But a task is always associated with a DB ''' taskSn = 100 + _lock = threading.Lock() + _tableLocks: Dict[str, threading.Lock] = {} @classmethod def allocTaskNum(cls): @@ -1198,6 +1200,8 @@ class Task(): self._execStats = execStats self._db = db # A task is always associated/for a specific DB + + def isSuccess(self): return self._err is None @@ -1351,6 +1355,24 @@ class Task(): def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread return wt.getQueryResult() + def lockTable(self, ftName): # full table name + # print(" <<" + ftName + '_', end="", flush=True) + with Task._lock: + if not ftName in Task._tableLocks: + Task._tableLocks[ftName] = threading.Lock() + + Task._tableLocks[ftName].acquire() + + def unlockTable(self, ftName): + # print('_' + ftName + ">> ", end="", flush=True) + with Task._lock: + if not ftName in self._tableLocks: + raise RuntimeError("Corrupt state, no such lock") + lock = Task._tableLocks[ftName] + if not lock.locked(): + raise RuntimeError("Corrupte state, already unlocked") + lock.release() + class ExecutionStats: def __init__(self): @@ -1461,7 +1483,7 @@ class StateTransitionTask(Task): _baseTableNumber = None - _endState = None + _endState = None # TODO: no longter used? @classmethod def getInfo(cls): # each sub class should supply their own information @@ -1486,7 +1508,7 @@ class StateTransitionTask(Task): @classmethod def getRegTableName(cls, i): - if ( StateTransitionTask._baseTableNumber is None): + if ( StateTransitionTask._baseTableNumber is None): # Set it one time StateTransitionTask._baseTableNumber = Dice.throw( 999) if gConfig.dynamic_db_table_names else 0 return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i) @@ -1583,14 +1605,23 @@ class TdSuperTable: def hasRegTables(self, dbc: DbConn, dbName: str): return dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName)) > 0 - def ensureTable(self, dbc: DbConn, dbName: str, regTableName: str): + def ensureTable(self, task: Task, dbc: DbConn, dbName: str, regTableName: str): sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) if dbc.query(sql) >= 1 : # reg table exists already return - sql = "CREATE TABLE {}.{} USING {}.{} tags ({})".format( - dbName, regTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName) - ) - dbc.execute(sql) + + # acquire a lock first, so as to be able to *verify*. More details in TD-1471 + fullTableName = dbName + '.' + regTableName + task.lockTable(fullTableName) + Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table + print("(" + fullTableName[-3:] + ")", end="", flush=True) + try: + sql = "CREATE TABLE {} USING {}.{} tags ({})".format( + fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName) + ) + dbc.execute(sql) + finally: + task.unlockTable(fullTableName) # no matter what def _getTagStrForSql(self, dbc, dbName: str) : tags = self._getTags(dbc, dbName) @@ -1862,7 +1893,12 @@ class TaskAddData(StateTransitionTask): sTable = db.getFixedSuperTable() regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) - sTable.ensureTable(wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists + + + fullTableName = db.getName() + '.' + regTableName + # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked" + sTable.ensureTable(self, wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists + # self._unlockTable(fullTableName) for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table nextInt = db.getNextInt() @@ -1872,27 +1908,29 @@ class TaskAddData(StateTransitionTask): self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.flush() os.fsync(self.fAddLogReady) - sql = "insert into {}.{} values ('{}', {});".format( # removed: tags ('{}', {}) - db.getName(), - regTableName, - # ds.getFixedSuperTableName(), - # ds.getNextBinary(), ds.getNextFloat(), - nextTick, nextInt) - dbc.execute(sql) - # Successfully wrote the data into the DB, let's record it - # somehow - te.recordDataMark(nextInt) - if gConfig.record_ops: - self.fAddLogDone.write( - "Wrote {} to {}\n".format( - nextInt, regTableName)) - self.fAddLogDone.flush() - os.fsync(self.fAddLogDone) + + # TODO: too ugly trying to lock the table reliably, refactor... + fullTableName = db.getName() + '.' + regTableName + if gConfig.verify_data: + self.lockTable(fullTableName) + # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written + + try: + sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {}) + fullTableName, + # ds.getFixedSuperTableName(), + # ds.getNextBinary(), ds.getNextFloat(), + nextTick, nextInt) + dbc.execute(sql) + except: # Any exception at all + if gConfig.verify_data: + self.unlockTable(fullTableName) + raise # Now read it back and verify, we might encounter an error if table is dropped if gConfig.verify_data: # only if command line asks for it try: - readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts= '{}'". + readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". format(db.getName(), regTableName, nextTick)) if readBack != nextInt : raise taos.error.ProgrammingError( @@ -1905,8 +1943,23 @@ class TaskAddData(StateTransitionTask): "Failed to read back same data for tick: {}, wrote: {}, read: {}" .format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"), errno) - # Re-throw no matter what - raise + elif errno in [0x218, 0x362]: # table doesn't exist + # do nothing + dummy = 0 + else: + # Re-throw otherwise + raise + finally: + self.unlockTable(fullTableName) # Unlock the table no matter what + + # Successfully wrote the data into the DB, let's record it somehow + te.recordDataMark(nextInt) + + if gConfig.record_ops: + self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) + self.fAddLogDone.flush() + os.fsync(self.fAddLogDone) + self.activeTable.discard(i) # not raising an error, unlike remove diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/misc.py index 34a33c6af60d6fe419ba9b565265e6769a78ce14..4dc053dae3c1cf9ca047ff00f39a358137a9ba65 100644 --- a/tests/pytest/crash_gen/misc.py +++ b/tests/pytest/crash_gen/misc.py @@ -166,6 +166,8 @@ class Progress: SERVICE_RECONNECT_START = 4 SERVICE_RECONNECT_SUCCESS = 5 SERVICE_RECONNECT_FAILURE = 6 + CREATE_TABLE_ATTEMPT = 7 + tokens = { STEP_BOUNDARY: '.', BEGIN_THREAD_STEP: '[', @@ -174,6 +176,7 @@ class Progress: SERVICE_RECONNECT_START: '', SERVICE_RECONNECT_FAILURE: '.xr>', + CREATE_TABLE_ATTEMPT: '_c', } @classmethod