提交 f6de1a86 编写于 作者: W wenzhouwww@live.cn

enh crash_gen for 3.0 about [stream processing] , [data subscription] ,[delete data]

上级 be9a32fc
......@@ -45,7 +45,7 @@ fi
# Now getting ready to execute Python
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
PYTHON_EXEC=python3.8
PYTHON_EXEC=python3
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
# export PYTHONPATH=$(pwd)/../../src/connector/python:$(pwd)
......
......@@ -26,9 +26,12 @@ class DbConn:
TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api"
TYPE_INVALID = "invalid"
# class variables
lastSqlFromThreads : dict[int, str] = {} # stored by thread id, obtained from threading.current_thread().ident%10000
spendThreads : dict[int, float] = {} # stored by thread id, obtained from threading.current_thread().ident%10000
@classmethod
def saveSqlForCurrentThread(cls, sql: str):
......@@ -37,15 +40,36 @@ class DbConn:
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore
cls.lastSqlFromThreads[shortTid] = sql # Save this for later
@classmethod
def fetchSqlForThread(cls, shortTid : int) -> str :
def fetchSqlForThread(cls, shortTid : int) -> str :
print("=======================")
if shortTid not in cls.lastSqlFromThreads:
raise CrashGenError("No last-attempted-SQL found for thread id: {}".format(shortTid))
return cls.lastSqlFromThreads[shortTid]
return cls.lastSqlFromThreads[shortTid]
@classmethod
def sql_exec_spend(cls, cost: float):
'''
Let us save the last SQL statement on a per-thread basis, so that when later we
run into a dead-lock situation, we can pick out the deadlocked thread, and use
that information to find what what SQL statement is stuck.
'''
th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore
cls.spendThreads[shortTid] = cost # Save this for later
@classmethod
def get_time_cost(cls) ->float:
th = threading.current_thread()
shortTid = th.native_id % 10000 #type: ignore
return cls.spendThreads.get(shortTid)
@classmethod
def create(cls, connType, dbTarget):
......@@ -61,6 +85,7 @@ class DbConn:
def createNative(cls, dbTarget) -> DbConn:
return cls.create(cls.TYPE_NATIVE, dbTarget)
@classmethod
def createRest(cls, dbTarget) -> DbConn:
return cls.create(cls.TYPE_REST, dbTarget)
......@@ -75,6 +100,7 @@ class DbConn:
return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget)
def getLastSql(self):
return self._lastSql
def open(self):
......@@ -184,13 +210,19 @@ class DbConnRest(DbConn):
def _doSql(self, sql):
self._lastSql = sql # remember this, last SQL attempted
self.saveSqlForCurrentThread(sql) # Save in global structure too. #TODO: combine with above
try:
time_cost = -1
time_start = time.time()
try:
r = requests.post(self._url,
data = sql,
auth = HTTPBasicAuth('root', 'taosdata'))
auth = HTTPBasicAuth('root', 'taosdata'))
except:
print("REST API Failure (TODO: more info here)")
self.sql_exec_spend(-2)
raise
finally:
time_cost = time.time()- time_start
self.sql_exec_spend(time_cost)
rj = r.json()
# Sanity check for the "Json Result"
if ('status' not in rj):
......@@ -223,6 +255,8 @@ class DbConnRest(DbConn):
"[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql): # return rows affected
return self.execute(sql)
......@@ -336,6 +370,7 @@ class MyTDSql:
raise
return self.affectedRows
class DbTarget:
def __init__(self, cfgPath, hostAddr, port):
self.cfgPath = cfgPath
......@@ -355,6 +390,7 @@ class DbConnNative(DbConn):
# _connInfoDisplayed = False # TODO: find another way to display this
totalConnections = 0 # Not private
totalRequests = 0
time_cost = -1
def __init__(self, dbTarget):
super().__init__(dbTarget)
......@@ -413,8 +449,19 @@ class DbConnNative(DbConn):
"Cannot exec SQL unless db connection is open", CrashGenError.DB_CONNECTION_NOT_OPEN)
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
time_cost = -1
nRows = 0
time_start = time.time()
self.saveSqlForCurrentThread(sql) # Save in global structure too. #TODO: combine with above
nRows = self._tdSql.execute(sql)
try:
nRows= self._tdSql.execute(sql)
except Exception as e:
self.sql_exec_spend(-2)
finally:
time_cost = time.time() - time_start
self.sql_exec_spend(time_cost)
cls = self.__class__
cls.totalRequests += 1
Logging.debug(
......@@ -494,4 +541,3 @@ class DbManager():
self._dbConn.close()
self._dbConn = None
Logging.debug("DbManager closed DB connection...")
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册