diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index f323611e4d2814421b7858cea32169c813246cd3..d481b135473f0e7ffb687985427dc0e7b41068e6 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -658,7 +658,10 @@ class DbConn: raise RuntimeError("Cannot query database until connection is open") nRows = self.query(sql) if nRows != 1: - raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) + raise taos.error.ProgrammingError( + "Unexpected result for query: {}, rows = {}".format(sql, nRows), + (0x991 if nRows==0 else 0x992) + ) if self.getResultRows() != 1 or self.getResultCols() != 1: raise RuntimeError("Unexpected result set for query: {}".format(sql)) return self.getQueryResult()[0][0] @@ -815,6 +818,7 @@ class MyTDSql: def _execInternal(self, sql): startTime = time.time() ret = self._cursor.execute(sql) + print("\nSQL success: {}".format(sql)) queryTime = time.time() - startTime # Record the query time cls = self.__class__ @@ -1342,13 +1346,17 @@ class Database: For now we use it to manage state transitions in that database ''' + _clsLock = threading.Lock() # class wide lock + _lastInt = 101 # next one is initial integer + _lastTick = 0 + _lastLaggingTick = 0 # lagging tick, for unsequenced insersions + def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc self._dbNum = dbNum # we assign a number to databases, for our testing purpose self._stateMachine = StateMechine(self) self._stateMachine.init(dbc) - self._lastTick = self.setupLastTick() - self._lastInt = 0 # next one is initial integer + self._lock = threading.RLock() def getStateMachine(self) -> StateMechine: @@ -1387,7 +1395,8 @@ class Database: # by a factor of 500. # TODO: what if it goes beyond 10 years into the future # TODO: fix the error as result of above: "tsdb timestamp is out of range" - def setupLastTick(self): + @classmethod + def setupLastTick(cls): t1 = datetime.datetime(2020, 6, 1) t2 = datetime.datetime.now() # maybe a very large number, takes 69 years to exceed Python int range @@ -1401,14 +1410,22 @@ class Database: logger.info("Setting up TICKS to start from: {}".format(t4)) return t4 - def getNextTick(self): - with self._lock: # prevent duplicate tick - if Dice.throw(20) == 0: # 1 in 20 chance - return self._lastTick + datetime.timedelta(0, -100) # Go back in time 100 seconds + @classmethod + def getNextTick(cls): + with cls._clsLock: # prevent duplicate tick + if cls._lastLaggingTick==0: + # 10k at 1/20 chance, should be enough to avoid overlaps + cls._lastLaggingTick = cls.setupLastTick() + datetime.timedelta(0, -10000) + if cls._lastTick==0: # should be quite a bit into the future + cls._lastTick = cls.setupLastTick() + + if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick + cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds + return cls._lastLaggingTick else: # regular # add one second to it - self._lastTick += datetime.timedelta(0, 1) - return self._lastTick + cls._lastTick += datetime.timedelta(0, 1) + return cls._lastTick def getNextInt(self): with self._lock: @@ -2177,6 +2194,7 @@ class TaskAddData(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access db = self._db + dbc = wt.getDbConn() tblSeq = list(range( self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) @@ -2192,6 +2210,7 @@ class TaskAddData(StateTransitionTask): for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table nextInt = db.getNextInt() + nextTick = db.getNextTick() if gConfig.record_ops: self.prepToRecordOps() self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) @@ -2202,8 +2221,8 @@ class TaskAddData(StateTransitionTask): regTableName, # ds.getFixedSuperTableName(), # ds.getNextBinary(), ds.getNextFloat(), - db.getNextTick(), nextInt) - self.execWtSql(wt, sql) + nextTick, nextInt) + dbc.execute(sql) # Successfully wrote the data into the DB, let's record it # somehow te.recordDataMark(nextInt) @@ -2213,6 +2232,26 @@ class TaskAddData(StateTransitionTask): nextInt, regTableName)) self.fAddLogDone.flush() os.fsync(self.fAddLogDone) + + # Now read it back and verify, we might encounter an error if table is dropped + try: + readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts= '{}'". + format(db.getName(), regTableName, nextTick)) + if readBack != nextInt : + raise taos.error.ProgrammingError( + "Failed to read back same data, wrote: {}, read: {}" + .format(nextInt, readBack), 0x999) + except taos.error.ProgrammingError as err: + errno = Helper.convertErrno(err.errno) + if errno in [0x991, 0x992] : # not a single result + raise taos.error.ProgrammingError( + "Failed to read back same data for tick: {}, wrote: {}, read: {}" + .format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"), + errno) + # Re-throw no matter what + raise + + self.activeTable.discard(i) # not raising an error, unlike remove