提交 dcb11554 编写于 作者: S Steven Li

Discovered TD-256 with params: -p -d -t 10 -s 20

上级 6623cb63
......@@ -136,11 +136,17 @@ class WorkerThread:
self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit
def execSql(self, sql):
def execSql(self, sql): # not "execute", since we are out side the DB context
if ( gConfig.per_thread_db_connection ):
return self._dbConn.execute(sql)
else:
return self._tc.getDbState.getDbConn().execute(sql)
def querySql(self, sql): # not "execute", since we are out side the DB context
if ( gConfig.per_thread_db_connection ):
return self._dbConn.execSql(sql)
return self._dbConn.query(sql)
else:
return self._tc.getDbState.getDbConn().execSql(sql)
return self._tc.getDbState.getDbConn().query(sql)
class ThreadCoordinator:
def __init__(self, pool, wd: WorkDispatcher, dbState):
......@@ -380,11 +386,17 @@ class DbConn:
self._tdSql.close()
self.isOpen = False
def execSql(self, sql):
def execute(self, sql):
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
raise RuntimeError("Cannot execute database commands until connection is open")
return self._tdSql.execute(sql)
def query(self, sql) -> int : # return number of rows retrieved
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
return self._tdSql.query(sql)
# State of the database as we believe it to be
class DbState():
STATE_INVALID = -1
......@@ -441,27 +453,36 @@ class DbState():
return "table_{}".format(tblNum)
def execSql(self, sql): # using the main DB connection
return self._dbConn.execSql(sql)
return self._dbConn.execute(sql)
def cleanUp(self):
self._dbConn.close()
def getTasksAtState(self):
tasks = []
tasks.append(ReadFixedDataTask(self)) # always
if ( self._state == self.STATE_EMPTY ):
return [CreateDbTask(self), CreateFixedTableTask(self)]
tasks.append(CreateDbTask(self))
tasks.append(CreateFixedTableTask(self))
elif ( self._state == self.STATE_DB_ONLY ):
return [DropDbTask(self), CreateFixedTableTask(self), AddFixedDataTask(self)]
tasks.append(DropDbTask(self))
tasks.append(CreateFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
elif ( self._state == self.STATE_TABLE_ONLY ):
return [DropFixedTableTask(self), AddFixedDataTask(self)]
tasks.append(DropFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
return [DropFixedTableTask(self), AddFixedDataTask(self)]
tasks.append(DropFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
else:
raise RuntimeError("Unexpected DbState state: {}".format(self._state))
return tasks
def transition(self, tasks):
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
return # do nothing
if ( self._state == self.STATE_EMPTY ):
# self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table
if ( self.hasSuccess(tasks, CreateDbTask) ):
self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class
self._state = self.STATE_DB_ONLY
......@@ -504,7 +525,7 @@ class DbState():
else: # did not drop table, did not insert data, that is impossible
raise RuntimeError("Unexpected no-success scenarios")
elif ( self._state == self.STATE_TABLE_ONLY ): # Same as above, TODO: adjust
elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust
if ( self.hasSuccess(tasks, DropFixedTableTask) ):
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
self._state = self.STATE_DB_ONLY
......@@ -589,6 +610,7 @@ class Task():
self._workerThread = None
self._err = None
self._curStep = None
self._numRows = None # Number of rows affected
# Assign an incremental task serial number
self._taskNum = self.allocTaskNum()
......@@ -653,6 +675,12 @@ class CreateFixedTableTask(Task):
tblName = self._dbState.getFixedTableName()
wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
class ReadFixedDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
class DropTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tableName = self._dbState.getTableNameToDelete()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册