Unverified commit 968285e4, authored by Shengliang Guan, committed via GitHub

Merge pull request #4129 from taosdata/feature/crash_gen

Refactoring crash_gen tool, helping to reproduce TD-1955
@@ -38,9 +38,9 @@ import resource
from guppy import hpy
import gc
- from .service_manager import ServiceManager, TdeInstance
+ from crash_gen.service_manager import ServiceManager, TdeInstance
- from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
+ from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
- from .db import DbConn, MyTDSql, DbConnNative, DbManager
+ from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
import taos
import requests
@@ -243,7 +243,7 @@ class WorkerThread:
class ThreadCoordinator:
- WORKER_THREAD_TIMEOUT = 180 # one minute
+ WORKER_THREAD_TIMEOUT = 120 # Normal: 120
def __init__(self, pool: ThreadPool, dbManager: DbManager):
self._curStep = -1 # first step is 0
@@ -388,9 +388,9 @@ class ThreadCoordinator:
self._syncAtBarrier() # For now just cross the barrier
Progress.emit(Progress.END_THREAD_STEP)
except threading.BrokenBarrierError as err:
- Logging.info("Main loop aborted, caused by worker thread time-out")
+ Logging.info("Main loop aborted, caused by worker thread(s) time-out")
self._execStats.registerFailure("Aborted due to worker thread timeout")
- print("\n\nWorker Thread time-out detected, important thread info:")
+ print("\n\nWorker Thread time-out detected, TAOS related threads are:")
ts = ThreadStacks()
ts.print(filterInternal=True)
workerTimeout = True
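The abort path above relies on `threading.Barrier`: every worker meets the coordinator at a step boundary, and if any worker fails to arrive within `WORKER_THREAD_TIMEOUT`, the wait raises `BrokenBarrierError` and the coordinator dumps all thread stacks. A minimal sketch of that pattern (names and structure here are illustrative, not the tool's actual API):

```python
import threading

WORKER_THREAD_TIMEOUT = 120                      # seconds, mirroring the constant above
NUM_WORKERS = 4

barrier = threading.Barrier(NUM_WORKERS + 1)     # workers + the coordinator

def worker_step():
    # ... perform one step of work ...
    barrier.wait(timeout=WORKER_THREAD_TIMEOUT)  # meet the coordinator at the step boundary

def coordinator_step():
    try:
        barrier.wait(timeout=WORKER_THREAD_TIMEOUT)
    except threading.BrokenBarrierError:
        # Some worker never arrived: abort the run and dump stacks for diagnosis
        print("Worker thread(s) timed out, dumping stacks...")
        raise
```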
@@ -435,7 +435,7 @@ class ThreadCoordinator:
Logging.debug("\r\n\n--> Main thread ready to finish up...")
Logging.debug("Main thread joining all threads")
self._pool.joinAll() # Get all threads to finish
- Logging.info("\nAll worker threads finished")
+ Logging.info(". . . All worker threads finished") # No CR/LF before
self._execStats.endExec()
def cleanup(self): # free resources
@@ -1072,17 +1072,18 @@ class Database:
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above
- Logging.info("Setting up TICKS to start from: {}".format(t4))
+ Logging.debug("Setting up TICKS to start from: {}".format(t4))
return t4
@classmethod
def getNextTick(cls):
with cls._clsLock: # prevent duplicate tick
- if cls._lastLaggingTick==0:
+ if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized
# 10k at 1/20 chance, should be enough to avoid overlaps
- cls._lastLaggingTick = cls.setupLastTick() + datetime.timedelta(0, -10000)
- if cls._lastTick==0: # should be quite a bit into the future
- cls._lastTick = cls.setupLastTick()
+ tick = cls.setupLastTick()
+ cls._lastTick = tick
+ cls._lastLaggingTick = tick + datetime.timedelta(0, -10000)
+ # if : # should be quite a bit into the future
if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds
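The rewritten `getNextTick()` now initializes `_lastTick` and `_lastLaggingTick` from a single base timestamp (the lagging tick starts 10,000 seconds behind) and then, on roughly one throw in twenty, hands out a lagging tick so that some writes land "behind" the normal timeline. A condensed, self-contained sketch of the same idea (this is not the tool's exact code):

```python
import datetime
import random
import threading

class TickSource:
    """Mostly-increasing timestamps, with an occasional lagging one (sketch)."""
    _lock = threading.Lock()
    _lastTick = None
    _lastLaggingTick = None

    @classmethod
    def next_tick(cls, base=datetime.datetime(2012, 1, 1)):
        with cls._lock:                                     # prevent duplicate ticks across threads
            if cls._lastTick is None:                       # one-time init from a single base
                cls._lastTick = base
                cls._lastLaggingTick = base - datetime.timedelta(seconds=10000)
            if random.randrange(20) == 0:                   # 1 in 20: return a lagging tick
                cls._lastLaggingTick += datetime.timedelta(seconds=1)
                return cls._lastLaggingTick
            cls._lastTick += datetime.timedelta(seconds=1)  # normal case: move forward
            return cls._lastTick
```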
@@ -1177,6 +1178,8 @@ class Task():
instead. But a task is always associated with a DB
'''
taskSn = 100
+ _lock = threading.Lock()
+ _tableLocks: Dict[str, threading.Lock] = {}
@classmethod
def allocTaskNum(cls):
@@ -1198,6 +1201,8 @@ class Task():
self._execStats = execStats
self._db = db # A task is always associated/for a specific DB
def isSuccess(self):
return self._err is None
@@ -1237,6 +1242,7 @@ class Task():
0x0B, # Unable to establish connection, more details in TD-1648
0x200, # invalid SQL, TODO: re-examine with TD-934
0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
+ 0x213, # "Disconnected from service", result of "kill connection ???"
0x217, # "db not selected", client side defined error code
# 0x218, # "Table does not exist" client side defined error code
0x360, # Table already exists
@@ -1318,7 +1324,7 @@ class Task():
self._err = err
self._aborted = True
except Exception as e:
- self.logInfo("Non-TAOS exception encountered")
+ Logging.info("Non-TAOS exception encountered with: {}".format(self.__class__.__name__))
self._err = e
self._aborted = True
traceback.print_exc()
@@ -1351,6 +1357,24 @@ class Task():
def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread
return wt.getQueryResult()
+ def lockTable(self, ftName): # full table name
+ # print(" <<" + ftName + '_', end="", flush=True)
+ with Task._lock:
+ if not ftName in Task._tableLocks:
+ Task._tableLocks[ftName] = threading.Lock()
+ Task._tableLocks[ftName].acquire()
+ def unlockTable(self, ftName):
+ # print('_' + ftName + ">> ", end="", flush=True)
+ with Task._lock:
+ if not ftName in self._tableLocks:
+ raise RuntimeError("Corrupt state, no such lock")
+ lock = Task._tableLocks[ftName]
+ if not lock.locked():
+ raise RuntimeError("Corrupte state, already unlocked")
+ lock.release()
class ExecutionStats:
def __init__(self):
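`Task` now keeps a class-level registry of per-table `threading.Lock` objects, created lazily under a single guard lock, so concurrent tasks can serialize "create table" and write-then-verify operations on the same table. A roughly equivalent standalone sketch, with hypothetical names:

```python
import threading
from typing import Dict

class TableLockRegistry:
    _guard = threading.Lock()                  # protects the dictionary itself
    _locks: Dict[str, threading.Lock] = {}     # one lock per full table name

    @classmethod
    def lock(cls, full_table_name: str):
        with cls._guard:                       # create the per-table lock lazily
            lock = cls._locks.setdefault(full_table_name, threading.Lock())
        lock.acquire()                         # then block on the per-table lock itself

    @classmethod
    def unlock(cls, full_table_name: str):
        with cls._guard:
            if full_table_name not in cls._locks:
                raise RuntimeError("Corrupt state, no such lock")
            lock = cls._locks[full_table_name]
        if not lock.locked():
            raise RuntimeError("Corrupt state, already unlocked")
        lock.release()
```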
@@ -1461,7 +1485,7 @@ class StateTransitionTask(Task):
_baseTableNumber = None
- _endState = None
+ _endState = None # TODO: no longter used?
@classmethod
def getInfo(cls): # each sub class should supply their own information
@@ -1486,7 +1510,7 @@ class StateTransitionTask(Task):
@classmethod
def getRegTableName(cls, i):
- if ( StateTransitionTask._baseTableNumber is None):
+ if ( StateTransitionTask._baseTableNumber is None): # Set it one time
StateTransitionTask._baseTableNumber = Dice.throw(
999) if gConfig.dynamic_db_table_names else 0
return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i)
@@ -1544,8 +1568,11 @@ class TaskCreateSuperTable(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
sTable.create(wt.getDbConn(), self._db.getName(),
- {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'})
+ {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'},
+ dropIfExists = True
+ )
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that
# automatically
@@ -1558,14 +1585,41 @@ class TdSuperTable:
def getName(self):
return self._stName
+ def drop(self, dbc, dbName, skipCheck = False):
+ if self.exists(dbc, dbName) : # if myself exists
+ fullTableName = dbName + '.' + self._stName
+ dbc.execute("DROP TABLE {}".format(fullTableName))
+ else:
+ if not skipCheck:
+ raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))
+ def exists(self, dbc, dbName):
+ dbc.execute("USE " + dbName)
+ return dbc.existsSuperTable(self._stName)
# TODO: odd semantic, create() method is usually static?
- def create(self, dbc, dbName, cols: dict, tags: dict):
+ def create(self, dbc, dbName, cols: dict, tags: dict,
+ dropIfExists = False
+ ):
'''Creating a super table'''
- sql = "CREATE TABLE {}.{} ({}) TAGS ({})".format(
- dbName,
- self._stName,
- ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]),
- ",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
+ dbc.execute("USE " + dbName)
+ fullTableName = dbName + '.' + self._stName
+ if dbc.existsSuperTable(self._stName):
+ if dropIfExists:
+ dbc.execute("DROP TABLE {}".format(fullTableName))
+ else: # error
+ raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
+ # Now let's create
+ sql = "CREATE TABLE {} ({})".format(
+ fullTableName,
+ ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]))
+ if tags is None :
+ sql += " TAGS (dummy int) "
+ else:
+ sql += " TAGS ({})".format(
+ ",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
)
dbc.execute(sql)
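With the new `dropIfExists` parameter, the call site in `TaskCreateSuperTable` (earlier in this diff) can recreate the super table on every run instead of failing when it already exists. A hedged usage sketch, with placeholder connection and database names:

```python
# Hypothetical call mirroring the TaskCreateSuperTable change above
sTable.create(dbc, "db",
              cols={'ts': 'timestamp', 'speed': 'int'},
              tags={'b': 'binary(200)', 'f': 'float'},
              dropIfExists=True)
# For these arguments the method builds and executes roughly:
#   CREATE TABLE db.<stable_name> (ts timestamp,speed int) TAGS (b binary(200),f float)
```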
@@ -1583,14 +1637,25 @@ class TdSuperTable:
def hasRegTables(self, dbc: DbConn, dbName: str):
return dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName)) > 0
- def ensureTable(self, dbc: DbConn, dbName: str, regTableName: str):
+ def ensureTable(self, task: Task, dbc: DbConn, dbName: str, regTableName: str):
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already
return
- sql = "CREATE TABLE {}.{} USING {}.{} tags ({})".format(
- dbName, regTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
- )
- dbc.execute(sql)
+ # acquire a lock first, so as to be able to *verify*. More details in TD-1471
+ fullTableName = dbName + '.' + regTableName
+ if task is not None: # optional lock
+ task.lockTable(fullTableName)
+ Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
+ # print("(" + fullTableName[-3:] + ")", end="", flush=True)
+ try:
+ sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
+ fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
+ )
+ dbc.execute(sql)
+ finally:
+ if task is not None:
+ task.unlockTable(fullTableName) # no matter what
def _getTagStrForSql(self, dbc, dbName: str) :
tags = self._getTags(dbc, dbName)
@@ -1809,7 +1874,7 @@ class TaskRestartService(StateTransitionTask):
with self._classLock:
if self._isRunning:
- print("Skipping restart task, another running already")
+ Logging.info("Skipping restart task, another running already")
return
self._isRunning = True
@@ -1847,13 +1912,88 @@ class TaskAddData(StateTransitionTask):
def canBeginFrom(cls, state: AnyState):
return state.canAddData()
+ def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
+ numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
+ fullTableName = db.getName() + '.' + regTableName
+ sql = "insert into {} values ".format(fullTableName)
+ for j in range(numRecords): # number of records per table
+ nextInt = db.getNextInt()
+ nextTick = db.getNextTick()
+ sql += "('{}', {});".format(nextTick, nextInt)
+ dbc.execute(sql)
+ def _addData(self, db, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
+ numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
+ for j in range(numRecords): # number of records per table
+ nextInt = db.getNextInt()
+ nextTick = db.getNextTick()
+ if gConfig.record_ops:
+ self.prepToRecordOps()
+ self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
+ self.fAddLogReady.flush()
+ os.fsync(self.fAddLogReady)
+ # TODO: too ugly trying to lock the table reliably, refactor...
+ fullTableName = db.getName() + '.' + regTableName
+ if gConfig.verify_data:
+ self.lockTable(fullTableName)
+ # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
+ try:
+ sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {})
+ fullTableName,
+ # ds.getFixedSuperTableName(),
+ # ds.getNextBinary(), ds.getNextFloat(),
+ nextTick, nextInt)
+ dbc.execute(sql)
+ except: # Any exception at all
+ if gConfig.verify_data:
+ self.unlockTable(fullTableName)
+ raise
+ # Now read it back and verify, we might encounter an error if table is dropped
+ if gConfig.verify_data: # only if command line asks for it
+ try:
+ readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
+ format(db.getName(), regTableName, nextTick))
+ if readBack != nextInt :
+ raise taos.error.ProgrammingError(
+ "Failed to read back same data, wrote: {}, read: {}"
+ .format(nextInt, readBack), 0x999)
+ except taos.error.ProgrammingError as err:
+ errno = Helper.convertErrno(err.errno)
+ if errno in [0x991, 0x992] : # not a single result
+ raise taos.error.ProgrammingError(
+ "Failed to read back same data for tick: {}, wrote: {}, read: {}"
+ .format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
+ errno)
+ elif errno in [0x218, 0x362]: # table doesn't exist
+ # do nothing
+ dummy = 0
+ else:
+ # Re-throw otherwise
+ raise
+ finally:
+ self.unlockTable(fullTableName) # Unlock the table no matter what
+ # Successfully wrote the data into the DB, let's record it somehow
+ te.recordDataMark(nextInt)
+ if gConfig.record_ops:
+ self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
+ self.fAddLogDone.flush()
+ os.fsync(self.fAddLogDone)
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
db = self._db
dbc = wt.getDbConn()
- tblSeq = list(range(
- self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
- random.shuffle(tblSeq)
+ numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
+ numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
+ tblSeq = list(range(numTables ))
+ random.shuffle(tblSeq) # now we have random sequence
for i in tblSeq:
if (i in self.activeTable): # wow already active
print("x", end="", flush=True) # concurrent insertion
@@ -1861,60 +2001,20 @@ class TaskAddData(StateTransitionTask):
self.activeTable.add(i) # marking it active
sTable = db.getFixedSuperTable()
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
- sTable.ensureTable(wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists
+ fullTableName = db.getName() + '.' + regTableName
+ # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
+ sTable.ensureTable(self, wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists
+ # self._unlockTable(fullTableName)
- for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
- nextInt = db.getNextInt()
- nextTick = db.getNextTick()
- if gConfig.record_ops:
- self.prepToRecordOps()
- self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
- self.fAddLogReady.flush()
- os.fsync(self.fAddLogReady)
- sql = "insert into {}.{} values ('{}', {});".format( # removed: tags ('{}', {})
- db.getName(),
- regTableName,
- # ds.getFixedSuperTableName(),
- # ds.getNextBinary(), ds.getNextFloat(),
- nextTick, nextInt)
- dbc.execute(sql)
- # Successfully wrote the data into the DB, let's record it
- # somehow
- te.recordDataMark(nextInt)
- if gConfig.record_ops:
- self.fAddLogDone.write(
- "Wrote {} to {}\n".format(
- nextInt, regTableName))
- self.fAddLogDone.flush()
- os.fsync(self.fAddLogDone)
- # Now read it back and verify, we might encounter an error if table is dropped
- if gConfig.verify_data: # only if command line asks for it
- try:
- readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts= '{}'".
- format(db.getName(), regTableName, nextTick))
- if readBack != nextInt :
- raise taos.error.ProgrammingError(
- "Failed to read back same data, wrote: {}, read: {}"
- .format(nextInt, readBack), 0x999)
- except taos.error.ProgrammingError as err:
- errno = Helper.convertErrno(err.errno)
- if errno in [0x991, 0x992] : # not a single result
- raise taos.error.ProgrammingError(
- "Failed to read back same data for tick: {}, wrote: {}, read: {}"
- .format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
- errno)
- # Re-throw no matter what
- raise
+ if Dice.throw(1) == 0: # 1 in 2 chance
+ self._addData(db, dbc, regTableName, te)
+ else:
+ self._addDataInBatch(db, dbc, regTableName, te)
self.activeTable.discard(i) # not raising an error, unlike remove
class ThreadStacks: # stack info for all threads
def __init__(self):
self._allStacks = {}
@@ -1936,17 +2036,18 @@ class ThreadStacks: # stack info for all threads
'__init__']: # the thread that extracted the stack
continue # ignore
# Now print
- print("\n<----- Thread Info for ID: {}".format(thNid))
+ print("\n<----- Thread Info for LWP/ID: {} (Execution stopped at Bottom Frame) <-----".format(thNid))
+ stackFrame = 0
for frame in stack:
# print(frame)
- print("File {filename}, line {lineno}, in {name}".format(
+ print("[{sf}] File {filename}, line {lineno}, in {name}".format(
- filename=frame.filename, lineno=frame.lineno, name=frame.name))
+ sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
print(" {}".format(frame.line))
- print("-----> End of Thread Info\n")
+ print("-----> End of Thread Info ----->\n")
class ClientManager:
def __init__(self):
- print("Starting service manager")
+ Logging.info("Starting service manager")
# signal.signal(signal.SIGTERM, self.sigIntHandler)
# signal.signal(signal.SIGINT, self.sigIntHandler)
@@ -2048,7 +2149,7 @@ class ClientManager:
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
- print("Starting client instance to: {}".format(tInst))
+ Logging.info("Starting client instance: {}".format(tInst))
self.tc.run()
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
@@ -95,6 +95,11 @@ class DbConn:
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
return dbName in dbs # TODO: super weird type mangling seen, once here
+ def existsSuperTable(self, stName):
+ self.query("show stables")
+ sts = [v[0] for v in self.getQueryResult()]
+ return stName in sts
def hasTables(self):
return self.query("show tables") > 0
@@ -240,6 +245,7 @@ class MyTDSql:
def _execInternal(self, sql):
startTime = time.time()
+ # Logging.debug("Executing SQL: " + sql)
ret = self._cursor.execute(sql)
# print("\nSQL success: {}".format(sql))
queryTime = time.time() - startTime
@@ -27,7 +27,7 @@ class LoggingFilter(logging.Filter):
class MyLoggingAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
- return "[{}] {}".format(threading.get_ident() % 10000, msg), kwargs
+ return "[{:04d}] {}".format(threading.get_ident() % 10000, msg), kwargs
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
@@ -51,7 +51,7 @@ class Logging:
_logger.addHandler(ch)
# Logging adapter, to be used as a logger
- print("setting logger variable")
+ # print("setting logger variable")
# global logger
cls.logger = MyLoggingAdapter(_logger, [])
@@ -166,6 +166,9 @@ class Progress:
SERVICE_RECONNECT_START = 4
SERVICE_RECONNECT_SUCCESS = 5
SERVICE_RECONNECT_FAILURE = 6
+ SERVICE_START_NAP = 7
+ CREATE_TABLE_ATTEMPT = 8
tokens = {
STEP_BOUNDARY: '.',
BEGIN_THREAD_STEP: '[',
@@ -174,6 +177,8 @@
SERVICE_RECONNECT_START: '<r.',
SERVICE_RECONNECT_SUCCESS: '.r>',
SERVICE_RECONNECT_FAILURE: '.xr>',
+ SERVICE_START_NAP: '_zz',
+ CREATE_TABLE_ATTEMPT: '_c',
}
@classmethod
@@ -47,6 +47,17 @@ class TdeInstance():
.format(selfPath, projPath))
return buildPath
+ @classmethod
+ def prepareGcovEnv(cls, env):
+ # Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html
+ bPath = cls._getBuildPath() # build PATH
+ numSegments = len(bPath.split('/')) - 1 # "/x/TDengine/build" should yield 3
+ numSegments = numSegments - 1 # DEBUG only
+ env['GCOV_PREFIX'] = bPath + '/svc_gcov'
+ env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings
+ Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format(
+ numSegments, env['GCOV_PREFIX'] ))
def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030):
self._buildDir = self._getBuildPath()
self._subdir = '/' + subdir # TODO: tolerate "/"
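For reference, `GCOV_PREFIX` redirects where a gcov-instrumented child process writes its `.gcda` files, and `GCOV_PREFIX_STRIP` removes that many leading components from the compile-time object path before appending it under the prefix. A small hedged sketch of building such an environment outside the class (the paths are illustrative):

```python
import os

def make_gcov_env(build_path: str) -> dict:
    """Copy of the current environment that redirects gcov output under <build_path>/svc_gcov."""
    env = os.environ.copy()
    num_segments = len(build_path.split('/')) - 1    # path components to strip, as in the diff
    env['GCOV_PREFIX'] = build_path + '/svc_gcov'    # .gcda files land under this directory
    env['GCOV_PREFIX_STRIP'] = str(num_segments)     # environment values must be strings
    return env

# e.g. make_gcov_env("/x/TDengine/build") can then be passed to subprocess.Popen(..., env=...)
```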
@@ -217,6 +228,11 @@ class TdeSubProcess:
# raise CrashGenError("Empty instance not allowed in TdeSubProcess")
# self._tInst = tInst # Default create at ServiceManagerThread
+ def __repr__(self):
+ if self.subProcess is None:
+ return '[TdeSubProc: Empty]'
+ return '[TdeSubProc: pid = {}]'.format(self.getPid())
def getStdOut(self):
return self.subProcess.stdout
@@ -235,17 +251,30 @@ class TdeSubProcess:
# Sanity check
if self.subProcess: # already there
raise RuntimeError("Corrupt process state")
+ # Prepare environment variables for coverage information
+ # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
+ myEnv = os.environ.copy()
+ TdeInstance.prepareGcovEnv(myEnv)
+ # print(myEnv)
+ # print(myEnv.items())
+ # print("Starting TDengine via Shell: {}".format(cmdLineStr))
+ useShell = True
self.subProcess = subprocess.Popen(
- cmdLine,
- shell=False,
+ ' '.join(cmdLine) if useShell else cmdLine,
+ shell=useShell,
# svcCmdSingle, shell=True, # capture core dump?
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# bufsize=1, # not supported in binary mode
- close_fds=ON_POSIX
+ close_fds=ON_POSIX,
+ env=myEnv
) # had text=True, which interferred with reading EOF
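The start-up path now copies `os.environ`, augments it with the GCOV variables, joins the command line into a single string and runs it through the shell, passing the custom environment via `env=`. A minimal hedged sketch of that launch pattern (the command line and extra variable are illustrative, not the tool's exact values):

```python
import os
import subprocess

cmd_line = ["taosd", "-c", "/etc/taos"]        # illustrative command, not the tool's exact one
my_env = os.environ.copy()
my_env["GCOV_PREFIX"] = "/tmp/svc_gcov"        # any extra variables the child should inherit

use_shell = True
proc = subprocess.Popen(
    " ".join(cmd_line) if use_shell else cmd_line,   # the shell form takes a single string
    shell=use_shell,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    close_fds=True,
    env=my_env,                                # replaces, not merges, the child's environment
)
```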
+ STOP_SIGNAL = signal.SIGKILL # What signal to use (in kill) to stop a taosd process?
def stop(self):
"""
Stop a sub process, and try to return a meaningful return code.
@@ -267,7 +296,7 @@ class TdeSubProcess:
SIGUSR2 12
"""
if not self.subProcess:
- print("Sub process already stopped")
+ Logging.error("Sub process already stopped")
return # -1
retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
@@ -278,20 +307,25 @@
return retCode
# process still alive, let's interrupt it
- print("Terminate running process, send SIG_INT and wait...")
+ Logging.info("Terminate running process, send SIG_{} and wait...".format(self.STOP_SIGNAL))
# sub process should end, then IPC queue should end, causing IO thread to end
- # sig = signal.SIGINT
- sig = signal.SIGKILL
- self.subProcess.send_signal(sig) # SIGNINT or SIGKILL
+ topSubProc = psutil.Process(self.subProcess.pid)
+ for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False
+ child.send_signal(self.STOP_SIGNAL)
+ time.sleep(0.2) # 200 ms
+ # topSubProc.send_signal(sig) # now kill the main sub process (likely the Shell)
+ self.subProcess.send_signal(self.STOP_SIGNAL) # main sub process (likely the Shell)
self.subProcess.wait(20)
retCode = self.subProcess.returncode # should always be there
# May throw subprocess.TimeoutExpired exception above, therefore
# The process is guranteed to have ended by now
self.subProcess = None
if retCode != 0: # != (- signal.SIGINT):
- Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(sig, retCode))
+ Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(
+ self.STOP_SIGNAL, retCode))
else:
- Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(sig))
+ Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(self.STOP_SIGNAL))
return - retCode
class ServiceManager:
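Because the service is now launched through a shell, signalling only `self.subProcess` would hit the shell and could leave `taosd` running; the reworked `stop()` therefore walks the process tree with `psutil`, signals every descendant first, then the top process, and finally waits. A hedged standalone sketch of that shutdown order:

```python
import signal
import subprocess
import time
import psutil

def stop_process_tree(proc: subprocess.Popen, sig=signal.SIGKILL, wait_secs=20) -> int:
    """Signal all descendants first, then the top process (likely a shell), then wait."""
    top = psutil.Process(proc.pid)
    for child in top.children(recursive=True):   # grandchildren included
        child.send_signal(sig)
    time.sleep(0.2)                              # give the children a moment to exit
    proc.send_signal(sig)                        # now the shell / top-level process
    proc.wait(wait_secs)                         # may raise subprocess.TimeoutExpired
    return proc.returncode                       # -N means "terminated by signal N"
```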
@@ -439,7 +473,7 @@ class ServiceManager:
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
# raise CrashGenError("dummy")
- print("Service Manager Thread (with subprocess) ended, main thread exiting...")
+ Logging.info("Service Manager Thread (with subprocess) ended, main thread exiting...")
def _getFirstInstance(self):
return self._tInsts[0]
@@ -452,7 +486,7 @@ class ServiceManager:
# Find if there's already a taosd service, and then kill it
for proc in psutil.process_iter():
if proc.name() == 'taosd':
- print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
+ Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
time.sleep(2.0)
proc.kill()
# print("Process: {}".format(proc.name()))
@@ -559,7 +593,8 @@ class ServiceManagerThread:
for i in range(0, 100):
time.sleep(1.0)
# self.procIpcBatch() # don't pump message during start up
- print("_zz_", end="", flush=True)
+ Progress.emit(Progress.SERVICE_START_NAP)
+ # print("_zz_", end="", flush=True)
if self._status.isRunning():
Logging.info("[] TDengine service READY to process requests")
Logging.info("[] TAOS service started: {}".format(self))
@@ -595,12 +630,12 @@ class ServiceManagerThread:
def stop(self):
# can be called from both main thread or signal handler
- print("Terminating TDengine service running as the sub process...")
+ Logging.info("Terminating TDengine service running as the sub process...")
if self.getStatus().isStopped():
- print("Service already stopped")
+ Logging.info("Service already stopped")
return
if self.getStatus().isStopping():
- print("Service is already being stopped")
+ Logging.info("Service is already being stopped")
return
# Linux will send Control-C generated SIGINT to the TDengine process
# already, ref:
@@ -616,10 +651,10 @@ class ServiceManagerThread:
if retCode == signal.SIGSEGV : # SGV
Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
except subprocess.TimeoutExpired as err:
- print("Time out waiting for TDengine service process to exit")
+ Logging.info("Time out waiting for TDengine service process to exit")
else:
if self._tdeSubProcess.isRunning(): # still running, should now never happen
- print("FAILED to stop sub process, it is still running... pid = {}".format(
+ Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
self._tdeSubProcess.getPid()))
else:
self._tdeSubProcess = None # not running any more
@@ -683,9 +718,9 @@ class ServiceManagerThread:
return # we are done with THIS BATCH
else: # got line, printing out
if forceOutput:
- Logging.info(line)
+ Logging.info('[TAOSD] ' + line)
else:
- Logging.debug(line)
+ Logging.debug('[TAOSD] ' + line)
print(">", end="", flush=True)
_ProgressBars = ["--", "//", "||", "\\\\"]
@@ -728,11 +763,11 @@ class ServiceManagerThread:
# queue.put(line)
# meaning sub process must have died
- Logging.info("\nEnd of stream detected for TDengine STDOUT: {}".format(self))
+ Logging.info("EOF for TDengine STDOUT: {}".format(self))
out.close()
def svcErrorReader(self, err: IO, queue):
for line in iter(err.readline, b''):
- print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
+ Logging.info("TDengine STDERR: {}".format(line))
- Logging.info("\nEnd of stream detected for TDengine STDERR: {}".format(self))
+ Logging.info("EOF for TDengine STDERR: {}".format(self))
err.close()
\ No newline at end of file
@@ -11,7 +11,7 @@
###################################################################
import sys
- from crash_gen.crash_gen import MainExec
+ from crash_gen.crash_gen_main import MainExec
if __name__ == "__main__":