提交 3060fafd 编写于 作者: S Steven Li

Now locking create table operation, resolving TD-1471

上级 54b34680
......@@ -243,7 +243,7 @@ class WorkerThread:
class ThreadCoordinator:
WORKER_THREAD_TIMEOUT = 180 # one minute
WORKER_THREAD_TIMEOUT = 120 # Normal: 120
def __init__(self, pool: ThreadPool, dbManager: DbManager):
self._curStep = -1 # first step is 0
......@@ -1177,6 +1177,8 @@ class Task():
instead. But a task is always associated with a DB
'''
taskSn = 100
_lock = threading.Lock()
_tableLocks: Dict[str, threading.Lock] = {}
@classmethod
def allocTaskNum(cls):
......@@ -1198,6 +1200,8 @@ class Task():
self._execStats = execStats
self._db = db # A task is always associated/for a specific DB
def isSuccess(self):
return self._err is None
......@@ -1351,6 +1355,24 @@ class Task():
def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread
return wt.getQueryResult()
def lockTable(self, ftName): # full table name
# print(" <<" + ftName + '_', end="", flush=True)
with Task._lock:
if not ftName in Task._tableLocks:
Task._tableLocks[ftName] = threading.Lock()
Task._tableLocks[ftName].acquire()
def unlockTable(self, ftName):
# print('_' + ftName + ">> ", end="", flush=True)
with Task._lock:
if not ftName in self._tableLocks:
raise RuntimeError("Corrupt state, no such lock")
lock = Task._tableLocks[ftName]
if not lock.locked():
raise RuntimeError("Corrupte state, already unlocked")
lock.release()
class ExecutionStats:
def __init__(self):
......@@ -1461,7 +1483,7 @@ class StateTransitionTask(Task):
_baseTableNumber = None
_endState = None
_endState = None # TODO: no longter used?
@classmethod
def getInfo(cls): # each sub class should supply their own information
......@@ -1486,7 +1508,7 @@ class StateTransitionTask(Task):
@classmethod
def getRegTableName(cls, i):
if ( StateTransitionTask._baseTableNumber is None):
if ( StateTransitionTask._baseTableNumber is None): # Set it one time
StateTransitionTask._baseTableNumber = Dice.throw(
999) if gConfig.dynamic_db_table_names else 0
return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i)
......@@ -1583,14 +1605,23 @@ class TdSuperTable:
def hasRegTables(self, dbc: DbConn, dbName: str):
return dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName)) > 0
def ensureTable(self, dbc: DbConn, dbName: str, regTableName: str):
def ensureTable(self, task: Task, dbc: DbConn, dbName: str, regTableName: str):
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already
return
sql = "CREATE TABLE {}.{} USING {}.{} tags ({})".format(
dbName, regTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
)
dbc.execute(sql)
# acquire a lock first, so as to be able to *verify*. More details in TD-1471
fullTableName = dbName + '.' + regTableName
task.lockTable(fullTableName)
Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
print("(" + fullTableName[-3:] + ")", end="", flush=True)
try:
sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
)
dbc.execute(sql)
finally:
task.unlockTable(fullTableName) # no matter what
def _getTagStrForSql(self, dbc, dbName: str) :
tags = self._getTags(dbc, dbName)
......@@ -1862,7 +1893,12 @@ class TaskAddData(StateTransitionTask):
sTable = db.getFixedSuperTable()
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
sTable.ensureTable(wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists
fullTableName = db.getName() + '.' + regTableName
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
sTable.ensureTable(self, wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists
# self._unlockTable(fullTableName)
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
nextInt = db.getNextInt()
......@@ -1872,27 +1908,29 @@ class TaskAddData(StateTransitionTask):
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
self.fAddLogReady.flush()
os.fsync(self.fAddLogReady)
sql = "insert into {}.{} values ('{}', {});".format( # removed: tags ('{}', {})
db.getName(),
regTableName,
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt)
dbc.execute(sql)
# Successfully wrote the data into the DB, let's record it
# somehow
te.recordDataMark(nextInt)
if gConfig.record_ops:
self.fAddLogDone.write(
"Wrote {} to {}\n".format(
nextInt, regTableName))
self.fAddLogDone.flush()
os.fsync(self.fAddLogDone)
# TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName
if gConfig.verify_data:
self.lockTable(fullTableName)
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
try:
sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {})
fullTableName,
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt)
dbc.execute(sql)
except: # Any exception at all
if gConfig.verify_data:
self.unlockTable(fullTableName)
raise
# Now read it back and verify, we might encounter an error if table is dropped
if gConfig.verify_data: # only if command line asks for it
try:
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts= '{}'".
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
format(db.getName(), regTableName, nextTick))
if readBack != nextInt :
raise taos.error.ProgrammingError(
......@@ -1905,8 +1943,23 @@ class TaskAddData(StateTransitionTask):
"Failed to read back same data for tick: {}, wrote: {}, read: {}"
.format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
errno)
# Re-throw no matter what
raise
elif errno in [0x218, 0x362]: # table doesn't exist
# do nothing
dummy = 0
else:
# Re-throw otherwise
raise
finally:
self.unlockTable(fullTableName) # Unlock the table no matter what
# Successfully wrote the data into the DB, let's record it somehow
te.recordDataMark(nextInt)
if gConfig.record_ops:
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.flush()
os.fsync(self.fAddLogDone)
self.activeTable.discard(i) # not raising an error, unlike remove
......
......@@ -166,6 +166,8 @@ class Progress:
SERVICE_RECONNECT_START = 4
SERVICE_RECONNECT_SUCCESS = 5
SERVICE_RECONNECT_FAILURE = 6
CREATE_TABLE_ATTEMPT = 7
tokens = {
STEP_BOUNDARY: '.',
BEGIN_THREAD_STEP: '[',
......@@ -174,6 +176,7 @@ class Progress:
SERVICE_RECONNECT_START: '<r.',
SERVICE_RECONNECT_SUCCESS: '.r>',
SERVICE_RECONNECT_FAILURE: '.xr>',
CREATE_TABLE_ATTEMPT: '_c',
}
@classmethod
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册