提交 467ca437 编写于 作者: S Steven Li

Found invalid pointer access with -t 2 -s 5

上级 68b4ea01
...@@ -152,12 +152,18 @@ class WorkerThread: ...@@ -152,12 +152,18 @@ class WorkerThread:
self._stepGate.set() # wake up! self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit time.sleep(0) # let the released thread run a bit
def execSql(self, sql): # not "execute", since we are out side the DB context def execSql(self, sql): # TODO: expose DbConn directly
if ( gConfig.per_thread_db_connection ): if ( gConfig.per_thread_db_connection ):
return self._dbConn.execute(sql) return self._dbConn.execute(sql)
else: else:
return self._tc.getDbState().getDbConn().execute(sql) return self._tc.getDbState().getDbConn().execute(sql)
def getDbConn(self):
if ( gConfig.per_thread_db_connection ):
return self._dbConn
else:
return self._tc.getDbState().getDbConn()
# def querySql(self, sql): # not "execute", since we are out side the DB context # def querySql(self, sql): # not "execute", since we are out side the DB context
# if ( gConfig.per_thread_db_connection ): # if ( gConfig.per_thread_db_connection ):
# return self._dbConn.query(sql) # return self._dbConn.query(sql)
...@@ -432,14 +438,23 @@ class DbConn: ...@@ -432,14 +438,23 @@ class DbConn:
def execute(self, sql): def execute(self, sql):
if ( not self.isOpen ): if ( not self.isOpen ):
raise RuntimeError("Cannot execute database commands until connection is open") raise RuntimeError("Cannot execute database commands until connection is open")
return self._tdSql.execute(sql) logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.execute(sql)
logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql) : # return rows affected def query(self, sql) : # return rows affected
if ( not self.isOpen ): if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open") raise RuntimeError("Cannot query database until connection is open")
return self._tdSql.query(sql) logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.query(sql)
logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
# results are in: return self._tdSql.queryResult # results are in: return self._tdSql.queryResult
def getQueryResult(self):
return self._tdSql.queryResult
def _queryAny(self, sql) : # actual query result as an int def _queryAny(self, sql) : # actual query result as an int
if ( not self.isOpen ): if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open") raise RuntimeError("Cannot query database until connection is open")
...@@ -468,8 +483,8 @@ class AnyState: ...@@ -468,8 +483,8 @@ class AnyState:
STATE_VAL_IDX = 0 STATE_VAL_IDX = 0
CAN_CREATE_DB = 1 CAN_CREATE_DB = 1
CAN_DROP_DB = 2 CAN_DROP_DB = 2
CAN_CREATE_FIXED_TABLE = 3 CAN_CREATE_FIXED_SUPER_TABLE = 3
CAN_DROP_FIXED_TABLE = 4 CAN_DROP_FIXED_SUPER_TABLE = 4
CAN_ADD_DATA = 5 CAN_ADD_DATA = 5
CAN_READ_DATA = 6 CAN_READ_DATA = 6
...@@ -502,10 +517,10 @@ class AnyState: ...@@ -502,10 +517,10 @@ class AnyState:
return self._info[self.CAN_CREATE_DB] return self._info[self.CAN_CREATE_DB]
def canDropDb(self): def canDropDb(self):
return self._info[self.CAN_DROP_DB] return self._info[self.CAN_DROP_DB]
def canCreateFixedTable(self): def canCreateFixedSuperTable(self):
return self._info[self.CAN_CREATE_FIXED_TABLE] return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
def canDropFixedTable(self): def canDropFixedSuperTable(self):
return self._info[self.CAN_DROP_FIXED_TABLE] return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
def canAddData(self): def canAddData(self):
return self._info[self.CAN_ADD_DATA] return self._info[self.CAN_ADD_DATA]
def canReadData(self): def canReadData(self):
...@@ -602,9 +617,9 @@ class StateDbOnly(AnyState): ...@@ -602,9 +617,9 @@ class StateDbOnly(AnyState):
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
self.assertAtMostOneSuccess(tasks, DropDbTask) self.assertAtMostOneSuccess(tasks, DropDbTask)
# self._state = self.STATE_EMPTY # self._state = self.STATE_EMPTY
elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful
self.assertNoTask(tasks, DropDbTask) # should have have tried self.assertNoTask(tasks, DropDbTask) # should have have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail # # can't say there's add-data attempts, since they may all fail
...@@ -618,7 +633,7 @@ class StateDbOnly(AnyState): ...@@ -618,7 +633,7 @@ class StateDbOnly(AnyState):
# # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks, # # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks,
# self._state = self.STATE_DB_ONLY # no change # self._state = self.STATE_DB_ONLY # no change
class StateTableOnly(AnyState): class StateSuperTableOnly(AnyState):
def getInfo(self): def getInfo(self):
return [ return [
self.STATE_TABLE_ONLY, self.STATE_TABLE_ONLY,
...@@ -628,8 +643,8 @@ class StateTableOnly(AnyState): ...@@ -628,8 +643,8 @@ class StateTableOnly(AnyState):
] ]
def verifyTasksToState(self, tasks, newState): def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, DropFixedTableTask) ): # we are able to drop the table if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table
self.assertAtMostOneSuccess(tasks, DropFixedTableTask) self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask)
# self._state = self.STATE_DB_ONLY # self._state = self.STATE_DB_ONLY
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
# self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases
...@@ -658,16 +673,16 @@ class StateHasData(AnyState): ...@@ -658,16 +673,16 @@ class StateHasData(AnyState):
elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only
if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task
self.assertNoTask(tasks, DropDbTask) # we must have drop_db task self.assertNoTask(tasks, DropDbTask) # we must have drop_db task
self.hasSuccess(tasks, DropFixedTableTask) self.hasSuccess(tasks, DropFixedSuperTableTask)
self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # TODO: dicy self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
self.assertNoTask(tasks, DropDbTask) self.assertNoTask(tasks, DropDbTask)
self.assertNoTask(tasks, DropFixedTableTask) self.assertNoTask(tasks, DropFixedSuperTableTask)
self.assertNoTask(tasks, AddFixedDataTask) self.assertNoTask(tasks, AddFixedDataTask)
# self.hasSuccess(tasks, DeleteDataTasks) # self.hasSuccess(tasks, DeleteDataTasks)
else: else:
self.assertNoTask(tasks, DropDbTask) self.assertNoTask(tasks, DropDbTask)
self.assertNoTask(tasks, DropFixedTableTask) self.assertNoTask(tasks, DropFixedSuperTableTask)
self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
...@@ -681,7 +696,7 @@ class DbState(): ...@@ -681,7 +696,7 @@ class DbState():
self._lock = threading.RLock() self._lock = threading.RLock()
self._state = StateInvalid() # starting state self._state = StateInvalid() # starting state
self._stateWeights = [1,3,5,10] self._stateWeights = [1,3,5,10] # indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
# self.openDbServerConnection() # self.openDbServerConnection()
self._dbConn = DbConn() self._dbConn = DbConn()
...@@ -711,8 +726,8 @@ class DbState(): ...@@ -711,8 +726,8 @@ class DbState():
tIndex = self.tableNumQueue.push() tIndex = self.tableNumQueue.push()
return tIndex return tIndex
def getFixedTableName(self): def getFixedSuperTableName(self):
return "fixed_table" return "fs_table"
def releaseTable(self, i): # return the table back, so others can use it def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i) self.tableNumQueue.release(i)
...@@ -726,6 +741,12 @@ class DbState(): ...@@ -726,6 +741,12 @@ class DbState():
with self._lock: with self._lock:
self._lastInt += 1 self._lastInt += 1
return self._lastInt return self._lastInt
def getNextBinary(self):
return "Los_Angeles_{}".format(self.getNextInt())
def getNextFloat(self):
return 0.9 + self.getNextInt()
def getTableNameToDelete(self): def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition! tblNum = self.tableNumQueue.pop() # TODO: race condition!
...@@ -752,11 +773,12 @@ class DbState(): ...@@ -752,11 +773,12 @@ class DbState():
if endState == None: if endState == None:
continue continue
for tc in allTaskClasses: # what task can further begin from there? for tc in allTaskClasses: # what task can further begin from there?
if tc.canBeginFrom(endState) and (endState not in firstTaskTypes): if tc.canBeginFrom(endState) and (tc not in firstTaskTypes):
taskTypes.append(tc) # gather it taskTypes.append(tc) # gather it
if len(taskTypes) <= 0: if len(taskTypes) <= 0:
raise RuntimeError("No suitable task types found for state: {}".format(self._state)) raise RuntimeError("No suitable task types found for state: {}".format(self._state))
logger.debug("[OPS] Tasks found for state {}: {}".format(self._state, taskTypes))
return taskTypes return taskTypes
# tasks.append(ReadFixedDataTask(self)) # always for everybody # tasks.append(ReadFixedDataTask(self)) # always for everybody
...@@ -809,10 +831,10 @@ class DbState(): ...@@ -809,10 +831,10 @@ class DbState():
# logger.debug("Found DB ONLY state") # logger.debug("Found DB ONLY state")
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly() return StateDbOnly()
if dbc.query("SELECT * FROM db.{}".format(self.getFixedTableName()) ) == 0 : # no data if dbc.query("SELECT * FROM db.{}".format(self.getFixedSuperTableName()) ) == 0 : # no data
# logger.debug("Found TABLE_ONLY state") # logger.debug("Found TABLE_ONLY state")
logger.debug("[STT] TABLE_ONLY found, between {} and {}".format(ts, time.time())) logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateTableOnly() return StateSuperTableOnly()
else: else:
# logger.debug("Found HAS_DATA state") # logger.debug("Found HAS_DATA state")
logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
...@@ -933,7 +955,7 @@ class Task(): ...@@ -933,7 +955,7 @@ class Task():
return self._dbState.execute(sql) return self._dbState.execute(sql)
class ExecutionStats: class ExecutionStats:
def __init__(self): def __init__(self):
self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task
self._tasksInProgress = 0 self._tasksInProgress = 0
...@@ -984,7 +1006,7 @@ class ExecutionStats: ...@@ -984,7 +1006,7 @@ class ExecutionStats:
logger.info("| Task Execution Times (success/total):") logger.info("| Task Execution Times (success/total):")
execTimesAny = 0 execTimesAny = 0
for k, n in self._execTimes.items(): for k, n in self._execTimes.items():
execTimesAny += n[1] execTimesAny += n[0]
logger.info("| {0:<24}: {1}/{2}".format(k,n[1],n[0])) logger.info("| {0:<24}: {1}/{2}".format(k,n[1],n[0]))
logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny)) logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
...@@ -1035,7 +1057,7 @@ class CreateDbTask(StateTransitionTask): ...@@ -1035,7 +1057,7 @@ class CreateDbTask(StateTransitionTask):
return state.canCreateDb() return state.canCreateDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("create database db") wt.execSql("create database db")
class DropDbTask(StateTransitionTask): class DropDbTask(StateTransitionTask):
@classmethod @classmethod
...@@ -1053,21 +1075,23 @@ class DropDbTask(StateTransitionTask): ...@@ -1053,21 +1075,23 @@ class DropDbTask(StateTransitionTask):
wt.execSql("drop database db") wt.execSql("drop database db")
logger.debug("[OPS] database dropped at {}".format(time.time())) logger.debug("[OPS] database dropped at {}".format(time.time()))
class CreateFixedTableTask(StateTransitionTask): class CreateFixedSuperTableTask(StateTransitionTask):
@classmethod @classmethod
def getInfo(cls): def getInfo(cls):
return [ return [
# [AnyState.STATE_DB_ONLY], # [AnyState.STATE_DB_ONLY],
StateTableOnly() StateSuperTableOnly()
] ]
@classmethod @classmethod
def canBeginFrom(cls, state: AnyState): def canBeginFrom(cls, state: AnyState):
return state.canCreateFixedTable() return state.canCreateFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName() tblName = self._dbState.getFixedSuperTableName()
wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName)) wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(20), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that automatically
class ReadFixedDataTask(StateTransitionTask): class ReadFixedDataTask(StateTransitionTask):
@classmethod @classmethod
...@@ -1082,11 +1106,17 @@ class ReadFixedDataTask(StateTransitionTask): ...@@ -1082,11 +1106,17 @@ class ReadFixedDataTask(StateTransitionTask):
return state.canReadData() return state.canReadData()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName() sTbName = self._dbState.getFixedSuperTableName()
wt.execSql("select * from db.{}".format(tblName)) # TODO: analyze result set later dbc = wt.getDbConn()
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
rTables = dbc.getQueryResult()
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
for rTbName in rTables : # regular tables
dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
# tdSql.query(" cars where tbname in ('carzero', 'carone')") # tdSql.query(" cars where tbname in ('carzero', 'carone')")
class DropFixedTableTask(StateTransitionTask): class DropFixedSuperTableTask(StateTransitionTask):
@classmethod @classmethod
def getInfo(cls): def getInfo(cls):
return [ return [
...@@ -1096,10 +1126,10 @@ class DropFixedTableTask(StateTransitionTask): ...@@ -1096,10 +1126,10 @@ class DropFixedTableTask(StateTransitionTask):
@classmethod @classmethod
def canBeginFrom(cls, state: AnyState): def canBeginFrom(cls, state: AnyState):
return state.canDropFixedTable() return state.canDropFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName() tblName = self._dbState.getFixedSuperTableName()
wt.execSql("drop table db.{}".format(tblName)) wt.execSql("drop table db.{}".format(tblName))
class AddFixedDataTask(StateTransitionTask): class AddFixedDataTask(StateTransitionTask):
...@@ -1116,8 +1146,14 @@ class AddFixedDataTask(StateTransitionTask): ...@@ -1116,8 +1146,14 @@ class AddFixedDataTask(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState ds = self._dbState
sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt()) wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
wt.execSql(sql) for i in range(10): # 0 to 9
sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format(
i,
ds.getFixedSuperTableName(),
ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), ds.getNextInt())
wt.execSql(sql)
#---------- Non State-Transition Related Tasks ----------# #---------- Non State-Transition Related Tasks ----------#
...@@ -1150,10 +1186,10 @@ class AddDataTask(Task): ...@@ -1150,10 +1186,10 @@ class AddDataTask(Task):
self.logInfo("No table found to add data, skipping...") self.logInfo("No table found to add data, skipping...")
return return
sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
self.logDebug("Executing SQL: {}".format(sql)) self.logDebug("[SQL] Executing SQL: {}".format(sql))
wt.execSql(sql) wt.execSql(sql)
ds.releaseTable(tIndex) ds.releaseTable(tIndex)
self.logDebug("Finished adding data") self.logDebug("[OPS] Finished adding data")
# Deterministic random number generator # Deterministic random number generator
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册