From 456ea712b357b69f0ec79a58a50439977abdbf40 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 11 May 2021 01:48:13 +0000 Subject: [PATCH] Refactored crash_gen tool with stronger typing --- tests/pytest/crash_gen/__init__.py | 4 - tests/pytest/crash_gen/crash_gen_main.py | 184 +++++++++--------- tests/pytest/crash_gen/service_manager.py | 33 ++-- .../{settings.py => shared/config.py} | 23 ++- tests/pytest/crash_gen/{ => shared}/db.py | 19 +- tests/pytest/crash_gen/{ => shared}/misc.py | 10 +- tests/pytest/crash_gen/shared/types.py | 28 +++ tests/pytest/crash_gen/types.py | 5 - 8 files changed, 169 insertions(+), 137 deletions(-) rename tests/pytest/crash_gen/{settings.py => shared/config.py} (54%) rename tests/pytest/crash_gen/{ => shared}/db.py (97%) rename tests/pytest/crash_gen/{ => shared}/misc.py (96%) create mode 100644 tests/pytest/crash_gen/shared/types.py delete mode 100644 tests/pytest/crash_gen/types.py diff --git a/tests/pytest/crash_gen/__init__.py b/tests/pytest/crash_gen/__init__.py index 2509d7d76d..fe03bde354 100644 --- a/tests/pytest/crash_gen/__init__.py +++ b/tests/pytest/crash_gen/__init__.py @@ -1,6 +1,2 @@ # Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645 from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess -from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress -from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager -from crash_gen.settings import Settings -from crash_gen.types import DirPath \ No newline at end of file diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 8d34a7555b..644aa79916 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -1,6 +1,6 @@ # -----!/usr/bin/python3.7 ################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. +# Copyright (c) 2016-2021 by TAOS Technologies, Inc. # All rights reserved. # # This file is proprietary and confidential to TAOS Technologies. @@ -24,30 +24,34 @@ import textwrap import time import datetime import random -import logging import threading -import copy import argparse -import getopt import sys import os import io import signal import traceback -import resource +import requests # from guppy import hpy import gc +import taos + +from .shared.types import TdColumns, TdTags # from crash_gen import ServiceManager, TdeInstance, TdeSubProcess -from crash_gen import ServiceManager, Settings, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \ - CrashGenError, Progress, MyTDSql, \ - TdeInstance +# from crash_gen import ServiceManager, Config, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \ +# CrashGenError, Progress, MyTDSql, \ +# TdeInstance -import taos -import requests +from .service_manager import ServiceManager, TdeInstance + +from .shared.config import Config +from .shared.db import DbConn, DbManager, DbConnNative, MyTDSql +from .shared.misc import Dice, Logging, Helper, Status, CrashGenError, Progress +from .shared.types import TdDataType -Settings.init() +# Config.init() # Require Python 3 if sys.version_info[0] < 3: @@ -81,20 +85,20 @@ class WorkerThread: self._stepGate = threading.Event() # Let us have a DB connection of our own - if (Settings.getConfig().per_thread_db_connection): # type: ignore + if (Config.getConfig().per_thread_db_connection): # type: ignore # print("connector_type = {}".format(gConfig.connector_type)) tInst = gContainer.defTdeInstance - if Settings.getConfig().connector_type == 'native': + if Config.getConfig().connector_type == 'native': self._dbConn = DbConn.createNative(tInst.getDbTarget()) - elif Settings.getConfig().connector_type == 'rest': + elif Config.getConfig().connector_type == 'rest': self._dbConn = DbConn.createRest(tInst.getDbTarget()) - elif Settings.getConfig().connector_type == 'mixed': + elif Config.getConfig().connector_type == 'mixed': if Dice.throw(2) == 0: # 1/2 chance self._dbConn = DbConn.createNative(tInst.getDbTarget()) else: self._dbConn = DbConn.createRest(tInst.getDbTarget()) else: - raise RuntimeError("Unexpected connector type: {}".format(Settings.getConfig().connector_type)) + raise RuntimeError("Unexpected connector type: {}".format(Config.getConfig().connector_type)) # self._dbInUse = False # if "use db" was executed already @@ -123,14 +127,14 @@ class WorkerThread: # self.isSleeping = False Logging.info("Starting to run thread: {}".format(self._tid)) - if (Settings.getConfig().per_thread_db_connection): # type: ignore + if (Config.getConfig().per_thread_db_connection): # type: ignore Logging.debug("Worker thread openning database connection") self._dbConn.open() self._doTaskLoop() # clean up - if (Settings.getConfig().per_thread_db_connection): # type: ignore + if (Config.getConfig().per_thread_db_connection): # type: ignore if self._dbConn.isOpen: #sometimes it is not open self._dbConn.close() else: @@ -158,7 +162,7 @@ class WorkerThread: # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) try: - if (Settings.getConfig().per_thread_db_connection): # most likely TRUE + if (Config.getConfig().per_thread_db_connection): # most likely TRUE if not self._dbConn.isOpen: # might have been closed during server auto-restart self._dbConn.open() # self.useDb() # might encounter exceptions. TODO: catch @@ -232,7 +236,7 @@ class WorkerThread: return self.getDbConn().getQueryResult() def getDbConn(self) -> DbConn : - if (Settings.getConfig().per_thread_db_connection): + if (Config.getConfig().per_thread_db_connection): return self._dbConn else: return self._tc.getDbManager().getDbConn() @@ -283,7 +287,7 @@ class ThreadCoordinator: self._execStats.registerFailure("User Interruption") def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): - maxSteps = Settings.getConfig().max_steps # type: ignore + maxSteps = Config.getConfig().max_steps # type: ignore if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 return True if self._runStatus != Status.STATUS_RUNNING: @@ -388,7 +392,7 @@ class ThreadCoordinator: hasAbortedTask = False workerTimeout = False while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): - if not Settings.getConfig().debug: # print this only if we are not in debug mode + if not Config.getConfig().debug: # print this only if we are not in debug mode Progress.emit(Progress.STEP_BOUNDARY) # print(".", end="", flush=True) # if (self._curStep % 2) == 0: # print memory usage once every 10 steps @@ -512,18 +516,18 @@ class ThreadCoordinator: ''' Initialize multiple databases, invoked at __ini__() time ''' self._dbs = [] # type: List[Database] dbc = self.getDbManager().getDbConn() - if Settings.getConfig().max_dbs == 0: + if Config.getConfig().max_dbs == 0: self._dbs.append(Database(0, dbc)) else: baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic - )*333) % 888 if Settings.getConfig().dynamic_db_table_names else 0 - for i in range(Settings.getConfig().max_dbs): + )*333) % 888 if Config.getConfig().dynamic_db_table_names else 0 + for i in range(Config.getConfig().max_dbs): self._dbs.append(Database(baseDbNumber + i, dbc)) def pickDatabase(self): idxDb = 0 - if Settings.getConfig().max_dbs != 0 : - idxDb = Dice.throw(Settings.getConfig().max_dbs) # 0 to N-1 + if Config.getConfig().max_dbs != 0 : + idxDb = Dice.throw(Config.getConfig().max_dbs) # 0 to N-1 db = self._dbs[idxDb] # type: Database return db @@ -705,7 +709,7 @@ class AnyState: def canDropDb(self): # If user requests to run up to a number of DBs, # we'd then not do drop_db operations any more - if Settings.getConfig().max_dbs > 0 or Settings.getConfig().use_shadow_db : + if Config.getConfig().max_dbs > 0 or Config.getConfig().use_shadow_db : return False return self._info[self.CAN_DROP_DB] @@ -713,7 +717,7 @@ class AnyState: return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] def canDropFixedSuperTable(self): - if Settings.getConfig().use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table + if Config.getConfig().use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table return False return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] @@ -1110,7 +1114,7 @@ class Database: t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above - Logging.info("Setting up TICKS to start from: {}".format(t4)) + Logging.debug("Setting up TICKS to start from: {}".format(t4)) return t4 @classmethod @@ -1126,7 +1130,7 @@ class Database: cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast # if : # should be quite a bit into the future - if Settings.getConfig().mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick + if Config.isSet('mix_oos_data') and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence return cls._lastLaggingTick else: # regular @@ -1310,8 +1314,8 @@ class Task(): # This case handled below already. # elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service: # return True # We may get "network unavilable" when restarting service - elif Settings.getConfig().ignore_errors: # something is specified on command line - moreErrnos = [int(v, 0) for v in Settings.getConfig().ignore_errors.split(',')] + elif Config.getConfig().ignore_errors: # something is specified on command line + moreErrnos = [int(v, 0) for v in Config.getConfig().ignore_errors.split(',')] if errno in moreErrnos: return True elif errno == 0x200 : # invalid SQL, we need to div in a bit more @@ -1347,7 +1351,7 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: errno2 = Helper.convertErrno(err.errno) - if (Settings.getConfig().continue_on_exception): # user choose to continue + if (Config.getConfig().continue_on_exception): # user choose to continue self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, wt.getDbConn().getLastSql())) self._err = err @@ -1362,7 +1366,7 @@ class Task(): self.__class__.__name__, errno2, err, wt.getDbConn().getLastSql()) self.logDebug(errMsg) - if Settings.getConfig().debug: + if Config.getConfig().debug: # raise # so that we see full stack traceback.print_exc() print( @@ -1560,7 +1564,7 @@ class StateTransitionTask(Task): def getRegTableName(cls, i): if ( StateTransitionTask._baseTableNumber is None): # Set it one time StateTransitionTask._baseTableNumber = Dice.throw( - 999) if Settings.getConfig().dynamic_db_table_names else 0 + 999) if Config.getConfig().dynamic_db_table_names else 0 return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i) def execute(self, wt: WorkerThread): @@ -1580,14 +1584,14 @@ class TaskCreateDb(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): # was: self.execWtSql(wt, "create database db") repStr = "" - if Settings.getConfig().num_replicas != 1: + if Config.getConfig().num_replicas != 1: # numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N - numReplica = Settings.getConfig().num_replicas # fixed, always + numReplica = Config.getConfig().num_replicas # fixed, always repStr = "replica {}".format(numReplica) - updatePostfix = "update 1" if Settings.getConfig().verify_data else "" # allow update only when "verify data" is active + updatePostfix = "update 1" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active dbName = self._db.getName() self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) ) - if dbName == "db_0" and Settings.getConfig().use_shadow_db: + if dbName == "db_0" and Config.getConfig().use_shadow_db: self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) ) class TaskDropDb(StateTransitionTask): @@ -1620,10 +1624,11 @@ class TaskCreateSuperTable(StateTransitionTask): sTable = self._db.getFixedSuperTable() # type: TdSuperTable # wt.execSql("use db") # should always be in place - sTable.create(wt.getDbConn(), - {'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}, {'b':'BINARY(200)', 'f':'FLOAT'}, - dropIfExists = True - ) + sTable.create(wt.getDbConn(), + {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, { + 'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT}, + dropIfExists=True + ) # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that # automatically @@ -1651,9 +1656,7 @@ class TdSuperTable: return dbc.existsSuperTable(self._stName) # TODO: odd semantic, create() method is usually static? - def create(self, dbc, cols: dict, tags: dict, - dropIfExists = False - ): + def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False): '''Creating a super table''' dbName = self._dbName @@ -1664,17 +1667,17 @@ class TdSuperTable: dbc.execute("DROP TABLE {}".format(fullTableName)) else: # error raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName)) - + # Now let's create sql = "CREATE TABLE {} ({})".format( fullTableName, - ",".join(['%s %s'%(k,v) for (k,v) in cols.items()])) - if tags is None : - sql += " TAGS (dummy int) " - else: + ",".join(['%s %s'%(k,v.value) for (k,v) in cols.items()])) + if tags : sql += " TAGS ({})".format( - ",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) - ) + ",".join(['%s %s'%(k,v.value) for (k,v) in tags.items()]) + ) + else: + sql += " TAGS (dummy int) " dbc.execute(sql) def getRegTables(self, dbc: DbConn): @@ -1692,7 +1695,7 @@ class TdSuperTable: def hasRegTables(self, dbc: DbConn): return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0 - def ensureTable(self, task: Task, dbc: DbConn, regTableName: str): + def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str): dbName = self._dbName sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) if dbc.query(sql) >= 1 : # reg table exists already @@ -1700,7 +1703,7 @@ class TdSuperTable: # acquire a lock first, so as to be able to *verify*. More details in TD-1471 fullTableName = dbName + '.' + regTableName - if task is not None: # optional lock + if task is not None: # TODO: what happens if we don't lock the table task.lockTable(fullTableName) Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table # print("(" + fullTableName[-3:] + ")", end="", flush=True) @@ -1892,7 +1895,7 @@ class TaskDropSuperTable(StateTransitionTask): if Dice.throw(2) == 0: # print("_7_", end="", flush=True) tblSeq = list(range( - 2 + (self.LARGE_NUMBER_OF_TABLES if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES))) + 2 + (self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES))) random.shuffle(tblSeq) tickOutput = False # if we have spitted out a "d" character for "drop regular table" isSuccess = True @@ -1958,13 +1961,13 @@ class TaskRestartService(StateTransitionTask): @classmethod def canBeginFrom(cls, state: AnyState): - if Settings.getConfig().auto_start_service: + if Config.getConfig().auto_start_service: return state.canDropFixedSuperTable() # Basicallly when we have the super table return False # don't run this otherwise CHANCE_TO_RESTART_SERVICE = 200 def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - if not Settings.getConfig().auto_start_service: # only execute when we are in -a mode + if not Config.getConfig().auto_start_service: # only execute when we are in -a mode print("_a", end="", flush=True) return @@ -1991,7 +1994,7 @@ class TaskAddData(StateTransitionTask): @classmethod def prepToRecordOps(cls): - if Settings.getConfig().record_ops: + if Config.getConfig().record_ops: if (cls.fAddLogReady is None): Logging.info( "Recording in a file operations to be performed...") @@ -2009,7 +2012,7 @@ class TaskAddData(StateTransitionTask): return state.canAddData() def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor): - numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS + numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS fullTableName = db.getName() + '.' + regTableName sql = "INSERT INTO {} VALUES ".format(fullTableName) @@ -2021,13 +2024,13 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches - numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS + numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS for j in range(numRecords): # number of records per table nextInt = db.getNextInt() nextTick = db.getNextTick() nextColor = db.getNextColor() - if Settings.getConfig().record_ops: + if Config.getConfig().record_ops: self.prepToRecordOps() if self.fAddLogReady is None: raise CrashGenError("Unexpected empty fAddLogReady") @@ -2037,7 +2040,7 @@ class TaskAddData(StateTransitionTask): # TODO: too ugly trying to lock the table reliably, refactor... fullTableName = db.getName() + '.' + regTableName - if Settings.getConfig().verify_data: + if Config.getConfig().verify_data: self.lockTable(fullTableName) # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written @@ -2050,7 +2053,7 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) # Quick hack, attach an update statement here. TODO: create an "update" task - if (not Settings.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB + if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB nextInt = db.getNextInt() nextColor = db.getNextColor() sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here @@ -2061,12 +2064,12 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) except: # Any exception at all - if Settings.getConfig().verify_data: + if Config.getConfig().verify_data: self.unlockTable(fullTableName) raise # Now read it back and verify, we might encounter an error if table is dropped - if Settings.getConfig().verify_data: # only if command line asks for it + if Config.getConfig().verify_data: # only if command line asks for it try: readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". format(db.getName(), regTableName, nextTick)) @@ -2093,7 +2096,7 @@ class TaskAddData(StateTransitionTask): # Successfully wrote the data into the DB, let's record it somehow te.recordDataMark(nextInt) - if Settings.getConfig().record_ops: + if Config.getConfig().record_ops: if self.fAddLogDone is None: raise CrashGenError("Unexpected empty fAddLogDone") self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) @@ -2104,8 +2107,8 @@ class TaskAddData(StateTransitionTask): # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access db = self._db dbc = wt.getDbConn() - numTables = self.LARGE_NUMBER_OF_TABLES if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES - numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS + numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES + numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS tblSeq = list(range(numTables )) random.shuffle(tblSeq) # now we have random sequence for i in tblSeq: @@ -2120,7 +2123,7 @@ class TaskAddData(StateTransitionTask): regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) fullTableName = dbName + '.' + regTableName # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked" - sTable.ensureTable(self, wt.getDbConn(), regTableName) # Ensure the table exists + sTable.ensureRegTable(self, wt.getDbConn(), regTableName) # Ensure the table exists # self._unlockTable(fullTableName) if Dice.throw(1) == 0: # 1 in 2 chance @@ -2264,7 +2267,7 @@ class ClientManager: global gContainer tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance" - cfg = Settings.getConfig() + cfg = Config.getConfig() dbManager = DbManager(cfg.connector_type, tInst.getDbTarget()) # Regular function thPool = ThreadPool(cfg.num_threads, cfg.max_steps) self.tc = ThreadCoordinator(thPool, dbManager) @@ -2280,7 +2283,7 @@ class ClientManager: # Release global variables # gConfig = None - Settings.clearConfig() + Config.clearConfig() gSvcMgr = None logger = None @@ -2331,7 +2334,7 @@ class MainExec: def runClient(self): global gSvcMgr - if Settings.getConfig().auto_start_service: + if Config.getConfig().auto_start_service: gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert gSvcMgr.startTaosServices() # we start, don't run @@ -2346,20 +2349,12 @@ class MainExec: def runService(self): global gSvcMgr - gSvcMgr = self._svcMgr = ServiceManager(Settings.getConfig().num_dnodes) # save it in a global variable TODO: hack alert + gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert gSvcMgr.run() # run to some end state gSvcMgr = self._svcMgr = None - def init(self): # TODO: refactor - global gContainer - gContainer = Container() # micky-mouse DI - - global gSvcMgr # TODO: refactor away - gSvcMgr = None - - # Super cool Python argument library: - # https://docs.python.org/3/library/argparse.html + def _buildCmdLineParser(self): parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent('''\ @@ -2480,20 +2475,29 @@ class MainExec: action='store_true', help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)') - # global gConfig - config = parser.parse_args() - Settings.setConfig(config) # TODO: fix this hack, consolidate this global var + return parser + + + def init(self): # TODO: refactor + global gContainer + gContainer = Container() # micky-mouse DI + + global gSvcMgr # TODO: refactor away + gSvcMgr = None + + parser = self._buildCmdLineParser() + Config.init(parser) # Sanity check for arguments - if Settings.getConfig().use_shadow_db and Settings.getConfig().max_dbs>1 : + if Config.getConfig().use_shadow_db and Config.getConfig().max_dbs>1 : raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1") - Logging.clsInit(Settings.getConfig()) + Logging.clsInit(Config.getConfig().debug) Dice.seed(0) # initial seeding of dice def run(self): - if Settings.getConfig().run_tdengine: # run server + if Config.getConfig().run_tdengine: # run server try: self.runService() return 0 # success diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index f6784f17e3..95507f0142 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -19,10 +19,15 @@ except: sys.exit(-1) from queue import Queue, Empty -from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status -from crash_gen.db import DbConn, DbTarget -from crash_gen.settings import Settings -from crash_gen.types import DirPath +from .shared.config import Config +from .shared.db import DbTarget, DbConn +from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice +from .shared.types import DirPath + +# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status +# from crash_gen.db import DbConn, DbTarget +# from crash_gen.settings import Config +# from crash_gen.types import DirPath class TdeInstance(): """ @@ -173,7 +178,7 @@ quorum 2 def getServiceCmdLine(self): # to start the instance cmdLine = [] - if Settings.getConfig().track_memory_leaks: + if Config.getConfig().track_memory_leaks: Logging.info("Invoking VALGRIND on service...") cmdLine = ['valgrind', '--leak-check=yes'] # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control @@ -789,22 +794,10 @@ class ServiceManagerThread: def stop(self): # can be called from both main thread or signal handler - # Linux will send Control-C generated SIGINT to the TDengine process - # already, ref: + # Linux will send Control-C generated SIGINT to the TDengine process already, ref: # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes - # if not self._tdeSubProcess: - # raise RuntimeError("sub process object missing") - - # self._status.set(Status.STATUS_STOPPING) - # TdeSubProcess.stop(self._tdeSubProcess) # must stop, no matter what - # self._tdeSubProcess = None - # if not self._tdeSubProcess.stop(): # everything withing - # if self._tdeSubProcess.isRunning(): # still running, should now never happen - # Logging.error("FAILED to stop sub process, it is still running... pid = {}".format( - # self._tdeSubProcess.getPid())) - # else: - # self._tdeSubProcess = None # not running any more - self.join() # stop the thread, change the status, etc. + + self.join() # stop the thread, status change moved to TdeSubProcess # Check if it's really stopped outputLines = 10 # for last output diff --git a/tests/pytest/crash_gen/settings.py b/tests/pytest/crash_gen/shared/config.py similarity index 54% rename from tests/pytest/crash_gen/settings.py rename to tests/pytest/crash_gen/shared/config.py index 82214930b4..7b9f7c3873 100644 --- a/tests/pytest/crash_gen/settings.py +++ b/tests/pytest/crash_gen/shared/config.py @@ -1,17 +1,23 @@ from __future__ import annotations import argparse + from typing import Optional -from crash_gen.misc import CrashGenError +from .misc import CrashGenError + +# from crash_gen.misc import CrashGenError # gConfig: Optional[argparse.Namespace] -class Settings: +class Config: _config = None # type Optional[argparse.Namespace] @classmethod - def init(cls): - cls._config = None + def init(cls, parser: argparse.ArgumentParser): + if cls._config is not None: + raise CrashGenError("Config can only be initialized once") + cls._config = parser.parse_args() + # print(cls._config) @classmethod def setConfig(cls, config: argparse.Namespace): @@ -26,4 +32,11 @@ class Settings: @classmethod def clearConfig(cls): - cls._config = None \ No newline at end of file + cls._config = None + + @classmethod + def isSet(cls, cfgKey): + cfg = cls.getConfig() + if cfgKey not in cfg: + return False + return cfg.__getattribute__(cfgKey) \ No newline at end of file diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/shared/db.py similarity index 97% rename from tests/pytest/crash_gen/db.py rename to tests/pytest/crash_gen/shared/db.py index 6c85da0051..75931ace48 100644 --- a/tests/pytest/crash_gen/db.py +++ b/tests/pytest/crash_gen/shared/db.py @@ -1,11 +1,13 @@ from __future__ import annotations import sys +import os +import datetime import time import threading import requests from requests.auth import HTTPBasicAuth -from crash_gen.types import QueryResult + import taos from util.sql import * @@ -13,13 +15,12 @@ from util.cases import * from util.dnodes import * from util.log import * -from .misc import Logging, CrashGenError, Helper, Dice -import os -import datetime import traceback # from .service_manager import TdeInstance -from crash_gen.settings import Settings +from .config import Config +from .misc import Logging, CrashGenError, Helper +from .types import QueryResult class DbConn: TYPE_NATIVE = "native-c" @@ -250,7 +251,13 @@ class MyTDSql: def _execInternal(self, sql): startTime = time.time() # Logging.debug("Executing SQL: " + sql) + # ret = None # TODO: use strong type here + # try: # Let's not capture the error, and let taos.error.ProgrammingError pass through ret = self._cursor.execute(sql) + # except taos.error.ProgrammingError as err: + # Logging.warning("Taos SQL execution error: {}, SQL: {}".format(err.msg, sql)) + # raise CrashGenError(err.msg) + # print("\nSQL success: {}".format(sql)) queryTime = time.time() - startTime # Record the query time @@ -262,7 +269,7 @@ class MyTDSql: cls.lqStartTime = startTime # Now write to the shadow database - if Settings.getConfig().use_shadow_db: + if Config.isSet('use_shadow_db'): if sql[:11] == "INSERT INTO": if sql[:16] == "INSERT INTO db_0": sql2 = "INSERT INTO db_s" + sql[16:] diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/shared/misc.py similarity index 96% rename from tests/pytest/crash_gen/misc.py rename to tests/pytest/crash_gen/shared/misc.py index cb98bc7c5b..90ad802ff1 100644 --- a/tests/pytest/crash_gen/misc.py +++ b/tests/pytest/crash_gen/shared/misc.py @@ -47,7 +47,7 @@ class Logging: return cls.logger @classmethod - def clsInit(cls, gConfig): # TODO: refactor away gConfig + def clsInit(cls, debugMode: bool): if cls.logger: return @@ -62,12 +62,8 @@ class Logging: # print("setting logger variable") # global logger cls.logger = MyLoggingAdapter(_logger, {}) - - if (gConfig.debug): - cls.logger.setLevel(logging.DEBUG) # default seems to be INFO - else: - cls.logger.setLevel(logging.INFO) - + cls.logger.setLevel(logging.DEBUG if debugMode else logging.INFO) # default seems to be INFO + @classmethod def info(cls, msg): cls.logger.info(msg) diff --git a/tests/pytest/crash_gen/shared/types.py b/tests/pytest/crash_gen/shared/types.py new file mode 100644 index 0000000000..814a821917 --- /dev/null +++ b/tests/pytest/crash_gen/shared/types.py @@ -0,0 +1,28 @@ +from typing import Any, List, Dict, NewType +from enum import Enum + +DirPath = NewType('DirPath', str) + +QueryResult = NewType('QueryResult', List[List[Any]]) + +class TdDataType(Enum): + ''' + Use a Python Enum types of represent all the data types in TDengine. + + Ref: https://www.taosdata.com/cn/documentation/taos-sql#data-type + ''' + TIMESTAMP = 'TIMESTAMP' + INT = 'INT' + BIGINT = 'BIGINT' + FLOAT = 'FLOAT' + DOUBLE = 'DOUBLE' + BINARY = 'BINARY' + BINARY16 = 'BINARY(16)' # TODO: get rid of this hack + BINARY200 = 'BINARY(200)' + SMALLINT = 'SMALLINT' + TINYINT = 'TINYINT' + BOOL = 'BOOL' + NCHAR = 'NCHAR' + +TdColumns = Dict[str, TdDataType] +TdTags = Dict[str, TdDataType] diff --git a/tests/pytest/crash_gen/types.py b/tests/pytest/crash_gen/types.py deleted file mode 100644 index fdf8ea2478..0000000000 --- a/tests/pytest/crash_gen/types.py +++ /dev/null @@ -1,5 +0,0 @@ -from typing import Any, List, NewType - -DirPath = NewType('DirPath', str) - -QueryResult = NewType('QueryResult', List[List[Any]]) \ No newline at end of file -- GitLab