diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index bb8de2ab4a82989309870f40a0c6ef9d4409757d..31b013d065edce3095e75f63975e854769ad22b6 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -1905,18 +1905,23 @@ class TaskCreateConsumers(StateTransitionTask): @classmethod def canBeginFrom(cls, state: AnyState): - return state.canCreateConsumers() + return state.canCreateConsumers() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - dbname = self._db.getName() - - sTable = self._db.getFixedSuperTable() # type: TdSuperTable - # wt.execSql("use db") # should always be in place - # create Consumers - if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task - if sTable.hasTopics(wt.getDbConn()): - sTable.createConsumer(wt.getDbConn(),random.randint(1,10)) + if Config.getConfig().connector_type == 'native': + dbname = self._db.getName() + + sTable = self._db.getFixedSuperTable() # type: TdSuperTable + # wt.execSql("use db") # should always be in place + + # create Consumers + if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task + if sTable.hasTopics(wt.getDbConn()): + sTable.createConsumer(wt.getDbConn(),random.randint(1,10)) + else: + print(" restful not support tmq consumers") + return class TaskCreateSuperTable(StateTransitionTask):