From cd76a29533d5575990de44a11c8dfe0da48501de Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 31 May 2021 06:22:20 +0000 Subject: [PATCH] Adjust table locking in crash_gen to expose same-connection consistency issues, supporting TD-4444 --- src/connector/python/taos/__init__.py | 4 + tests/pytest/crash_gen/crash_gen_main.py | 139 ++++++++++++++++------ tests/pytest/crash_gen/service_manager.py | 10 +- tests/pytest/crash_gen/shared/misc.py | 3 +- 4 files changed, 115 insertions(+), 41 deletions(-) diff --git a/src/connector/python/taos/__init__.py b/src/connector/python/taos/__init__.py index 9732635738..52c6db311e 100644 --- a/src/connector/python/taos/__init__.py +++ b/src/connector/python/taos/__init__.py @@ -2,6 +2,10 @@ from .connection import TDengineConnection from .cursor import TDengineCursor +# For some reason, the following is needed for VS Code (through PyLance) to +# recognize that "error" is a valid module of the "taos" package. +from .error import ProgrammingError + # Globals threadsafety = 0 paramstyle = 'pyformat' diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 644aa79916..b743eee2ef 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -37,6 +37,7 @@ import requests import gc import taos + from .shared.types import TdColumns, TdTags # from crash_gen import ServiceManager, TdeInstance, TdeSubProcess @@ -160,6 +161,7 @@ class WorkerThread: Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) try: if (Config.getConfig().per_thread_db_connection): # most likely TRUE @@ -1362,9 +1364,12 @@ class Task(): Progress.emit(Progress.ACCEPTABLE_ERROR) self._err = err else: # not an acceptable error - errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( + shortTid = threading.get_ident() % 10000 + errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, thread={}, msg: {}, SQL: {}".format( self.__class__.__name__, - errno2, err, wt.getDbConn().getLastSql()) + errno2, + shortTid, + err, wt.getDbConn().getLastSql()) self.logDebug(errMsg) if Config.getConfig().debug: # raise # so that we see full stack @@ -1411,21 +1416,31 @@ class Task(): def lockTable(self, ftName): # full table name # print(" <<" + ftName + '_', end="", flush=True) - with Task._lock: - if not ftName in Task._tableLocks: + with Task._lock: # SHORT lock! so we only protect lock creation + if not ftName in Task._tableLocks: # Create new lock and add to list, if needed Task._tableLocks[ftName] = threading.Lock() - Task._tableLocks[ftName].acquire() + # No lock protection, anybody can do this any time + lock = Task._tableLocks[ftName] + # Logging.info("Acquiring lock: {}, {}".format(ftName, lock)) + lock.acquire() + # Logging.info("Acquiring lock successful: {}".format(lock)) def unlockTable(self, ftName): # print('_' + ftName + ">> ", end="", flush=True) - with Task._lock: + with Task._lock: if not ftName in self._tableLocks: raise RuntimeError("Corrupt state, no such lock") lock = Task._tableLocks[ftName] if not lock.locked(): raise RuntimeError("Corrupte state, already unlocked") - lock.release() + + # Important note, we want to protect unlocking under the task level + # locking, because we don't want the lock to be deleted (maybe in the futur) + # while we unlock it + # Logging.info("Releasing lock: {}".format(lock)) + lock.release() + # Logging.info("Releasing lock successful: {}".format(lock)) class ExecutionStats: @@ -1696,6 +1711,11 @@ class TdSuperTable: return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0 def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str): + ''' + Make sure a regular table exists for this super table, creating it if necessary. + If there is an associated "Task" that wants to do this, "lock" this table so that + others don't access it while we create it. + ''' dbName = self._dbName sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) if dbc.query(sql) >= 1 : # reg table exists already @@ -1703,18 +1723,24 @@ class TdSuperTable: # acquire a lock first, so as to be able to *verify*. More details in TD-1471 fullTableName = dbName + '.' + regTableName - if task is not None: # TODO: what happens if we don't lock the table - task.lockTable(fullTableName) + if task is not None: # Somethime thie operation is requested on behalf of a "task" + # Logging.info("Locking table for creation: {}".format(fullTableName)) + task.lockTable(fullTableName) # in which case we'll lock this table to ensure serialized access + # Logging.info("Table locked for creation".format(fullTableName)) Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table # print("(" + fullTableName[-3:] + ")", end="", flush=True) try: sql = "CREATE TABLE {} USING {}.{} tags ({})".format( fullTableName, dbName, self._stName, self._getTagStrForSql(dbc) ) + # Logging.info("Creating regular with SQL: {}".format(sql)) dbc.execute(sql) + # Logging.info("Regular table created: {}".format(sql)) finally: if task is not None: + # Logging.info("Unlocking table after creation: {}".format(fullTableName)) task.unlockTable(fullTableName) # no matter what + # Logging.info("Table unlocked after creation: {}".format(fullTableName)) def _getTagStrForSql(self, dbc) : tags = self._getTags(dbc) @@ -2011,9 +2037,30 @@ class TaskAddData(StateTransitionTask): def canBeginFrom(cls, state: AnyState): return state.canAddData() + def _lockTableIfNeeded(self, fullTableName, extraMsg = ''): + if Config.getConfig().verify_data: + # Logging.info("Locking table: {}".format(fullTableName)) + self.lockTable(fullTableName) + # Logging.info("Table locked {}: {}".format(extraMsg, fullTableName)) + # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written + else: + # Logging.info("Skipping locking table") + pass + + def _unlockTableIfNeeded(self, fullTableName): + if Config.getConfig().verify_data: + # Logging.info("Unlocking table: {}".format(fullTableName)) + self.unlockTable(fullTableName) + # Logging.info("Table unlocked: {}".format(fullTableName)) + else: + pass + # Logging.info("Skipping unlocking table") + def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor): numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS + fullTableName = db.getName() + '.' + regTableName + self._lockTableIfNeeded(fullTableName, 'batch') sql = "INSERT INTO {} VALUES ".format(fullTableName) for j in range(numRecords): # number of records per table @@ -2021,51 +2068,60 @@ class TaskAddData(StateTransitionTask): nextTick = db.getNextTick() nextColor = db.getNextColor() sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor) - dbc.execute(sql) + + # Logging.info("Adding data in batch: {}".format(sql)) + try: + dbc.execute(sql) + finally: + # Logging.info("Data added in batch: {}".format(sql)) + self._unlockTableIfNeeded(fullTableName) + + def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS for j in range(numRecords): # number of records per table - nextInt = db.getNextInt() + intToWrite = db.getNextInt() nextTick = db.getNextTick() nextColor = db.getNextColor() if Config.getConfig().record_ops: self.prepToRecordOps() if self.fAddLogReady is None: raise CrashGenError("Unexpected empty fAddLogReady") - self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) + self.fAddLogReady.write("Ready to write {} to {}\n".format(intToWrite, regTableName)) self.fAddLogReady.flush() os.fsync(self.fAddLogReady.fileno()) # TODO: too ugly trying to lock the table reliably, refactor... fullTableName = db.getName() + '.' + regTableName - if Config.getConfig().verify_data: - self.lockTable(fullTableName) - # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written - + self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock + try: sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {}) fullTableName, # ds.getFixedSuperTableName(), # ds.getNextBinary(), ds.getNextFloat(), - nextTick, nextInt, nextColor) + nextTick, intToWrite, nextColor) + # Logging.info("Adding data: {}".format(sql)) dbc.execute(sql) + # Logging.info("Data added: {}".format(sql)) + intWrote = intToWrite # Quick hack, attach an update statement here. TODO: create an "update" task if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB - nextInt = db.getNextInt() + intToUpdate = db.getNextInt() # Updated, but should not succeed nextColor = db.getNextColor() sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here fullTableName, - nextTick, nextInt, nextColor) + nextTick, intToUpdate, nextColor) # sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format( # fullTableName, db.getNextInt(), db.getNextColor(), nextTick) dbc.execute(sql) + intWrote = intToUpdate # We updated, seems TDengine non-cluster accepts this. except: # Any exception at all - if Config.getConfig().verify_data: - self.unlockTable(fullTableName) + self._unlockTableIfNeeded(fullTableName) raise # Now read it back and verify, we might encounter an error if table is dropped @@ -2073,33 +2129,41 @@ class TaskAddData(StateTransitionTask): try: readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". format(db.getName(), regTableName, nextTick)) - if readBack != nextInt : + if readBack != intWrote : raise taos.error.ProgrammingError( "Failed to read back same data, wrote: {}, read: {}" - .format(nextInt, readBack), 0x999) + .format(intWrote, readBack), 0x999) except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) - if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT] : # not a single result + if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result raise taos.error.ProgrammingError( - "Failed to read back same data for tick: {}, wrote: {}, read: {}" - .format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"), + "Failed to read back same data for tick: {}, wrote: {}, read: EMPTY" + .format(nextTick, intWrote), + errno) + elif errno == CrashGenError.INVALID_MULTIPLE_RESULT : # multiple results + raise taos.error.ProgrammingError( + "Failed to read back same data for tick: {}, wrote: {}, read: MULTIPLE RESULTS" + .format(nextTick, intWrote), errno) elif errno in [0x218, 0x362]: # table doesn't exist # do nothing - dummy = 0 + pass else: # Re-throw otherwise raise finally: - self.unlockTable(fullTableName) # Unlock the table no matter what + self._unlockTableIfNeeded(fullTableName) # Quite ugly, refactor lock/unlock + # Done with read-back verification, unlock the table now + else: + self._unlockTableIfNeeded(fullTableName) # Successfully wrote the data into the DB, let's record it somehow - te.recordDataMark(nextInt) + te.recordDataMark(intWrote) if Config.getConfig().record_ops: if self.fAddLogDone is None: raise CrashGenError("Unexpected empty fAddLogDone") - self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) + self.fAddLogDone.write("Wrote {} to {}\n".format(intWrote, regTableName)) self.fAddLogDone.flush() os.fsync(self.fAddLogDone.fileno()) @@ -2137,15 +2201,16 @@ class TaskAddData(StateTransitionTask): class ThreadStacks: # stack info for all threads def __init__(self): self._allStacks = {} - allFrames = sys._current_frames() - for th in threading.enumerate(): + allFrames = sys._current_frames() # All current stack frames + for th in threading.enumerate(): # For each thread if th.ident is None: continue - stack = traceback.extract_stack(allFrames[th.ident]) - self._allStacks[th.native_id] = stack + stack = traceback.extract_stack(allFrames[th.ident]) # Get stack for a thread + shortTid = th.ident % 10000 + self._allStacks[shortTid] = stack # Was using th.native_id def print(self, filteredEndName = None, filterInternal = False): - for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom + for tIdent, stack in self._allStacks.items(): # for each thread, stack frames top to bottom lastFrame = stack[-1] if filteredEndName: # we need to filter out stacks that match this name if lastFrame.name == filteredEndName : # end did not match @@ -2157,7 +2222,7 @@ class ThreadStacks: # stack info for all threads '__init__']: # the thread that extracted the stack continue # ignore # Now print - print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid)) + print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(tIdent)) stackFrame = 0 for frame in stack: # was using: reversed(stack) # print(frame) @@ -2376,7 +2441,7 @@ class MainExec: action='store', default=0, type=int, - help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)') + help='Number of DBs to use, set to disable dropping DB. (default: 0)') parser.add_argument( '-c', '--connector-type', diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 1cd65c1dde..c6685ec469 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -179,7 +179,7 @@ quorum 2 def getServiceCmdLine(self): # to start the instance if Config.getConfig().track_memory_leaks: Logging.info("Invoking VALGRIND on service...") - return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()] + return ['exec valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()] else: # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() @@ -310,7 +310,7 @@ class TdeSubProcess: # print("Starting TDengine with env: ", myEnv.items()) print("Starting TDengine: {}".format(cmdLine)) - return Popen( + ret = Popen( ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, shell=True, # Always use shell, since we need to pass ENV vars stdout=PIPE, @@ -318,6 +318,10 @@ class TdeSubProcess: close_fds=ON_POSIX, env=myEnv ) # had text=True, which interferred with reading EOF + time.sleep(0.01) # very brief wait, then let's check if sub process started successfully. + if ret.poll(): + raise CrashGenError("Sub process failed to start with command line: {}".format(cmdLine)) + return ret STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process? SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm @@ -614,7 +618,7 @@ class ServiceManager: # Find if there's already a taosd service, and then kill it for proc in psutil.process_iter(): - if proc.name() == 'taosd': + if proc.name() == 'taosd' or proc.name() == 'memcheck-amd64-': # Regular or under Valgrind Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt") time.sleep(2.0) proc.kill() diff --git a/tests/pytest/crash_gen/shared/misc.py b/tests/pytest/crash_gen/shared/misc.py index 90ad802ff1..78923bcc29 100644 --- a/tests/pytest/crash_gen/shared/misc.py +++ b/tests/pytest/crash_gen/shared/misc.py @@ -35,7 +35,8 @@ class LoggingFilter(logging.Filter): class MyLoggingAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): - return "[{:04d}] {}".format(threading.get_ident() % 10000, msg), kwargs + shortTid = threading.get_ident() % 10000 + return "[{:04d}] {}".format(shortTid, msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs -- GitLab