提交 59b080f0 编写于 作者: S Steven Li

Half way through enabling read/write check

上级 69aca0e1
...@@ -658,7 +658,10 @@ class DbConn: ...@@ -658,7 +658,10 @@ class DbConn:
raise RuntimeError("Cannot query database until connection is open") raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql) nRows = self.query(sql)
if nRows != 1: if nRows != 1:
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) raise taos.error.ProgrammingError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
(0x991 if nRows==0 else 0x992)
)
if self.getResultRows() != 1 or self.getResultCols() != 1: if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql)) raise RuntimeError("Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0] return self.getQueryResult()[0][0]
...@@ -815,6 +818,7 @@ class MyTDSql: ...@@ -815,6 +818,7 @@ class MyTDSql:
def _execInternal(self, sql): def _execInternal(self, sql):
startTime = time.time() startTime = time.time()
ret = self._cursor.execute(sql) ret = self._cursor.execute(sql)
print("\nSQL success: {}".format(sql))
queryTime = time.time() - startTime queryTime = time.time() - startTime
# Record the query time # Record the query time
cls = self.__class__ cls = self.__class__
...@@ -1342,13 +1346,17 @@ class Database: ...@@ -1342,13 +1346,17 @@ class Database:
For now we use it to manage state transitions in that database For now we use it to manage state transitions in that database
''' '''
_clsLock = threading.Lock() # class wide lock
_lastInt = 101 # next one is initial integer
_lastTick = 0
_lastLaggingTick = 0 # lagging tick, for unsequenced insersions
def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
self._dbNum = dbNum # we assign a number to databases, for our testing purpose self._dbNum = dbNum # we assign a number to databases, for our testing purpose
self._stateMachine = StateMechine(self) self._stateMachine = StateMechine(self)
self._stateMachine.init(dbc) self._stateMachine.init(dbc)
self._lastTick = self.setupLastTick()
self._lastInt = 0 # next one is initial integer
self._lock = threading.RLock() self._lock = threading.RLock()
def getStateMachine(self) -> StateMechine: def getStateMachine(self) -> StateMechine:
...@@ -1387,7 +1395,8 @@ class Database: ...@@ -1387,7 +1395,8 @@ class Database:
# by a factor of 500. # by a factor of 500.
# TODO: what if it goes beyond 10 years into the future # TODO: what if it goes beyond 10 years into the future
# TODO: fix the error as result of above: "tsdb timestamp is out of range" # TODO: fix the error as result of above: "tsdb timestamp is out of range"
def setupLastTick(self): @classmethod
def setupLastTick(cls):
t1 = datetime.datetime(2020, 6, 1) t1 = datetime.datetime(2020, 6, 1)
t2 = datetime.datetime.now() t2 = datetime.datetime.now()
# maybe a very large number, takes 69 years to exceed Python int range # maybe a very large number, takes 69 years to exceed Python int range
...@@ -1401,14 +1410,22 @@ class Database: ...@@ -1401,14 +1410,22 @@ class Database:
logger.info("Setting up TICKS to start from: {}".format(t4)) logger.info("Setting up TICKS to start from: {}".format(t4))
return t4 return t4
def getNextTick(self): @classmethod
with self._lock: # prevent duplicate tick def getNextTick(cls):
if Dice.throw(20) == 0: # 1 in 20 chance with cls._clsLock: # prevent duplicate tick
return self._lastTick + datetime.timedelta(0, -100) # Go back in time 100 seconds if cls._lastLaggingTick==0:
# 10k at 1/20 chance, should be enough to avoid overlaps
cls._lastLaggingTick = cls.setupLastTick() + datetime.timedelta(0, -10000)
if cls._lastTick==0: # should be quite a bit into the future
cls._lastTick = cls.setupLastTick()
if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds
return cls._lastLaggingTick
else: # regular else: # regular
# add one second to it # add one second to it
self._lastTick += datetime.timedelta(0, 1) cls._lastTick += datetime.timedelta(0, 1)
return self._lastTick return cls._lastTick
def getNextInt(self): def getNextInt(self):
with self._lock: with self._lock:
...@@ -2177,6 +2194,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2177,6 +2194,7 @@ class TaskAddData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
db = self._db db = self._db
dbc = wt.getDbConn()
tblSeq = list(range( tblSeq = list(range(
self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
random.shuffle(tblSeq) random.shuffle(tblSeq)
...@@ -2192,6 +2210,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2192,6 +2210,7 @@ class TaskAddData(StateTransitionTask):
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextTick = db.getNextTick()
if gConfig.record_ops: if gConfig.record_ops:
self.prepToRecordOps() self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
...@@ -2202,8 +2221,8 @@ class TaskAddData(StateTransitionTask): ...@@ -2202,8 +2221,8 @@ class TaskAddData(StateTransitionTask):
regTableName, regTableName,
# ds.getFixedSuperTableName(), # ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(), # ds.getNextBinary(), ds.getNextFloat(),
db.getNextTick(), nextInt) nextTick, nextInt)
self.execWtSql(wt, sql) dbc.execute(sql)
# Successfully wrote the data into the DB, let's record it # Successfully wrote the data into the DB, let's record it
# somehow # somehow
te.recordDataMark(nextInt) te.recordDataMark(nextInt)
...@@ -2213,6 +2232,26 @@ class TaskAddData(StateTransitionTask): ...@@ -2213,6 +2232,26 @@ class TaskAddData(StateTransitionTask):
nextInt, regTableName)) nextInt, regTableName))
self.fAddLogDone.flush() self.fAddLogDone.flush()
os.fsync(self.fAddLogDone) os.fsync(self.fAddLogDone)
# Now read it back and verify, we might encounter an error if table is dropped
try:
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts= '{}'".
format(db.getName(), regTableName, nextTick))
if readBack != nextInt :
raise taos.error.ProgrammingError(
"Failed to read back same data, wrote: {}, read: {}"
.format(nextInt, readBack), 0x999)
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x991, 0x992] : # not a single result
raise taos.error.ProgrammingError(
"Failed to read back same data for tick: {}, wrote: {}, read: {}"
.format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
errno)
# Re-throw no matter what
raise
self.activeTable.discard(i) # not raising an error, unlike remove self.activeTable.discard(i) # not raising an error, unlike remove
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册