未验证 提交 a92039b3 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #10647 from taosdata/fix/crash_gen_restful

<fix> : fix crash_gen support restful mode by deploy taosadapter
...@@ -84,10 +84,10 @@ class WorkerThread: ...@@ -84,10 +84,10 @@ class WorkerThread:
# self._thread = threading.Thread(target=runThread, args=(self,)) # self._thread = threading.Thread(target=runThread, args=(self,))
self._thread = threading.Thread(target=self.run) self._thread = threading.Thread(target=self.run)
self._stepGate = threading.Event() self._stepGate = threading.Event()
# Let us have a DB connection of our own # Let us have a DB connection of our own
if (Config.getConfig().per_thread_db_connection): # type: ignore if (Config.getConfig().per_thread_db_connection): # type: ignore
# print("connector_type = {}".format(gConfig.connector_type)) # print("connector_type = {}".format(Config.getConfig().connector_type))
tInst = gContainer.defTdeInstance tInst = gContainer.defTdeInstance
if Config.getConfig().connector_type == 'native': if Config.getConfig().connector_type == 'native':
self._dbConn = DbConn.createNative(tInst.getDbTarget()) self._dbConn = DbConn.createNative(tInst.getDbTarget())
...@@ -963,7 +963,7 @@ class StateMechine: ...@@ -963,7 +963,7 @@ class StateMechine:
# did not do this when openning connection, and this is NOT the worker # did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own # thread, which does this on their own
dbc.use(dbName) dbc.use(dbName)
if not dbc.hasTables(): # no tables if not dbc.hasTables(dbName): # no tables
Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly() return StateDbOnly()
...@@ -1434,6 +1434,7 @@ class Task(): ...@@ -1434,6 +1434,7 @@ class Task():
# TODO: refactor away, just provide the dbConn # TODO: refactor away, just provide the dbConn
def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
""" Haha """ """ Haha """
# print("thread %d runing sql is : %s " %(wt._tid , sql) )
return wt.execSql(sql) return wt.execSql(sql)
def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
...@@ -1690,6 +1691,9 @@ class TdSuperTable: ...@@ -1690,6 +1691,9 @@ class TdSuperTable:
def getName(self): def getName(self):
return self._stName return self._stName
def getFullTableName(self):
return self._dbName + '.' + self._stName
def drop(self, dbc, skipCheck = False): def drop(self, dbc, skipCheck = False):
dbName = self._dbName dbName = self._dbName
if self.exists(dbc) : # if myself exists if self.exists(dbc) : # if myself exists
...@@ -1701,7 +1705,7 @@ class TdSuperTable: ...@@ -1701,7 +1705,7 @@ class TdSuperTable:
def exists(self, dbc): def exists(self, dbc):
dbc.execute("USE " + self._dbName) dbc.execute("USE " + self._dbName)
return dbc.existsSuperTable(self._stName) return dbc.existsSuperTable(self._dbName, self._stName)
# TODO: odd semantic, create() method is usually static? # TODO: odd semantic, create() method is usually static?
def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False): def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False):
...@@ -1710,7 +1714,7 @@ class TdSuperTable: ...@@ -1710,7 +1714,7 @@ class TdSuperTable:
dbName = self._dbName dbName = self._dbName
dbc.execute("USE " + dbName) dbc.execute("USE " + dbName)
fullTableName = dbName + '.' + self._stName fullTableName = dbName + '.' + self._stName
if dbc.existsSuperTable(self._stName): if dbc.existsSuperTable(dbName, self._stName):
if dropIfExists: if dropIfExists:
dbc.execute("DROP TAbLE {}".format(fullTableName)) dbc.execute("DROP TAbLE {}".format(fullTableName))
else: # error else: # error
...@@ -2491,7 +2495,7 @@ class MainExec: ...@@ -2491,7 +2495,7 @@ class MainExec:
action='store', action='store',
default='native', default='native',
type=str, type=str,
help='Connector type to use: native, rest, or mixed (default: 10)') help='Connector type to use: native, rest, or mixed (default: native)')
parser.add_argument( parser.add_argument(
'-d', '-d',
'--debug', '--debug',
...@@ -2552,7 +2556,7 @@ class MainExec: ...@@ -2552,7 +2556,7 @@ class MainExec:
'-r', '-r',
'--record-ops', '--record-ops',
action='store_true', action='store_true',
help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
parser.add_argument( parser.add_argument(
'-s', '-s',
'--max-steps', '--max-steps',
......
...@@ -164,6 +164,9 @@ quorum 2 ...@@ -164,6 +164,9 @@ quorum 2
def getExecFile(self): # .../taosd def getExecFile(self): # .../taosd
return self._buildDir + "/build/bin/taosd" return self._buildDir + "/build/bin/taosd"
def getAdapterFile(self): # .../taosadapter for restful
return self._buildDir + "/build/bin/taosadapter"
def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?! def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?!
if Config.getConfig().set_path =='': # use default path if Config.getConfig().set_path =='': # use default path
...@@ -187,6 +190,31 @@ quorum 2 ...@@ -187,6 +190,31 @@ quorum 2
else: else:
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
def getAdapterCmdLine(self):
REST_PORT_INCREMENT = 11
Adapter_ports =str(self._port + REST_PORT_INCREMENT)
AdapterCmds = [self.getAdapterFile() + ' --port='+ Adapter_ports +
' --log.path='+ self.getLogDir() + ' --taosConfigDir='+self.getCfgDir()+
' --collectd.enable=false' +
' --influxdb.enable=false --node_exporter.enable=false' +
' --opentsdb.enable=false --statsd.enable=false' +
' --prometheus.enable=false --opentsdb_telnet.enable=false'] # get adapter cmd string
return AdapterCmds
def start_Adapter(self,cmdLine):
# print('nohup '+' '.join(cmdLine)+ '>>taosadapter.log 2>&1 &')
cmds = 'nohup '+' '.join(cmdLine)+ '>>taosadapter.log 2>&1 &'
ret = Popen(
cmds,
shell=True,
stdout=PIPE,
stderr=PIPE,
)
time.sleep(0.1) # very brief wait, then let's check if sub process started successfully.
if ret.poll():
raise CrashGenError("Sub process failed to start with command line: {}".format(cmdLine))
return ret
def _getDnodes(self, dbc): def _getDnodes(self, dbc):
dbc.query("show dnodes") dbc.query("show dnodes")
...@@ -230,6 +258,10 @@ quorum 2 ...@@ -230,6 +258,10 @@ quorum 2
# self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions # self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
self._subProcess = TdeSubProcess(self.getServiceCmdLine(), self.getLogDir()) self._subProcess = TdeSubProcess(self.getServiceCmdLine(), self.getLogDir())
# run taosadapter by subprocess ,taosadapter is stateless with TDengine ,so it don't need monitor
self.start_Adapter(self.getAdapterCmdLine())
print(' '.join(self.getAdapterCmdLine()))
def stop(self): def stop(self):
self._subProcess.stop() self._subProcess.stop()
self._subProcess = None self._subProcess = None
......
...@@ -100,13 +100,13 @@ class DbConn: ...@@ -100,13 +100,13 @@ class DbConn:
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName))) # print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
return dbName in dbs # TODO: super weird type mangling seen, once here return dbName in dbs # TODO: super weird type mangling seen, once here
def existsSuperTable(self, stName): def existsSuperTable(self, dbName, stName):
self.query("show stables") self.query("show {}.stables".format(dbName))
sts = [v[0] for v in self.getQueryResult()] sts = [v[0] for v in self.getQueryResult()]
return stName in sts return stName in sts
def hasTables(self): def hasTables(self, dbName):
return self.query("show tables") > 0 return self.query("show {}.tables".format(dbName)) > 0
def execute(self, sql): def execute(self, sql):
''' Return the number of rows affected''' ''' Return the number of rows affected'''
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册