diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 252b343a068e84589776c9e6c59c5fd5a0384944..1e6304ed5aa4c2f23689610460ae0b09bbe1334e 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -84,10 +84,10 @@ class WorkerThread: # self._thread = threading.Thread(target=runThread, args=(self,)) self._thread = threading.Thread(target=self.run) self._stepGate = threading.Event() - # Let us have a DB connection of our own if (Config.getConfig().per_thread_db_connection): # type: ignore - # print("connector_type = {}".format(gConfig.connector_type)) + # print("connector_type = {}".format(Config.getConfig().connector_type)) + tInst = gContainer.defTdeInstance if Config.getConfig().connector_type == 'native': self._dbConn = DbConn.createNative(tInst.getDbTarget()) @@ -963,7 +963,7 @@ class StateMechine: # did not do this when openning connection, and this is NOT the worker # thread, which does this on their own dbc.use(dbName) - if not dbc.hasTables(): # no tables + if not dbc.hasTables(dbName): # no tables Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) return StateDbOnly() @@ -1434,6 +1434,7 @@ class Task(): # TODO: refactor away, just provide the dbConn def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread """ Haha """ + # print("thread %d runing sql is : %s " %(wt._tid , sql) ) return wt.execSql(sql) def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread @@ -1690,6 +1691,9 @@ class TdSuperTable: def getName(self): return self._stName + def getFullTableName(self): + return self._dbName + '.' + self._stName + def drop(self, dbc, skipCheck = False): dbName = self._dbName if self.exists(dbc) : # if myself exists @@ -1701,7 +1705,7 @@ class TdSuperTable: def exists(self, dbc): dbc.execute("USE " + self._dbName) - return dbc.existsSuperTable(self._stName) + return dbc.existsSuperTable(self._dbName, self._stName) # TODO: odd semantic, create() method is usually static? def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False): @@ -1710,7 +1714,7 @@ class TdSuperTable: dbName = self._dbName dbc.execute("USE " + dbName) fullTableName = dbName + '.' + self._stName - if dbc.existsSuperTable(self._stName): + if dbc.existsSuperTable(dbName, self._stName): if dropIfExists: dbc.execute("DROP TAbLE {}".format(fullTableName)) else: # error @@ -2491,7 +2495,7 @@ class MainExec: action='store', default='native', type=str, - help='Connector type to use: native, rest, or mixed (default: 10)') + help='Connector type to use: native, rest, or mixed (default: native)') parser.add_argument( '-d', '--debug', @@ -2552,7 +2556,7 @@ class MainExec: '-r', '--record-ops', action='store_true', - help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') + help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)') parser.add_argument( '-s', '--max-steps', diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 3aa0d00a49844fefbb808ddce4b89b67f37ac10a..780ee7cc607a5da12e0d03c63e0584f9c9ff2bf1 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -164,6 +164,9 @@ quorum 2 def getExecFile(self): # .../taosd return self._buildDir + "/build/bin/taosd" + + def getAdapterFile(self): # .../taosadapter for restful + return self._buildDir + "/build/bin/taosadapter" def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?! if Config.getConfig().set_path =='': # use default path @@ -187,6 +190,31 @@ quorum 2 else: # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() + + def getAdapterCmdLine(self): + REST_PORT_INCREMENT = 11 + Adapter_ports =str(self._port + REST_PORT_INCREMENT) + AdapterCmds = [self.getAdapterFile() + ' --port='+ Adapter_ports + + ' --log.path='+ self.getLogDir() + ' --taosConfigDir='+self.getCfgDir()+ + ' --collectd.enable=false' + + ' --influxdb.enable=false --node_exporter.enable=false' + + ' --opentsdb.enable=false --statsd.enable=false' + + ' --prometheus.enable=false --opentsdb_telnet.enable=false'] # get adapter cmd string + return AdapterCmds + + def start_Adapter(self,cmdLine): + # print('nohup '+' '.join(cmdLine)+ '>>taosadapter.log 2>&1 &') + cmds = 'nohup '+' '.join(cmdLine)+ '>>taosadapter.log 2>&1 &' + ret = Popen( + cmds, + shell=True, + stdout=PIPE, + stderr=PIPE, + ) + time.sleep(0.1) # very brief wait, then let's check if sub process started successfully. + if ret.poll(): + raise CrashGenError("Sub process failed to start with command line: {}".format(cmdLine)) + return ret def _getDnodes(self, dbc): dbc.query("show dnodes") @@ -230,6 +258,10 @@ quorum 2 # self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions self._subProcess = TdeSubProcess(self.getServiceCmdLine(), self.getLogDir()) + # run taosadapter by subprocess ,taosadapter is stateless with TDengine ,so it don't need monitor + self.start_Adapter(self.getAdapterCmdLine()) + print(' '.join(self.getAdapterCmdLine())) + def stop(self): self._subProcess.stop() self._subProcess = None diff --git a/tests/pytest/crash_gen/shared/db.py b/tests/pytest/crash_gen/shared/db.py index 75931ace48ed65708c7dfa97d01a426a0baa8203..3effb80ee9045f89943444c57a30be3ba56fc775 100644 --- a/tests/pytest/crash_gen/shared/db.py +++ b/tests/pytest/crash_gen/shared/db.py @@ -100,13 +100,13 @@ class DbConn: # print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName))) return dbName in dbs # TODO: super weird type mangling seen, once here - def existsSuperTable(self, stName): - self.query("show stables") + def existsSuperTable(self, dbName, stName): + self.query("show {}.stables".format(dbName)) sts = [v[0] for v in self.getQueryResult()] return stName in sts - def hasTables(self): - return self.query("show tables") > 0 + def hasTables(self, dbName): + return self.query("show {}.tables".format(dbName)) > 0 def execute(self, sql): ''' Return the number of rows affected'''