提交 cd76a295 编写于 作者: S Steven Li

Adjust table locking in crash_gen to expose same-connection consistency issues, supporting TD-4444

上级 7f3fab49
...@@ -2,6 +2,10 @@ ...@@ -2,6 +2,10 @@
from .connection import TDengineConnection from .connection import TDengineConnection
from .cursor import TDengineCursor from .cursor import TDengineCursor
# For some reason, the following is needed for VS Code (through PyLance) to
# recognize that "error" is a valid module of the "taos" package.
from .error import ProgrammingError
# Globals # Globals
threadsafety = 0 threadsafety = 0
paramstyle = 'pyformat' paramstyle = 'pyformat'
......
...@@ -37,6 +37,7 @@ import requests ...@@ -37,6 +37,7 @@ import requests
import gc import gc
import taos import taos
from .shared.types import TdColumns, TdTags from .shared.types import TdColumns, TdTags
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess # from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
...@@ -160,6 +161,7 @@ class WorkerThread: ...@@ -160,6 +161,7 @@ class WorkerThread:
Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
try: try:
if (Config.getConfig().per_thread_db_connection): # most likely TRUE if (Config.getConfig().per_thread_db_connection): # most likely TRUE
...@@ -1362,9 +1364,12 @@ class Task(): ...@@ -1362,9 +1364,12 @@ class Task():
Progress.emit(Progress.ACCEPTABLE_ERROR) Progress.emit(Progress.ACCEPTABLE_ERROR)
self._err = err self._err = err
else: # not an acceptable error else: # not an acceptable error
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( shortTid = threading.get_ident() % 10000
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, thread={}, msg: {}, SQL: {}".format(
self.__class__.__name__, self.__class__.__name__,
errno2, err, wt.getDbConn().getLastSql()) errno2,
shortTid,
err, wt.getDbConn().getLastSql())
self.logDebug(errMsg) self.logDebug(errMsg)
if Config.getConfig().debug: if Config.getConfig().debug:
# raise # so that we see full stack # raise # so that we see full stack
...@@ -1411,21 +1416,31 @@ class Task(): ...@@ -1411,21 +1416,31 @@ class Task():
def lockTable(self, ftName): # full table name def lockTable(self, ftName): # full table name
# print(" <<" + ftName + '_', end="", flush=True) # print(" <<" + ftName + '_', end="", flush=True)
with Task._lock: with Task._lock: # SHORT lock! so we only protect lock creation
if not ftName in Task._tableLocks: if not ftName in Task._tableLocks: # Create new lock and add to list, if needed
Task._tableLocks[ftName] = threading.Lock() Task._tableLocks[ftName] = threading.Lock()
Task._tableLocks[ftName].acquire() # No lock protection, anybody can do this any time
lock = Task._tableLocks[ftName]
# Logging.info("Acquiring lock: {}, {}".format(ftName, lock))
lock.acquire()
# Logging.info("Acquiring lock successful: {}".format(lock))
def unlockTable(self, ftName): def unlockTable(self, ftName):
# print('_' + ftName + ">> ", end="", flush=True) # print('_' + ftName + ">> ", end="", flush=True)
with Task._lock: with Task._lock:
if not ftName in self._tableLocks: if not ftName in self._tableLocks:
raise RuntimeError("Corrupt state, no such lock") raise RuntimeError("Corrupt state, no such lock")
lock = Task._tableLocks[ftName] lock = Task._tableLocks[ftName]
if not lock.locked(): if not lock.locked():
raise RuntimeError("Corrupte state, already unlocked") raise RuntimeError("Corrupte state, already unlocked")
lock.release()
# Important note, we want to protect unlocking under the task level
# locking, because we don't want the lock to be deleted (maybe in the futur)
# while we unlock it
# Logging.info("Releasing lock: {}".format(lock))
lock.release()
# Logging.info("Releasing lock successful: {}".format(lock))
class ExecutionStats: class ExecutionStats:
...@@ -1696,6 +1711,11 @@ class TdSuperTable: ...@@ -1696,6 +1711,11 @@ class TdSuperTable:
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0 return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str): def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
'''
Make sure a regular table exists for this super table, creating it if necessary.
If there is an associated "Task" that wants to do this, "lock" this table so that
others don't access it while we create it.
'''
dbName = self._dbName dbName = self._dbName
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already if dbc.query(sql) >= 1 : # reg table exists already
...@@ -1703,18 +1723,24 @@ class TdSuperTable: ...@@ -1703,18 +1723,24 @@ class TdSuperTable:
# acquire a lock first, so as to be able to *verify*. More details in TD-1471 # acquire a lock first, so as to be able to *verify*. More details in TD-1471
fullTableName = dbName + '.' + regTableName fullTableName = dbName + '.' + regTableName
if task is not None: # TODO: what happens if we don't lock the table if task is not None: # Somethime thie operation is requested on behalf of a "task"
task.lockTable(fullTableName) # Logging.info("Locking table for creation: {}".format(fullTableName))
task.lockTable(fullTableName) # in which case we'll lock this table to ensure serialized access
# Logging.info("Table locked for creation".format(fullTableName))
Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
# print("(" + fullTableName[-3:] + ")", end="", flush=True) # print("(" + fullTableName[-3:] + ")", end="", flush=True)
try: try:
sql = "CREATE TABLE {} USING {}.{} tags ({})".format( sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc) fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
) )
# Logging.info("Creating regular with SQL: {}".format(sql))
dbc.execute(sql) dbc.execute(sql)
# Logging.info("Regular table created: {}".format(sql))
finally: finally:
if task is not None: if task is not None:
# Logging.info("Unlocking table after creation: {}".format(fullTableName))
task.unlockTable(fullTableName) # no matter what task.unlockTable(fullTableName) # no matter what
# Logging.info("Table unlocked after creation: {}".format(fullTableName))
def _getTagStrForSql(self, dbc) : def _getTagStrForSql(self, dbc) :
tags = self._getTags(dbc) tags = self._getTags(dbc)
...@@ -2011,9 +2037,30 @@ class TaskAddData(StateTransitionTask): ...@@ -2011,9 +2037,30 @@ class TaskAddData(StateTransitionTask):
def canBeginFrom(cls, state: AnyState): def canBeginFrom(cls, state: AnyState):
return state.canAddData() return state.canAddData()
def _lockTableIfNeeded(self, fullTableName, extraMsg = ''):
if Config.getConfig().verify_data:
# Logging.info("Locking table: {}".format(fullTableName))
self.lockTable(fullTableName)
# Logging.info("Table locked {}: {}".format(extraMsg, fullTableName))
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
else:
# Logging.info("Skipping locking table")
pass
def _unlockTableIfNeeded(self, fullTableName):
if Config.getConfig().verify_data:
# Logging.info("Unlocking table: {}".format(fullTableName))
self.unlockTable(fullTableName)
# Logging.info("Table unlocked: {}".format(fullTableName))
else:
pass
# Logging.info("Skipping unlocking table")
def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor): def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
self._lockTableIfNeeded(fullTableName, 'batch')
sql = "INSERT INTO {} VALUES ".format(fullTableName) sql = "INSERT INTO {} VALUES ".format(fullTableName)
for j in range(numRecords): # number of records per table for j in range(numRecords): # number of records per table
...@@ -2021,51 +2068,60 @@ class TaskAddData(StateTransitionTask): ...@@ -2021,51 +2068,60 @@ class TaskAddData(StateTransitionTask):
nextTick = db.getNextTick() nextTick = db.getNextTick()
nextColor = db.getNextColor() nextColor = db.getNextColor()
sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor) sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor)
dbc.execute(sql)
# Logging.info("Adding data in batch: {}".format(sql))
try:
dbc.execute(sql)
finally:
# Logging.info("Data added in batch: {}".format(sql))
self._unlockTableIfNeeded(fullTableName)
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
for j in range(numRecords): # number of records per table for j in range(numRecords): # number of records per table
nextInt = db.getNextInt() intToWrite = db.getNextInt()
nextTick = db.getNextTick() nextTick = db.getNextTick()
nextColor = db.getNextColor() nextColor = db.getNextColor()
if Config.getConfig().record_ops: if Config.getConfig().record_ops:
self.prepToRecordOps() self.prepToRecordOps()
if self.fAddLogReady is None: if self.fAddLogReady is None:
raise CrashGenError("Unexpected empty fAddLogReady") raise CrashGenError("Unexpected empty fAddLogReady")
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.write("Ready to write {} to {}\n".format(intToWrite, regTableName))
self.fAddLogReady.flush() self.fAddLogReady.flush()
os.fsync(self.fAddLogReady.fileno()) os.fsync(self.fAddLogReady.fileno())
# TODO: too ugly trying to lock the table reliably, refactor... # TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
if Config.getConfig().verify_data: self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
self.lockTable(fullTableName)
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
try: try:
sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {}) sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
fullTableName, fullTableName,
# ds.getFixedSuperTableName(), # ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(), # ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt, nextColor) nextTick, intToWrite, nextColor)
# Logging.info("Adding data: {}".format(sql))
dbc.execute(sql) dbc.execute(sql)
# Logging.info("Data added: {}".format(sql))
intWrote = intToWrite
# Quick hack, attach an update statement here. TODO: create an "update" task # Quick hack, attach an update statement here. TODO: create an "update" task
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
nextInt = db.getNextInt() intToUpdate = db.getNextInt() # Updated, but should not succeed
nextColor = db.getNextColor() nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
fullTableName, fullTableName,
nextTick, nextInt, nextColor) nextTick, intToUpdate, nextColor)
# sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format( # sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format(
# fullTableName, db.getNextInt(), db.getNextColor(), nextTick) # fullTableName, db.getNextInt(), db.getNextColor(), nextTick)
dbc.execute(sql) dbc.execute(sql)
intWrote = intToUpdate # We updated, seems TDengine non-cluster accepts this.
except: # Any exception at all except: # Any exception at all
if Config.getConfig().verify_data: self._unlockTableIfNeeded(fullTableName)
self.unlockTable(fullTableName)
raise raise
# Now read it back and verify, we might encounter an error if table is dropped # Now read it back and verify, we might encounter an error if table is dropped
...@@ -2073,33 +2129,41 @@ class TaskAddData(StateTransitionTask): ...@@ -2073,33 +2129,41 @@ class TaskAddData(StateTransitionTask):
try: try:
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
format(db.getName(), regTableName, nextTick)) format(db.getName(), regTableName, nextTick))
if readBack != nextInt : if readBack != intWrote :
raise taos.error.ProgrammingError( raise taos.error.ProgrammingError(
"Failed to read back same data, wrote: {}, read: {}" "Failed to read back same data, wrote: {}, read: {}"
.format(nextInt, readBack), 0x999) .format(intWrote, readBack), 0x999)
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT] : # not a single result if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
raise taos.error.ProgrammingError( raise taos.error.ProgrammingError(
"Failed to read back same data for tick: {}, wrote: {}, read: {}" "Failed to read back same data for tick: {}, wrote: {}, read: EMPTY"
.format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"), .format(nextTick, intWrote),
errno)
elif errno == CrashGenError.INVALID_MULTIPLE_RESULT : # multiple results
raise taos.error.ProgrammingError(
"Failed to read back same data for tick: {}, wrote: {}, read: MULTIPLE RESULTS"
.format(nextTick, intWrote),
errno) errno)
elif errno in [0x218, 0x362]: # table doesn't exist elif errno in [0x218, 0x362]: # table doesn't exist
# do nothing # do nothing
dummy = 0 pass
else: else:
# Re-throw otherwise # Re-throw otherwise
raise raise
finally: finally:
self.unlockTable(fullTableName) # Unlock the table no matter what self._unlockTableIfNeeded(fullTableName) # Quite ugly, refactor lock/unlock
# Done with read-back verification, unlock the table now
else:
self._unlockTableIfNeeded(fullTableName)
# Successfully wrote the data into the DB, let's record it somehow # Successfully wrote the data into the DB, let's record it somehow
te.recordDataMark(nextInt) te.recordDataMark(intWrote)
if Config.getConfig().record_ops: if Config.getConfig().record_ops:
if self.fAddLogDone is None: if self.fAddLogDone is None:
raise CrashGenError("Unexpected empty fAddLogDone") raise CrashGenError("Unexpected empty fAddLogDone")
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.write("Wrote {} to {}\n".format(intWrote, regTableName))
self.fAddLogDone.flush() self.fAddLogDone.flush()
os.fsync(self.fAddLogDone.fileno()) os.fsync(self.fAddLogDone.fileno())
...@@ -2137,15 +2201,16 @@ class TaskAddData(StateTransitionTask): ...@@ -2137,15 +2201,16 @@ class TaskAddData(StateTransitionTask):
class ThreadStacks: # stack info for all threads class ThreadStacks: # stack info for all threads
def __init__(self): def __init__(self):
self._allStacks = {} self._allStacks = {}
allFrames = sys._current_frames() allFrames = sys._current_frames() # All current stack frames
for th in threading.enumerate(): for th in threading.enumerate(): # For each thread
if th.ident is None: if th.ident is None:
continue continue
stack = traceback.extract_stack(allFrames[th.ident]) stack = traceback.extract_stack(allFrames[th.ident]) # Get stack for a thread
self._allStacks[th.native_id] = stack shortTid = th.ident % 10000
self._allStacks[shortTid] = stack # Was using th.native_id
def print(self, filteredEndName = None, filterInternal = False): def print(self, filteredEndName = None, filterInternal = False):
for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom for tIdent, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
lastFrame = stack[-1] lastFrame = stack[-1]
if filteredEndName: # we need to filter out stacks that match this name if filteredEndName: # we need to filter out stacks that match this name
if lastFrame.name == filteredEndName : # end did not match if lastFrame.name == filteredEndName : # end did not match
...@@ -2157,7 +2222,7 @@ class ThreadStacks: # stack info for all threads ...@@ -2157,7 +2222,7 @@ class ThreadStacks: # stack info for all threads
'__init__']: # the thread that extracted the stack '__init__']: # the thread that extracted the stack
continue # ignore continue # ignore
# Now print # Now print
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid)) print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(tIdent))
stackFrame = 0 stackFrame = 0
for frame in stack: # was using: reversed(stack) for frame in stack: # was using: reversed(stack)
# print(frame) # print(frame)
...@@ -2376,7 +2441,7 @@ class MainExec: ...@@ -2376,7 +2441,7 @@ class MainExec:
action='store', action='store',
default=0, default=0,
type=int, type=int,
help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)') help='Number of DBs to use, set to disable dropping DB. (default: 0)')
parser.add_argument( parser.add_argument(
'-c', '-c',
'--connector-type', '--connector-type',
......
...@@ -179,7 +179,7 @@ quorum 2 ...@@ -179,7 +179,7 @@ quorum 2
def getServiceCmdLine(self): # to start the instance def getServiceCmdLine(self): # to start the instance
if Config.getConfig().track_memory_leaks: if Config.getConfig().track_memory_leaks:
Logging.info("Invoking VALGRIND on service...") Logging.info("Invoking VALGRIND on service...")
return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()] return ['exec valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()]
else: else:
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
...@@ -310,7 +310,7 @@ class TdeSubProcess: ...@@ -310,7 +310,7 @@ class TdeSubProcess:
# print("Starting TDengine with env: ", myEnv.items()) # print("Starting TDengine with env: ", myEnv.items())
print("Starting TDengine: {}".format(cmdLine)) print("Starting TDengine: {}".format(cmdLine))
return Popen( ret = Popen(
' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
shell=True, # Always use shell, since we need to pass ENV vars shell=True, # Always use shell, since we need to pass ENV vars
stdout=PIPE, stdout=PIPE,
...@@ -318,6 +318,10 @@ class TdeSubProcess: ...@@ -318,6 +318,10 @@ class TdeSubProcess:
close_fds=ON_POSIX, close_fds=ON_POSIX,
env=myEnv env=myEnv
) # had text=True, which interferred with reading EOF ) # had text=True, which interferred with reading EOF
time.sleep(0.01) # very brief wait, then let's check if sub process started successfully.
if ret.poll():
raise CrashGenError("Sub process failed to start with command line: {}".format(cmdLine))
return ret
STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process? STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
...@@ -614,7 +618,7 @@ class ServiceManager: ...@@ -614,7 +618,7 @@ class ServiceManager:
# Find if there's already a taosd service, and then kill it # Find if there's already a taosd service, and then kill it
for proc in psutil.process_iter(): for proc in psutil.process_iter():
if proc.name() == 'taosd': if proc.name() == 'taosd' or proc.name() == 'memcheck-amd64-': # Regular or under Valgrind
Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt") Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
time.sleep(2.0) time.sleep(2.0)
proc.kill() proc.kill()
......
...@@ -35,7 +35,8 @@ class LoggingFilter(logging.Filter): ...@@ -35,7 +35,8 @@ class LoggingFilter(logging.Filter):
class MyLoggingAdapter(logging.LoggerAdapter): class MyLoggingAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs): def process(self, msg, kwargs):
return "[{:04d}] {}".format(threading.get_ident() % 10000, msg), kwargs shortTid = threading.get_ident() % 10000
return "[{:04d}] {}".format(shortTid, msg), kwargs
# return '[%s] %s' % (self.extra['connid'], msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册