diff --git a/src/connector/python/linux/python3/taos/__init__.py b/src/connector/python/linux/python3/taos/__init__.py index 973263573808232e4e71dc0158585624a8e7d2ab..1b086f36ec5172beef8858358a436665806a8402 100644 --- a/src/connector/python/linux/python3/taos/__init__.py +++ b/src/connector/python/linux/python3/taos/__init__.py @@ -1,6 +1,7 @@ from .connection import TDengineConnection from .cursor import TDengineCursor +from .error import Error # Globals threadsafety = 0 diff --git a/tests/pytest/crash_gen/__init__.py b/tests/pytest/crash_gen/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..fe03bde35473600b27359b81aace21277224fac1 --- /dev/null +++ b/tests/pytest/crash_gen/__init__.py @@ -0,0 +1,2 @@ +# Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645 +from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 44295e8bee72bd4af398569887da69ab06234be5..644aa799164acd4b44224ebd9ed30e39e722371c 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -1,6 +1,6 @@ # -----!/usr/bin/python3.7 ################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. +# Copyright (c) 2016-2021 by TAOS Technologies, Inc. # All rights reserved. # # This file is proprietary and confidential to TAOS Technologies. @@ -15,7 +15,7 @@ # https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel from __future__ import annotations -from typing import Set +from typing import Any, Set, Tuple from typing import Dict from typing import List from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none @@ -24,29 +24,34 @@ import textwrap import time import datetime import random -import logging import threading -import copy import argparse -import getopt import sys import os +import io import signal import traceback -import resource +import requests # from guppy import hpy import gc +import taos -from crash_gen.service_manager import ServiceManager, TdeInstance -from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress -from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager -import crash_gen.settings +from .shared.types import TdColumns, TdTags -import taos -import requests +# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess +# from crash_gen import ServiceManager, Config, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \ +# CrashGenError, Progress, MyTDSql, \ +# TdeInstance -crash_gen.settings.init() +from .service_manager import ServiceManager, TdeInstance + +from .shared.config import Config +from .shared.db import DbConn, DbManager, DbConnNative, MyTDSql +from .shared.misc import Dice, Logging, Helper, Status, CrashGenError, Progress +from .shared.types import TdDataType + +# Config.init() # Require Python 3 if sys.version_info[0] < 3: @@ -56,8 +61,8 @@ if sys.version_info[0] < 3: # Command-line/Environment Configurations, will set a bit later # ConfigNameSpace = argparse.Namespace -gConfig: argparse.Namespace -gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection +# gConfig: argparse.Namespace +gSvcMgr: Optional[ServiceManager] # TODO: refactor this hack, use dep injection # logger: logging.Logger gContainer: Container @@ -80,20 +85,20 @@ class WorkerThread: self._stepGate = threading.Event() # Let us have a DB connection of our own - if (gConfig.per_thread_db_connection): # type: ignore + if (Config.getConfig().per_thread_db_connection): # type: ignore # print("connector_type = {}".format(gConfig.connector_type)) tInst = gContainer.defTdeInstance - if gConfig.connector_type == 'native': + if Config.getConfig().connector_type == 'native': self._dbConn = DbConn.createNative(tInst.getDbTarget()) - elif gConfig.connector_type == 'rest': + elif Config.getConfig().connector_type == 'rest': self._dbConn = DbConn.createRest(tInst.getDbTarget()) - elif gConfig.connector_type == 'mixed': + elif Config.getConfig().connector_type == 'mixed': if Dice.throw(2) == 0: # 1/2 chance - self._dbConn = DbConn.createNative() + self._dbConn = DbConn.createNative(tInst.getDbTarget()) else: - self._dbConn = DbConn.createRest() + self._dbConn = DbConn.createRest(tInst.getDbTarget()) else: - raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type)) + raise RuntimeError("Unexpected connector type: {}".format(Config.getConfig().connector_type)) # self._dbInUse = False # if "use db" was executed already @@ -122,14 +127,14 @@ class WorkerThread: # self.isSleeping = False Logging.info("Starting to run thread: {}".format(self._tid)) - if (gConfig.per_thread_db_connection): # type: ignore + if (Config.getConfig().per_thread_db_connection): # type: ignore Logging.debug("Worker thread openning database connection") self._dbConn.open() self._doTaskLoop() # clean up - if (gConfig.per_thread_db_connection): # type: ignore + if (Config.getConfig().per_thread_db_connection): # type: ignore if self._dbConn.isOpen: #sometimes it is not open self._dbConn.close() else: @@ -157,7 +162,7 @@ class WorkerThread: # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) try: - if (gConfig.per_thread_db_connection): # most likely TRUE + if (Config.getConfig().per_thread_db_connection): # most likely TRUE if not self._dbConn.isOpen: # might have been closed during server auto-restart self._dbConn.open() # self.useDb() # might encounter exceptions. TODO: catch @@ -231,7 +236,7 @@ class WorkerThread: return self.getDbConn().getQueryResult() def getDbConn(self) -> DbConn : - if (gConfig.per_thread_db_connection): + if (Config.getConfig().per_thread_db_connection): return self._dbConn else: return self._tc.getDbManager().getDbConn() @@ -253,7 +258,7 @@ class ThreadCoordinator: self._pool = pool # self._wd = wd self._te = None # prepare for every new step - self._dbManager = dbManager + self._dbManager = dbManager # type: Optional[DbManager] # may be freed self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things @@ -265,9 +270,13 @@ class ThreadCoordinator: self._stepStartTime = None # Track how long it takes to execute each step def getTaskExecutor(self): + if self._te is None: + raise CrashGenError("Unexpected empty TE") return self._te def getDbManager(self) -> DbManager: + if self._dbManager is None: + raise ChildProcessError("Unexpected empty _dbManager") return self._dbManager def crossStepBarrier(self, timeout=None): @@ -278,7 +287,7 @@ class ThreadCoordinator: self._execStats.registerFailure("User Interruption") def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): - maxSteps = gConfig.max_steps # type: ignore + maxSteps = Config.getConfig().max_steps # type: ignore if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 return True if self._runStatus != Status.STATUS_RUNNING: @@ -383,7 +392,7 @@ class ThreadCoordinator: hasAbortedTask = False workerTimeout = False while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): - if not gConfig.debug: # print this only if we are not in debug mode + if not Config.getConfig().debug: # print this only if we are not in debug mode Progress.emit(Progress.STEP_BOUNDARY) # print(".", end="", flush=True) # if (self._curStep % 2) == 0: # print memory usage once every 10 steps @@ -468,7 +477,7 @@ class ThreadCoordinator: self._pool = None self._te = None self._dbManager = None - self._executedTasks = None + self._executedTasks = [] self._lock = None self._stepBarrier = None self._execStats = None @@ -507,18 +516,18 @@ class ThreadCoordinator: ''' Initialize multiple databases, invoked at __ini__() time ''' self._dbs = [] # type: List[Database] dbc = self.getDbManager().getDbConn() - if gConfig.max_dbs == 0: + if Config.getConfig().max_dbs == 0: self._dbs.append(Database(0, dbc)) else: baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic - )*333) % 888 if gConfig.dynamic_db_table_names else 0 - for i in range(gConfig.max_dbs): + )*333) % 888 if Config.getConfig().dynamic_db_table_names else 0 + for i in range(Config.getConfig().max_dbs): self._dbs.append(Database(baseDbNumber + i, dbc)) def pickDatabase(self): idxDb = 0 - if gConfig.max_dbs != 0 : - idxDb = Dice.throw(gConfig.max_dbs) # 0 to N-1 + if Config.getConfig().max_dbs != 0 : + idxDb = Dice.throw(Config.getConfig().max_dbs) # 0 to N-1 db = self._dbs[idxDb] # type: Database return db @@ -562,7 +571,7 @@ class ThreadPool: workerThread._thread.join() def cleanup(self): - self.threadList = None # maybe clean up each? + self.threadList = [] # maybe clean up each? # A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers # for new table names @@ -672,7 +681,7 @@ class AnyState: # Each sub state tells us the "info", about itself, so we can determine # on things like canDropDB() - def getInfo(self): + def getInfo(self) -> List[Any]: raise RuntimeError("Must be overriden by child classes") def equals(self, other): @@ -700,7 +709,7 @@ class AnyState: def canDropDb(self): # If user requests to run up to a number of DBs, # we'd then not do drop_db operations any more - if gConfig.max_dbs > 0 or gConfig.use_shadow_db : + if Config.getConfig().max_dbs > 0 or Config.getConfig().use_shadow_db : return False return self._info[self.CAN_DROP_DB] @@ -708,7 +717,7 @@ class AnyState: return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] def canDropFixedSuperTable(self): - if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table + if Config.getConfig().use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table return False return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] @@ -910,7 +919,7 @@ class StateMechine: # May be slow, use cautionsly... def getTaskTypes(self): # those that can run (directly/indirectly) from the current state - def typesToStrings(types): + def typesToStrings(types) -> List: ss = [] for t in types: ss.append(t.__name__) @@ -1029,13 +1038,14 @@ class StateMechine: # ref: # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ - def _weighted_choice_sub(self, weights): + def _weighted_choice_sub(self, weights) -> int: # TODO: use our dice to ensure it being determinstic? rnd = random.random() * sum(weights) for i, w in enumerate(weights): rnd -= w if rnd < 0: return i + raise CrashGenError("Unexpected no choice") class Database: ''' We use this to represent an actual TDengine database inside a service instance, @@ -1047,8 +1057,8 @@ class Database: ''' _clsLock = threading.Lock() # class wide lock _lastInt = 101 # next one is initial integer - _lastTick = 0 - _lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions + _lastTick = None # Optional[datetime] + _lastLaggingTick = None # Optional[datetime] # lagging tick, for out-of-sequence (oos) data insertions def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc self._dbNum = dbNum # we assign a number to databases, for our testing purpose @@ -1104,7 +1114,7 @@ class Database: t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above - Logging.info("Setting up TICKS to start from: {}".format(t4)) + Logging.debug("Setting up TICKS to start from: {}".format(t4)) return t4 @classmethod @@ -1113,14 +1123,14 @@ class Database: Fetch a timestamp tick, with some random factor, may not be unique. ''' with cls._clsLock: # prevent duplicate tick - if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized + if cls._lastLaggingTick is None or cls._lastTick is None : # not initialized # 10k at 1/20 chance, should be enough to avoid overlaps tick = cls.setupLastTick() cls._lastTick = tick cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast # if : # should be quite a bit into the future - if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick + if Config.isSet('mix_oos_data') and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence return cls._lastLaggingTick else: # regular @@ -1302,10 +1312,10 @@ class Task(): ]: return True # These are the ALWAYS-ACCEPTABLE ones # This case handled below already. - # elif (errno in [ 0x0B ]) and gConfig.auto_start_service: + # elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service: # return True # We may get "network unavilable" when restarting service - elif gConfig.ignore_errors: # something is specified on command line - moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')] + elif Config.getConfig().ignore_errors: # something is specified on command line + moreErrnos = [int(v, 0) for v in Config.getConfig().ignore_errors.split(',')] if errno in moreErrnos: return True elif errno == 0x200 : # invalid SQL, we need to div in a bit more @@ -1341,7 +1351,7 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: errno2 = Helper.convertErrno(err.errno) - if (gConfig.continue_on_exception): # user choose to continue + if (Config.getConfig().continue_on_exception): # user choose to continue self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, wt.getDbConn().getLastSql())) self._err = err @@ -1356,7 +1366,7 @@ class Task(): self.__class__.__name__, errno2, err, wt.getDbConn().getLastSql()) self.logDebug(errMsg) - if gConfig.debug: + if Config.getConfig().debug: # raise # so that we see full stack traceback.print_exc() print( @@ -1370,13 +1380,13 @@ class Task(): self._err = e self._aborted = True traceback.print_exc() - except BaseException as e: + except BaseException as e2: self.logInfo("Python base exception encountered") - self._err = e + # self._err = e2 # Exception/BaseException incompatible! self._aborted = True traceback.print_exc() - except BaseException: # TODO: what is this again??!! - raise RuntimeError("Punt") + # except BaseException: # TODO: what is this again??!! + # raise RuntimeError("Punt") # self.logDebug( # "[=] Unexpected exception, SQL: {}".format( # wt.getDbConn().getLastSql())) @@ -1421,11 +1431,11 @@ class Task(): class ExecutionStats: def __init__(self): # total/success times for a task - self._execTimes: Dict[str, [int, int]] = {} + self._execTimes: Dict[str, List[int]] = {} self._tasksInProgress = 0 self._lock = threading.Lock() - self._firstTaskStartTime = None - self._execStartTime = None + self._firstTaskStartTime = 0.0 + self._execStartTime = 0.0 self._errors = {} self._elapsedTime = 0.0 # total elapsed time self._accRunTime = 0.0 # accumulated run time @@ -1470,7 +1480,7 @@ class ExecutionStats: self._tasksInProgress -= 1 if self._tasksInProgress == 0: # all tasks have stopped self._accRunTime += (time.time() - self._firstTaskStartTime) - self._firstTaskStartTime = None + self._firstTaskStartTime = 0.0 def registerFailure(self, reason): self._failed = True @@ -1554,7 +1564,7 @@ class StateTransitionTask(Task): def getRegTableName(cls, i): if ( StateTransitionTask._baseTableNumber is None): # Set it one time StateTransitionTask._baseTableNumber = Dice.throw( - 999) if gConfig.dynamic_db_table_names else 0 + 999) if Config.getConfig().dynamic_db_table_names else 0 return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i) def execute(self, wt: WorkerThread): @@ -1574,14 +1584,14 @@ class TaskCreateDb(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): # was: self.execWtSql(wt, "create database db") repStr = "" - if gConfig.num_replicas != 1: - # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N - numReplica = gConfig.num_replicas # fixed, always + if Config.getConfig().num_replicas != 1: + # numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N + numReplica = Config.getConfig().num_replicas # fixed, always repStr = "replica {}".format(numReplica) - updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active + updatePostfix = "update 1" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active dbName = self._db.getName() self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) ) - if dbName == "db_0" and gConfig.use_shadow_db: + if dbName == "db_0" and Config.getConfig().use_shadow_db: self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) ) class TaskDropDb(StateTransitionTask): @@ -1614,10 +1624,11 @@ class TaskCreateSuperTable(StateTransitionTask): sTable = self._db.getFixedSuperTable() # type: TdSuperTable # wt.execSql("use db") # should always be in place - sTable.create(wt.getDbConn(), - {'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}, {'b':'BINARY(200)', 'f':'FLOAT'}, - dropIfExists = True - ) + sTable.create(wt.getDbConn(), + {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, { + 'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT}, + dropIfExists=True + ) # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that # automatically @@ -1645,9 +1656,7 @@ class TdSuperTable: return dbc.existsSuperTable(self._stName) # TODO: odd semantic, create() method is usually static? - def create(self, dbc, cols: dict, tags: dict, - dropIfExists = False - ): + def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False): '''Creating a super table''' dbName = self._dbName @@ -1658,17 +1667,17 @@ class TdSuperTable: dbc.execute("DROP TABLE {}".format(fullTableName)) else: # error raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName)) - + # Now let's create sql = "CREATE TABLE {} ({})".format( fullTableName, - ",".join(['%s %s'%(k,v) for (k,v) in cols.items()])) - if tags is None : - sql += " TAGS (dummy int) " - else: + ",".join(['%s %s'%(k,v.value) for (k,v) in cols.items()])) + if tags : sql += " TAGS ({})".format( - ",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) - ) + ",".join(['%s %s'%(k,v.value) for (k,v) in tags.items()]) + ) + else: + sql += " TAGS (dummy int) " dbc.execute(sql) def getRegTables(self, dbc: DbConn): @@ -1686,7 +1695,7 @@ class TdSuperTable: def hasRegTables(self, dbc: DbConn): return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0 - def ensureTable(self, task: Task, dbc: DbConn, regTableName: str): + def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str): dbName = self._dbName sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) if dbc.query(sql) >= 1 : # reg table exists already @@ -1694,7 +1703,7 @@ class TdSuperTable: # acquire a lock first, so as to be able to *verify*. More details in TD-1471 fullTableName = dbName + '.' + regTableName - if task is not None: # optional lock + if task is not None: # TODO: what happens if we don't lock the table task.lockTable(fullTableName) Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table # print("(" + fullTableName[-3:] + ")", end="", flush=True) @@ -1886,7 +1895,7 @@ class TaskDropSuperTable(StateTransitionTask): if Dice.throw(2) == 0: # print("_7_", end="", flush=True) tblSeq = list(range( - 2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) + 2 + (self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES))) random.shuffle(tblSeq) tickOutput = False # if we have spitted out a "d" character for "drop regular table" isSuccess = True @@ -1952,13 +1961,13 @@ class TaskRestartService(StateTransitionTask): @classmethod def canBeginFrom(cls, state: AnyState): - if gConfig.auto_start_service: + if Config.getConfig().auto_start_service: return state.canDropFixedSuperTable() # Basicallly when we have the super table return False # don't run this otherwise CHANCE_TO_RESTART_SERVICE = 200 def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - if not gConfig.auto_start_service: # only execute when we are in -a mode + if not Config.getConfig().auto_start_service: # only execute when we are in -a mode print("_a", end="", flush=True) return @@ -1980,12 +1989,12 @@ class TaskAddData(StateTransitionTask): activeTable: Set[int] = set() # We use these two files to record operations to DB, useful for power-off tests - fAddLogReady = None # type: TextIOWrapper - fAddLogDone = None # type: TextIOWrapper + fAddLogReady = None # type: Optional[io.TextIOWrapper] + fAddLogDone = None # type: Optional[io.TextIOWrapper] @classmethod def prepToRecordOps(cls): - if gConfig.record_ops: + if Config.getConfig().record_ops: if (cls.fAddLogReady is None): Logging.info( "Recording in a file operations to be performed...") @@ -2003,7 +2012,7 @@ class TaskAddData(StateTransitionTask): return state.canAddData() def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor): - numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS + numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS fullTableName = db.getName() + '.' + regTableName sql = "INSERT INTO {} VALUES ".format(fullTableName) @@ -2015,21 +2024,23 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches - numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS + numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS for j in range(numRecords): # number of records per table nextInt = db.getNextInt() nextTick = db.getNextTick() nextColor = db.getNextColor() - if gConfig.record_ops: + if Config.getConfig().record_ops: self.prepToRecordOps() + if self.fAddLogReady is None: + raise CrashGenError("Unexpected empty fAddLogReady") self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) self.fAddLogReady.flush() - os.fsync(self.fAddLogReady) + os.fsync(self.fAddLogReady.fileno()) # TODO: too ugly trying to lock the table reliably, refactor... fullTableName = db.getName() + '.' + regTableName - if gConfig.verify_data: + if Config.getConfig().verify_data: self.lockTable(fullTableName) # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written @@ -2042,7 +2053,7 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) # Quick hack, attach an update statement here. TODO: create an "update" task - if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB + if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB nextInt = db.getNextInt() nextColor = db.getNextColor() sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here @@ -2053,12 +2064,12 @@ class TaskAddData(StateTransitionTask): dbc.execute(sql) except: # Any exception at all - if gConfig.verify_data: + if Config.getConfig().verify_data: self.unlockTable(fullTableName) raise # Now read it back and verify, we might encounter an error if table is dropped - if gConfig.verify_data: # only if command line asks for it + if Config.getConfig().verify_data: # only if command line asks for it try: readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". format(db.getName(), regTableName, nextTick)) @@ -2085,17 +2096,19 @@ class TaskAddData(StateTransitionTask): # Successfully wrote the data into the DB, let's record it somehow te.recordDataMark(nextInt) - if gConfig.record_ops: + if Config.getConfig().record_ops: + if self.fAddLogDone is None: + raise CrashGenError("Unexpected empty fAddLogDone") self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.flush() - os.fsync(self.fAddLogDone) + os.fsync(self.fAddLogDone.fileno()) def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access db = self._db dbc = wt.getDbConn() - numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES - numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS + numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES + numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS tblSeq = list(range(numTables )) random.shuffle(tblSeq) # now we have random sequence for i in tblSeq: @@ -2110,7 +2123,7 @@ class TaskAddData(StateTransitionTask): regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) fullTableName = dbName + '.' + regTableName # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked" - sTable.ensureTable(self, wt.getDbConn(), regTableName) # Ensure the table exists + sTable.ensureRegTable(self, wt.getDbConn(), regTableName) # Ensure the table exists # self._unlockTable(fullTableName) if Dice.throw(1) == 0: # 1 in 2 chance @@ -2125,7 +2138,9 @@ class ThreadStacks: # stack info for all threads def __init__(self): self._allStacks = {} allFrames = sys._current_frames() - for th in threading.enumerate(): + for th in threading.enumerate(): + if th.ident is None: + continue stack = traceback.extract_stack(allFrames[th.ident]) self._allStacks[th.native_id] = stack @@ -2246,14 +2261,15 @@ class ClientManager: def run(self, svcMgr): # self._printLastNumbers() - global gConfig + # global gConfig # Prepare Tde Instance global gContainer tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance" - dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget()) # Regular function - thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) + cfg = Config.getConfig() + dbManager = DbManager(cfg.connector_type, tInst.getDbTarget()) # Regular function + thPool = ThreadPool(cfg.num_threads, cfg.max_steps) self.tc = ThreadCoordinator(thPool, dbManager) Logging.info("Starting client instance: {}".format(tInst)) @@ -2266,7 +2282,8 @@ class ClientManager: # Release global variables - gConfig = None + # gConfig = None + Config.clearConfig() gSvcMgr = None logger = None @@ -2297,7 +2314,7 @@ class ClientManager: class MainExec: def __init__(self): self._clientMgr = None - self._svcMgr = None # type: ServiceManager + self._svcMgr = None # type: Optional[ServiceManager] signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler) @@ -2317,7 +2334,7 @@ class MainExec: def runClient(self): global gSvcMgr - if gConfig.auto_start_service: + if Config.getConfig().auto_start_service: gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert gSvcMgr.startTaosServices() # we start, don't run @@ -2326,26 +2343,18 @@ class MainExec: try: ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside except requests.exceptions.ConnectionError as err: - Logging.warning("Failed to open REST connection to DB: {}".format(err.getMessage())) + Logging.warning("Failed to open REST connection to DB: {}".format(err)) # don't raise return ret def runService(self): global gSvcMgr - gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert + gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert gSvcMgr.run() # run to some end state gSvcMgr = self._svcMgr = None - def init(self): # TODO: refactor - global gContainer - gContainer = Container() # micky-mouse DI - - global gSvcMgr # TODO: refactor away - gSvcMgr = None - - # Super cool Python argument library: - # https://docs.python.org/3/library/argparse.html + def _buildCmdLineParser(self): parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent('''\ @@ -2466,20 +2475,29 @@ class MainExec: action='store_true', help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)') - global gConfig - gConfig = parser.parse_args() - crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var + return parser + + + def init(self): # TODO: refactor + global gContainer + gContainer = Container() # micky-mouse DI + + global gSvcMgr # TODO: refactor away + gSvcMgr = None + + parser = self._buildCmdLineParser() + Config.init(parser) # Sanity check for arguments - if gConfig.use_shadow_db and gConfig.max_dbs>1 : + if Config.getConfig().use_shadow_db and Config.getConfig().max_dbs>1 : raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1") - Logging.clsInit(gConfig) + Logging.clsInit(Config.getConfig().debug) Dice.seed(0) # initial seeding of dice def run(self): - if gConfig.run_tdengine: # run server + if Config.getConfig().run_tdengine: # run server try: self.runService() return 0 # success diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index cdbf2db4dab81ab07b51597eb0a2805cc931e34e..95507f0142036f36f9a5249a48800bd481513844 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -1,25 +1,33 @@ +from __future__ import annotations + import os import io import sys +from enum import Enum import threading import signal import logging import time -import subprocess - -from typing import IO, List +from subprocess import PIPE, Popen, TimeoutExpired +from typing import BinaryIO, Generator, IO, List, NewType, Optional +import typing try: import psutil except: print("Psutil module needed, please install: sudo pip3 install psutil") sys.exit(-1) - from queue import Queue, Empty -from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress -from .db import DbConn, DbTarget -import crash_gen.settings +from .shared.config import Config +from .shared.db import DbTarget, DbConn +from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice +from .shared.types import DirPath + +# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status +# from crash_gen.db import DbConn, DbTarget +# from crash_gen.settings import Config +# from crash_gen.types import DirPath class TdeInstance(): """ @@ -68,7 +76,10 @@ class TdeInstance(): self._fepPort = fepPort self._tInstNum = tInstNum - self._smThread = ServiceManagerThread() + + # An "Tde Instance" will *contain* a "sub process" object, with will/may use a thread internally + # self._smThread = ServiceManagerThread() + self._subProcess = None # type: Optional[TdeSubProcess] def getDbTarget(self): return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port) @@ -153,23 +164,24 @@ quorum 2 def getExecFile(self): # .../taosd return self._buildDir + "/build/bin/taosd" - def getRunDir(self): # TODO: rename to "root dir" ?! - return self._buildDir + self._subdir + def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?! + return DirPath(self._buildDir + self._subdir) - def getCfgDir(self): # path, not file - return self.getRunDir() + "/cfg" + def getCfgDir(self) -> DirPath : # path, not file + return DirPath(self.getRunDir() + "/cfg") - def getLogDir(self): - return self.getRunDir() + "/log" + def getLogDir(self) -> DirPath : + return DirPath(self.getRunDir() + "/log") def getHostAddr(self): return "127.0.0.1" def getServiceCmdLine(self): # to start the instance cmdLine = [] - if crash_gen.settings.gConfig.track_memory_leaks: + if Config.getConfig().track_memory_leaks: Logging.info("Invoking VALGRIND on service...") cmdLine = ['valgrind', '--leak-check=yes'] + # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() return cmdLine @@ -196,27 +208,46 @@ quorum 2 dbc.close() def getStatus(self): - return self._smThread.getStatus() + # return self._smThread.getStatus() + if self._subProcess is None: + return Status(Status.STATUS_EMPTY) + return self._subProcess.getStatus() - def getSmThread(self): - return self._smThread + # def getSmThread(self): + # return self._smThread def start(self): - if not self.getStatus().isStopped(): + if self.getStatus().isActive(): raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus())) Logging.info("Starting TDengine instance: {}".format(self)) self.generateCfgFile() # service side generates config file, client does not self.rotateLogs() - self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions + # self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions + self._subProcess = TdeSubProcess(self.getServiceCmdLine(), self.getLogDir()) def stop(self): - self._smThread.stop() + self._subProcess.stop() + self._subProcess = None def isFirst(self): return self._tInstNum == 0 + def printFirst10Lines(self): + if self._subProcess is None: + Logging.warning("Incorrect TI status for procIpcBatch-10 operation") + return + self._subProcess.procIpcBatch(trimToTarget=10, forceOutput=True) + + def procIpcBatch(self): + if self._subProcess is None: + Logging.warning("Incorrect TI status for procIpcBatch operation") + return + self._subProcess.procIpcBatch() # may enounter EOF and change status to STOPPED + if self._subProcess.getStatus().isStopped(): + self._subProcess.stop() + self._subProcess = None class TdeSubProcess: """ @@ -225,42 +256,57 @@ class TdeSubProcess: It takes a TdeInstance object as its parameter, with the rationale being "a sub process runs an instance". + + We aim to ensure that this object has exactly the same life-cycle as the + underlying sub process. """ # RET_ALREADY_STOPPED = -1 # RET_TIME_OUT = -3 # RET_SUCCESS = -4 - def __init__(self): - self.subProcess = None # type: subprocess.Popen - # if tInst is None: - # raise CrashGenError("Empty instance not allowed in TdeSubProcess") - # self._tInst = tInst # Default create at ServiceManagerThread + def __init__(self, cmdLine: List[str], logDir: DirPath): + # Create the process + managing thread immediately - def __repr__(self): - if self.subProcess is None: - return '[TdeSubProc: Empty]' - return '[TdeSubProc: pid = {}]'.format(self.getPid()) + Logging.info("Attempting to start TAOS sub process...") + self._popen = self._start(cmdLine) # the actual sub process + self._smThread = ServiceManagerThread(self, logDir) # A thread to manage the sub process, mostly to process the IO + Logging.info("Successfully started TAOS process: {}".format(self)) - def getStdOut(self): - return self.subProcess.stdout - def getStdErr(self): - return self.subProcess.stderr - def isRunning(self): - return self.subProcess is not None + def __repr__(self): + # if self.subProcess is None: + # return '[TdeSubProc: Empty]' + return '[TdeSubProc: pid = {}, status = {}]'.format( + self.getPid(), self.getStatus() ) + + def getStdOut(self) -> BinaryIO : + if self._popen.universal_newlines : # alias of text_mode + raise CrashGenError("We need binary mode for STDOUT IPC") + # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout))) + return typing.cast(BinaryIO, self._popen.stdout) + + def getStdErr(self) -> BinaryIO : + if self._popen.universal_newlines : # alias of text_mode + raise CrashGenError("We need binary mode for STDERR IPC") + return typing.cast(BinaryIO, self._popen.stderr) + + # Now it's always running, since we matched the life cycle + # def isRunning(self): + # return self.subProcess is not None def getPid(self): - return self.subProcess.pid + return self._popen.pid - def start(self, cmdLine): + def _start(self, cmdLine) -> Popen : ON_POSIX = 'posix' in sys.builtin_module_names # Sanity check - if self.subProcess: # already there - raise RuntimeError("Corrupt process state") + # if self.subProcess: # already there + # raise RuntimeError("Corrupt process state") + # Prepare environment variables for coverage information # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment myEnv = os.environ.copy() @@ -270,15 +316,12 @@ class TdeSubProcess: # print("Starting TDengine with env: ", myEnv.items()) # print("Starting TDengine via Shell: {}".format(cmdLineStr)) - useShell = True # Needed to pass environments into it - self.subProcess = subprocess.Popen( - # ' '.join(cmdLine) if useShell else cmdLine, - # shell=useShell, - ' '.join(cmdLine), - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - # bufsize=1, # not supported in binary mode + # useShell = True # Needed to pass environments into it + return Popen( + ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, + shell=True, # Always use shell, since we need to pass ENV vars + stdout=PIPE, + stderr=PIPE, close_fds=ON_POSIX, env=myEnv ) # had text=True, which interferred with reading EOF @@ -288,7 +331,9 @@ class TdeSubProcess: def stop(self): """ - Stop a sub process, DO NOT return anything, process all conditions INSIDE + Stop a sub process, DO NOT return anything, process all conditions INSIDE. + + Calling function should immediately delete/unreference the object Common POSIX signal values (from man -7 signal): SIGHUP 1 @@ -306,29 +351,39 @@ class TdeSubProcess: SIGSEGV 11 SIGUSR2 12 """ - if not self.subProcess: - Logging.error("Sub process already stopped") + # self._popen should always be valid. + + Logging.info("Terminating TDengine service running as the sub process...") + if self.getStatus().isStopped(): + Logging.info("Service already stopped") + return + if self.getStatus().isStopping(): + Logging.info("Service is already being stopped, pid: {}".format(self.getPid())) return - retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N) + self.setStatus(Status.STATUS_STOPPING) + + retCode = self._popen.poll() # ret -N means killed with signal N, otherwise it's from exit(N) if retCode: # valid return code, process ended # retCode = -retCode # only if valid Logging.warning("TSP.stop(): process ended itself") - self.subProcess = None + # self.subProcess = None return # process still alive, let's interrupt it - self._stopForSure(self.subProcess, self.STOP_SIGNAL) # success if no exception - self.subProcess = None + self._stopForSure(self._popen, self.STOP_SIGNAL) # success if no exception + + # sub process should end, then IPC queue should end, causing IO thread to end + self._smThread.stop() # stop for sure too - # sub process should end, then IPC queue should end, causing IO thread to end + self.setStatus(Status.STATUS_STOPPED) @classmethod - def _stopForSure(cls, proc: subprocess.Popen, sig: int): + def _stopForSure(cls, proc: Popen, sig: int): ''' Stop a process and all sub processes with a singal, and SIGKILL if necessary ''' - def doKillTdService(proc: subprocess.Popen, sig: int): + def doKillTdService(proc: Popen, sig: int): Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig)) proc.send_signal(sig) try: @@ -340,7 +395,7 @@ class TdeSubProcess: else: Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) return True # terminated successfully - except subprocess.TimeoutExpired as err: + except TimeoutExpired as err: Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig)) return False # failed to terminate @@ -349,22 +404,22 @@ class TdeSubProcess: Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig)) child.send_signal(sig) try: - retCode = child.wait(20) - if (- retCode) == signal.SIGSEGV: # Crashed + retCode = child.wait(20) # type: ignore + if (- retCode) == signal.SIGSEGV: # type: ignore # Crashed Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid)) - elif (- retCode) == sig : + elif (- retCode) == sig : # type: ignore Logging.info("Sub-sub process terminated with expected return code {}".format(sig)) else: - Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) + Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode)) # type: ignore return True # terminated successfully except psutil.TimeoutExpired as err: Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig)) return False # did not terminate - def doKill(proc: subprocess.Popen, sig: int): + def doKill(proc: Popen, sig: int): pid = proc.pid try: - topSubProc = psutil.Process(pid) + topSubProc = psutil.Process(pid) # Now that we are doing "exec -c", should not have children any more for child in topSubProc.children(recursive=True): # or parent.children() for recursive=False Logging.warning("Unexpected child to be killed") doKillChild(child, sig) @@ -389,19 +444,26 @@ class TdeSubProcess: return doKill(proc, sig) def hardKill(proc): - return doKill(proc, signal.SIGKILL) - - + return doKill(proc, signal.SIGKILL) pid = proc.pid Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig)) if softKill(proc, sig): - return# success + return # success if sig != signal.SIGKILL: # really was soft above if hardKill(proc): - return + return raise CrashGenError("Failed to stop process, pid={}".format(pid)) + def getStatus(self): + return self._smThread.getStatus() + + def setStatus(self, status): + self._smThread.setStatus(status) + + def procIpcBatch(self, trimToTarget=0, forceOutput=False): + self._smThread.procIpcBatch(trimToTarget, forceOutput) + class ServiceManager: PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process @@ -498,10 +560,10 @@ class ServiceManager: def isActive(self): """ Determine if the service/cluster is active at all, i.e. at least - one thread is not "stopped". + one instance is active """ for ti in self._tInsts: - if not ti.getStatus().isStopped(): + if ti.getStatus().isActive(): return True return False @@ -539,10 +601,10 @@ class ServiceManager: # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here status = ti.getStatus() if status.isRunning(): - th = ti.getSmThread() - th.procIpcBatch() # regular processing, + # th = ti.getSmThread() + ti.procIpcBatch() # regular processing, if status.isStopped(): - th.procIpcBatch() # one last time? + ti.procIpcBatch() # one last time? # self._updateThreadStatus() time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round @@ -572,7 +634,8 @@ class ServiceManager: if not ti.isFirst(): tFirst = self._getFirstInstance() tFirst.createDnode(ti.getDbTarget()) - ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines + ti.printFirst10Lines() + # ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines def stopTaosServices(self): with self._lock: @@ -618,21 +681,24 @@ class ServiceManagerThread: """ MAX_QUEUE_SIZE = 10000 - def __init__(self): + def __init__(self, subProc: TdeSubProcess, logDir: str): # Set the sub process - self._tdeSubProcess = None # type: TdeSubProcess + # self._tdeSubProcess = None # type: TdeSubProcess # Arrange the TDengine instance # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based # self._tInst = tInst or TdeInstance() # Need an instance - self._thread = None # The actual thread, # type: threading.Thread - self._thread2 = None # watching stderr + # self._thread = None # type: Optional[threading.Thread] # The actual thread, # type: threading.Thread + # self._thread2 = None # type: Optional[threading.Thread] Thread # watching stderr self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually. + self._start(subProc, logDir) + def __repr__(self): - return "[SvcMgrThread: status={}, subProc={}]".format( - self.getStatus(), self._tdeSubProcess) + raise CrashGenError("SMT status moved to TdeSubProcess") + # return "[SvcMgrThread: status={}, subProc={}]".format( + # self.getStatus(), self._tdeSubProcess) def getStatus(self): ''' @@ -640,30 +706,33 @@ class ServiceManagerThread: ''' return self._status + def setStatus(self, statusVal: int): + self._status.set(statusVal) + # Start the thread (with sub process), and wait for the sub service # to become fully operational - def start(self, cmdLine : str, logDir: str): + def _start(self, subProc :TdeSubProcess, logDir: str): ''' Request the manager thread to start a new sub process, and manage it. :param cmdLine: the command line to invoke :param logDir: the logging directory, to hold stdout/stderr files ''' - if self._thread: - raise RuntimeError("Unexpected _thread") - if self._tdeSubProcess: - raise RuntimeError("TDengine sub process already created/running") + # if self._thread: + # raise RuntimeError("Unexpected _thread") + # if self._tdeSubProcess: + # raise RuntimeError("TDengine sub process already created/running") - Logging.info("Attempting to start TAOS service: {}".format(self)) + # Moved to TdeSubProcess + # Logging.info("Attempting to start TAOS service: {}".format(self)) self._status.set(Status.STATUS_STARTING) - self._tdeSubProcess = TdeSubProcess() - self._tdeSubProcess.start(cmdLine) # TODO: verify process is running + # self._tdeSubProcess = TdeSubProcess.start(cmdLine) # TODO: verify process is running - self._ipcQueue = Queue() + self._ipcQueue = Queue() # type: Queue self._thread = threading.Thread( # First thread captures server OUTPUT target=self.svcOutputReader, - args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir)) + args=(subProc.getStdOut(), self._ipcQueue, logDir)) self._thread.daemon = True # thread dies with the program self._thread.start() time.sleep(0.01) @@ -675,7 +744,7 @@ class ServiceManagerThread: self._thread2 = threading.Thread( # 2nd thread captures server ERRORs target=self.svcErrorReader, - args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir)) + args=(subProc.getStdErr(), self._ipcQueue, logDir)) self._thread2.daemon = True # thread dies with the program self._thread2.start() time.sleep(0.01) @@ -690,14 +759,14 @@ class ServiceManagerThread: Progress.emit(Progress.SERVICE_START_NAP) # print("_zz_", end="", flush=True) if self._status.isRunning(): - Logging.info("[] TDengine service READY to process requests") - Logging.info("[] TAOS service started: {}".format(self)) + Logging.info("[] TDengine service READY to process requests: pid={}".format(subProc.getPid())) + # Logging.info("[] TAOS service started: {}".format(self)) # self._verifyDnode(self._tInst) # query and ensure dnode is ready # Logging.debug("[] TAOS Dnode verified: {}".format(self)) return # now we've started # TODO: handle failure-to-start better? self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output - raise RuntimeError("TDengine service did not start successfully: {}".format(self)) + raise RuntimeError("TDengine service DID NOT achieve READY status: pid={}".format(subProc.getPid())) def _verifyDnode(self, tInst: TdeInstance): dbc = DbConn.createNative(tInst.getDbTarget()) @@ -717,70 +786,45 @@ class ServiceManagerThread: break if not isValid: print("Failed to start dnode, sleep for a while") - time.sleep(600) + time.sleep(10.0) raise RuntimeError("Failed to start Dnode, expected port not found: {}". format(tInst.getPort())) dbc.close() def stop(self): # can be called from both main thread or signal handler - Logging.info("Terminating TDengine service running as the sub process...") - if self.getStatus().isStopped(): - Logging.info("Service already stopped") - return - if self.getStatus().isStopping(): - Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid())) - return - # Linux will send Control-C generated SIGINT to the TDengine process - # already, ref: + + # Linux will send Control-C generated SIGINT to the TDengine process already, ref: # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes - if not self._tdeSubProcess: - raise RuntimeError("sub process object missing") - - self._status.set(Status.STATUS_STOPPING) - # retCode = self._tdeSubProcess.stop() - # try: - # retCode = self._tdeSubProcess.stop() - # # print("Attempted to stop sub process, got return code: {}".format(retCode)) - # if retCode == signal.SIGSEGV : # SGV - # Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") - # except subprocess.TimeoutExpired as err: - # Logging.info("Time out waiting for TDengine service process to exit") - if not self._tdeSubProcess.stop(): # everything withing - if self._tdeSubProcess.isRunning(): # still running, should now never happen - Logging.error("FAILED to stop sub process, it is still running... pid = {}".format( - self._tdeSubProcess.getPid())) - else: - self._tdeSubProcess = None # not running any more - self.join() # stop the thread, change the status, etc. + + self.join() # stop the thread, status change moved to TdeSubProcess # Check if it's really stopped outputLines = 10 # for last output if self.getStatus().isStopped(): self.procIpcBatch(outputLines) # one last time - Logging.debug("End of TDengine Service Output: {}".format(self)) + Logging.debug("End of TDengine Service Output") Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n") else: - print("WARNING: SMT did not terminate as expected: {}".format(self)) + print("WARNING: SMT did not terminate as expected") def join(self): # TODO: sanity check - if not self.getStatus().isStopping(): + s = self.getStatus() + if s.isStopping() or s.isStopped(): # we may be stopping ourselves, or have been stopped/killed by others + if self._thread or self._thread2 : + if self._thread: + self._thread.join() + self._thread = None + if self._thread2: # STD ERR thread + self._thread2.join() + self._thread2 = None + else: + Logging.warning("Joining empty thread, doing nothing") + else: raise RuntimeError( "SMT.Join(): Unexpected status: {}".format(self._status)) - if self._thread or self._thread2 : - if self._thread: - self._thread.join() - self._thread = None - if self._thread2: # STD ERR thread - self._thread2.join() - self._thread2 = None - else: - print("Joining empty thread, doing nothing") - - self._status.set(Status.STATUS_STOPPED) - def _trimQueue(self, targetSize): if targetSize <= 0: return # do nothing @@ -799,6 +843,10 @@ class ServiceManagerThread: TD_READY_MSG = "TDengine is initialized successfully" def procIpcBatch(self, trimToTarget=0, forceOutput=False): + ''' + Process a batch of STDOUT/STDERR data, until we read EMPTY from + the queue. + ''' self._trimQueue(trimToTarget) # trim if necessary # Process all the output generated by the underlying sub process, # managed by IO thread @@ -827,35 +875,54 @@ class ServiceManagerThread: print(pBar, end="", flush=True) print('\b\b\b\b', end="", flush=True) - def svcOutputReader(self, out: IO, queue, logDir: str): + BinaryChunk = NewType('BinaryChunk', bytes) # line with binary data, directly from STDOUT, etc. + TextChunk = NewType('TextChunk', str) # properly decoded, suitable for printing, etc. + + @classmethod + def _decodeBinaryChunk(cls, bChunk: bytes) -> Optional[TextChunk] : + try: + tChunk = bChunk.decode("utf-8").rstrip() + return cls.TextChunk(tChunk) + except UnicodeError: + print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437'))) + return None + + def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str + ) -> Generator[TextChunk, None, None]: + ''' + Take an input stream with binary data, produced a generator of decoded + "text chunks", and also save the original binary data in a log file. + ''' + os.makedirs(logDir, exist_ok=True) + logF = open(os.path.join(logDir, logFile), 'wb') + for bChunk in iter(streamIn.readline, b''): + logF.write(bChunk) # Write to log file immediately + tChunk = self._decodeBinaryChunk(bChunk) # decode + if tChunk is not None: + yield tChunk # TODO: split into actual text lines + + # At the end... + streamIn.close() # Close the stream + logF.close() # Close the output file + + def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str): ''' The infinite routine that processes the STDOUT stream for the sub process being managed. - :param out: the IO stream object used to fetch the data from - :param queue: the queue where we dump the roughly parsed line-by-line data + :param stdOut: the IO stream object used to fetch the data from + :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data :param logDir: where we should dump a verbatim output file ''' - os.makedirs(logDir, exist_ok=True) - logFile = os.path.join(logDir,'stdout.log') - fOut = open(logFile, 'wb') + # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python # print("This is the svcOutput Reader...") - # for line in out : - for line in iter(out.readline, b''): - fOut.write(line) - # print("Finished reading a line: {}".format(line)) - # print("Adding item to queue...") - try: - line = line.decode("utf-8").rstrip() - except UnicodeError: - print("\nNon-UTF8 server output: {}\n".format(line)) - - # This might block, and then causing "out" buffer to block - queue.put(line) + # stdOut.readline() # Skip the first output? TODO: remove? + for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') : + queue.put(tChunk) # tChunk garanteed not to be None self._printProgress("_i") if self._status.isStarting(): # we are starting, let's see if we have started - if line.find(self.TD_READY_MSG) != -1: # found + if tChunk.find(self.TD_READY_MSG) != -1: # found Logging.info("Waiting for the service to become FULLY READY") time.sleep(1.0) # wait for the server to truly start. TODO: remove this Logging.info("Service is now FULLY READY") # TODO: more ID info here? @@ -869,18 +936,17 @@ class ServiceManagerThread: print("_w", end="", flush=True) # queue.put(line) - # meaning sub process must have died - Logging.info("EOF for TDengine STDOUT: {}".format(self)) - out.close() # Close the stream - fOut.close() # Close the output file - - def svcErrorReader(self, err: IO, queue, logDir: str): - os.makedirs(logDir, exist_ok=True) - logFile = os.path.join(logDir,'stderr.log') - fErr = open(logFile, 'wb') - for line in iter(err.readline, b''): - fErr.write(line) - Logging.info("TDengine STDERR: {}".format(line)) - Logging.info("EOF for TDengine STDERR: {}".format(self)) - err.close() - fErr.close() \ No newline at end of file + # stdOut has no more data, meaning sub process must have died + Logging.info("EOF found TDengine STDOUT, marking the process as terminated") + self.setStatus(Status.STATUS_STOPPED) + + def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str): + # os.makedirs(logDir, exist_ok=True) + # logFile = os.path.join(logDir,'stderr.log') + # fErr = open(logFile, 'wb') + # for line in iter(err.readline, b''): + for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') : + queue.put(tChunk) # tChunk garanteed not to be None + # fErr.write(line) + Logging.info("TDengine STDERR: {}".format(tChunk)) + Logging.info("EOF for TDengine STDERR") diff --git a/tests/pytest/crash_gen/settings.py b/tests/pytest/crash_gen/settings.py deleted file mode 100644 index 3c4c91e6e077c325c53d15918624db783957fc20..0000000000000000000000000000000000000000 --- a/tests/pytest/crash_gen/settings.py +++ /dev/null @@ -1,8 +0,0 @@ -from __future__ import annotations -import argparse - -gConfig: argparse.Namespace - -def init(): - global gConfig - gConfig = [] \ No newline at end of file diff --git a/tests/pytest/crash_gen/shared/config.py b/tests/pytest/crash_gen/shared/config.py new file mode 100644 index 0000000000000000000000000000000000000000..7b9f7c3873a3b5d30cc36384bf7a486454c864cb --- /dev/null +++ b/tests/pytest/crash_gen/shared/config.py @@ -0,0 +1,42 @@ +from __future__ import annotations +import argparse + +from typing import Optional + +from .misc import CrashGenError + +# from crash_gen.misc import CrashGenError + +# gConfig: Optional[argparse.Namespace] + +class Config: + _config = None # type Optional[argparse.Namespace] + + @classmethod + def init(cls, parser: argparse.ArgumentParser): + if cls._config is not None: + raise CrashGenError("Config can only be initialized once") + cls._config = parser.parse_args() + # print(cls._config) + + @classmethod + def setConfig(cls, config: argparse.Namespace): + cls._config = config + + @classmethod + # TODO: check items instead of exposing everything + def getConfig(cls) -> argparse.Namespace: + if cls._config is None: + raise CrashGenError("invalid state") + return cls._config + + @classmethod + def clearConfig(cls): + cls._config = None + + @classmethod + def isSet(cls, cfgKey): + cfg = cls.getConfig() + if cfgKey not in cfg: + return False + return cfg.__getattribute__(cfgKey) \ No newline at end of file diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/shared/db.py similarity index 93% rename from tests/pytest/crash_gen/db.py rename to tests/pytest/crash_gen/shared/db.py index 62a369c41a7ed0d73ab847232a206c2cabb53d53..75931ace48ed65708c7dfa97d01a426a0baa8203 100644 --- a/tests/pytest/crash_gen/db.py +++ b/tests/pytest/crash_gen/shared/db.py @@ -1,24 +1,26 @@ from __future__ import annotations import sys +import os +import datetime import time import threading import requests from requests.auth import HTTPBasicAuth + import taos from util.sql import * from util.cases import * from util.dnodes import * from util.log import * -from .misc import Logging, CrashGenError, Helper, Dice -import os -import datetime import traceback # from .service_manager import TdeInstance -import crash_gen.settings +from .config import Config +from .misc import Logging, CrashGenError, Helper +from .types import QueryResult class DbConn: TYPE_NATIVE = "native-c" @@ -79,7 +81,7 @@ class DbConn: raise RuntimeError("Cannot query database until connection is open") nRows = self.query(sql) if nRows != 1: - raise taos.error.ProgrammingError( + raise CrashGenError( "Unexpected result for query: {}, rows = {}".format(sql, nRows), (CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT) ) @@ -115,7 +117,7 @@ class DbConn: try: self.execute(sql) return True # ignore num of results, return success - except taos.error.ProgrammingError as err: + except taos.error.Error as err: return False # failed, for whatever TAOS reason # Not possile to reach here, non-TAOS exception would have been thrown @@ -126,7 +128,7 @@ class DbConn: def openByType(self): raise RuntimeError("Unexpected execution, should be overriden") - def getQueryResult(self): + def getQueryResult(self) -> QueryResult : raise RuntimeError("Unexpected execution, should be overriden") def getResultRows(self): @@ -221,7 +223,7 @@ class DbConnRest(DbConn): class MyTDSql: # Class variables _clsLock = threading.Lock() # class wide locking - longestQuery = None # type: str + longestQuery = '' # type: str longestQueryTime = 0.0 # seconds lqStartTime = 0.0 # lqEndTime = 0.0 # Not needed, as we have the two above already @@ -249,7 +251,13 @@ class MyTDSql: def _execInternal(self, sql): startTime = time.time() # Logging.debug("Executing SQL: " + sql) + # ret = None # TODO: use strong type here + # try: # Let's not capture the error, and let taos.error.ProgrammingError pass through ret = self._cursor.execute(sql) + # except taos.error.ProgrammingError as err: + # Logging.warning("Taos SQL execution error: {}, SQL: {}".format(err.msg, sql)) + # raise CrashGenError(err.msg) + # print("\nSQL success: {}".format(sql)) queryTime = time.time() - startTime # Record the query time @@ -261,7 +269,7 @@ class MyTDSql: cls.lqStartTime = startTime # Now write to the shadow database - if crash_gen.settings.gConfig.use_shadow_db: + if Config.isSet('use_shadow_db'): if sql[:11] == "INSERT INTO": if sql[:16] == "INSERT INTO db_0": sql2 = "INSERT INTO db_s" + sql[16:] @@ -453,31 +461,11 @@ class DbManager(): ''' Release the underlying DB connection upon deletion of DbManager ''' self.cleanUp() - def getDbConn(self): + def getDbConn(self) -> DbConn : + if self._dbConn is None: + raise CrashGenError("Unexpected empty DbConn") return self._dbConn - # TODO: not used any more, to delete - def pickAndAllocateTable(self): # pick any table, and "use" it - return self.tableNumQueue.pickAndAllocate() - - # TODO: Not used any more, to delete - def addTable(self): - with self._lock: - tIndex = self.tableNumQueue.push() - return tIndex - - # Not used any more, to delete - def releaseTable(self, i): # return the table back, so others can use it - self.tableNumQueue.release(i) - - # TODO: not used any more, delete - def getTableNameToDelete(self): - tblNum = self.tableNumQueue.pop() # TODO: race condition! - if (not tblNum): # maybe false - return False - - return "table_{}".format(tblNum) - def cleanUp(self): if self._dbConn: self._dbConn.close() diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/shared/misc.py similarity index 90% rename from tests/pytest/crash_gen/misc.py rename to tests/pytest/crash_gen/shared/misc.py index 9774ec5455961392d82ea2b4b59c0657b5704f9a..90ad802ff1c130439cdc5f5587ecd0607e3e116b 100644 --- a/tests/pytest/crash_gen/misc.py +++ b/tests/pytest/crash_gen/shared/misc.py @@ -3,6 +3,7 @@ import random import logging import os import sys +from typing import Optional import taos @@ -39,14 +40,14 @@ class MyLoggingAdapter(logging.LoggerAdapter): class Logging: - logger = None + logger = None # type: Optional[MyLoggingAdapter] @classmethod def getLogger(cls): - return logger + return cls.logger @classmethod - def clsInit(cls, gConfig): # TODO: refactor away gConfig + def clsInit(cls, debugMode: bool): if cls.logger: return @@ -60,13 +61,9 @@ class Logging: # Logging adapter, to be used as a logger # print("setting logger variable") # global logger - cls.logger = MyLoggingAdapter(_logger, []) - - if (gConfig.debug): - cls.logger.setLevel(logging.DEBUG) # default seems to be INFO - else: - cls.logger.setLevel(logging.INFO) - + cls.logger = MyLoggingAdapter(_logger, {}) + cls.logger.setLevel(logging.DEBUG if debugMode else logging.INFO) # default seems to be INFO + @classmethod def info(cls, msg): cls.logger.info(msg) @@ -84,6 +81,7 @@ class Logging: cls.logger.error(msg) class Status: + STATUS_EMPTY = 99 STATUS_STARTING = 1 STATUS_RUNNING = 2 STATUS_STOPPING = 3 @@ -95,12 +93,16 @@ class Status: def __repr__(self): return "[Status: v={}]".format(self._status) - def set(self, status): + def set(self, status: int): self._status = status def get(self): return self._status + def isEmpty(self): + ''' Empty/Undefined ''' + return self._status == Status.STATUS_EMPTY + def isStarting(self): return self._status == Status.STATUS_STARTING @@ -117,6 +119,9 @@ class Status: def isStable(self): return self.isRunning() or self.isStopped() + def isActive(self): + return self.isStarting() or self.isRunning() or self.isStopping() + # Deterministic random number generator class Dice(): seeded = False # static, uninitialized diff --git a/tests/pytest/crash_gen/shared/types.py b/tests/pytest/crash_gen/shared/types.py new file mode 100644 index 0000000000000000000000000000000000000000..814a8219178d1c01ee0291447939984bdf81df9d --- /dev/null +++ b/tests/pytest/crash_gen/shared/types.py @@ -0,0 +1,28 @@ +from typing import Any, List, Dict, NewType +from enum import Enum + +DirPath = NewType('DirPath', str) + +QueryResult = NewType('QueryResult', List[List[Any]]) + +class TdDataType(Enum): + ''' + Use a Python Enum types of represent all the data types in TDengine. + + Ref: https://www.taosdata.com/cn/documentation/taos-sql#data-type + ''' + TIMESTAMP = 'TIMESTAMP' + INT = 'INT' + BIGINT = 'BIGINT' + FLOAT = 'FLOAT' + DOUBLE = 'DOUBLE' + BINARY = 'BINARY' + BINARY16 = 'BINARY(16)' # TODO: get rid of this hack + BINARY200 = 'BINARY(200)' + SMALLINT = 'SMALLINT' + TINYINT = 'TINYINT' + BOOL = 'BOOL' + NCHAR = 'NCHAR' + +TdColumns = Dict[str, TdDataType] +TdTags = Dict[str, TdDataType] diff --git a/tests/pytest/perf_gen.py b/tests/pytest/perf_gen.py new file mode 100755 index 0000000000000000000000000000000000000000..f0402fbb6b86c3335f5b2a739bd7545886c8f0d1 --- /dev/null +++ b/tests/pytest/perf_gen.py @@ -0,0 +1,485 @@ +#!/usr/bin/python3.8 + +from abc import abstractmethod + +import time +from datetime import datetime + +from influxdb_client import InfluxDBClient, Point, WritePrecision, BucketsApi +from influxdb_client.client.write_api import SYNCHRONOUS + +import argparse +import textwrap +import subprocess +import sys + +import taos + +from crash_gen.crash_gen_main import Database, TdSuperTable +from crash_gen.service_manager import TdeInstance + +from crash_gen.shared.config import Config +from crash_gen.shared.db import DbConn +from crash_gen.shared.misc import Dice, Logging, Helper +from crash_gen.shared.types import TdDataType + + +# NUM_PROCESSES = 10 +# NUM_REPS = 1000 + +tick = int(time.time() - 5000000.0) # for now we will create max 5M record +value = 101 + +DB_NAME = 'mydb' +TIME_SERIES_NAME = 'widget' + +MAX_SHELF = 500 # shelf number runs up to this, non-inclusive +ITEMS_PER_SHELF = 5 +BATCH_SIZE = 2000 # Number of data points per request + +# None_RW: +# INFLUX_TOKEN='RRzVQZs8ERCpV9cS2RXqgtM_Y6FEZuJ7Tuk0aHtZItFTfcM9ajixtGDhW8HzqNIBmG3hmztw-P4sHOstfJvjFA==' +# DevOrg_RW: +# INFLUX_TOKEN='o1P8sEhBmXKhxBmNuiCyOUKv8d7qm5wUjMff9AbskBu2LcmNPQzU77NrAn5hDil8hZ0-y1AGWpzpL-4wqjFdkA==' +# DevOrg_All_Access +INFLUX_TOKEN='T2QTr4sloJhINH_oSrwSS-WIIZYjDfD123NK4ou3b7ajRs0c0IphCh3bNc0OsDZQRW1HyCby7opdEndVYFGTWQ==' +INFLUX_ORG="DevOrg" +INFLUX_BUCKET="Bucket01" + +def writeTaosBatch(dbc, tblName): + # Database.setupLastTick() + global value, tick + + data = [] + for i in range(0, 100): + data.append("('{}', {})".format(Database.getNextTick(), value) ) + value += 1 + + sql = "INSERT INTO {} VALUES {}".format(tblName, ''.join(data)) + dbc.execute(sql) + +class PerfGenError(taos.error.ProgrammingError): + pass + +class Benchmark(): + + # @classmethod + # def create(cls, dbType): + # if dbType == 'taos': + # return TaosBenchmark() + # elif dbType == 'influx': + # return InfluxBenchmark() + # else: + # raise RuntimeError("Unknown DB type: {}".format(dbType)) + + def __init__(self, dbType, loopCount = 0): + self._dbType = dbType + self._setLoopCount(loopCount) + + def _setLoopCount(self, loopCount): + cfgLoopCount = Config.getConfig().loop_count + if loopCount == 0: # use config + self._loopCount = cfgLoopCount + else: + if cfgLoopCount : + Logging.warning("Ignoring loop count for fixed-loop-count benchmarks: {}".format(cfgLoopCount)) + self._loopCount = loopCount + + @abstractmethod + def doIterate(self): + ''' + Execute the benchmark directly, without invoking sub processes, + effectively using one execution thread. + ''' + pass + + @abstractmethod + def prepare(self): + ''' + Preparation needed to run a certain benchmark + ''' + pass + + @abstractmethod + def execute(self): + ''' + Actually execute the benchmark + ''' + Logging.warning("Unexpected execution") + + @property + def name(self): + return self.__class__.__name__ + + def run(self): + print("Running benchmark: {}, class={} ...".format(self.name, self.__class__)) + startTime = time.time() + + # Prepare to execute the benchmark + self.prepare() + + # Actually execute the benchmark + self.execute() + + # if Config.getConfig().iterate_directly: # execute directly + # Logging.debug("Iterating...") + # self.doIterate() + # else: + # Logging.debug("Executing via sub process...") + # startTime = time.time() + # self.prepare() + # self.spawnProcesses() + # self.waitForProcecess() + # duration = time.time() - startTime + # Logging.info("Benchmark execution completed in {:.3f} seconds".format(duration)) + Logging.info("Benchmark {} finished in {:.3f} seconds".format( + self.name, time.time()-startTime)) + + def spawnProcesses(self): + self._subProcs = [] + for j in range(0, Config.getConfig().subprocess_count): + ON_POSIX = 'posix' in sys.builtin_module_names + tblName = 'cars_reg_{}'.format(j) + cmdLineStr = './perf_gen.sh -t {} -i -n {} -l {}'.format( + self._dbType, + tblName, + Config.getConfig().loop_count + ) + if Config.getConfig().debug: + cmdLineStr += ' -d' + subProc = subprocess.Popen(cmdLineStr, + shell = True, + close_fds = ON_POSIX) + self._subProcs.append(subProc) + + def waitForProcecess(self): + for sp in self._subProcs: + sp.wait(300) + + +class TaosBenchmark(Benchmark): + + def __init__(self, loopCount): + super().__init__('taos', loopCount) + # self._dbType = 'taos' + tInst = TdeInstance() + self._dbc = DbConn.createNative(tInst.getDbTarget()) + self._dbc.open() + self._sTable = TdSuperTable(TIME_SERIES_NAME + '_s', DB_NAME) + + def doIterate(self): + tblName = Config.getConfig().target_table_name + print("Benchmarking TAOS database (1 pass) for: {}".format(tblName)) + self._dbc.execute("USE {}".format(DB_NAME)) + + self._sTable.ensureRegTable(None, self._dbc, tblName) + try: + lCount = Config.getConfig().loop_count + print("({})".format(lCount)) + for i in range(0, lCount): + writeTaosBatch(self._dbc, tblName) + except taos.error.ProgrammingError as err: + Logging.error("Failed to write batch") + + def prepare(self): + self._dbc.execute("CREATE DATABASE IF NOT EXISTS {}".format(DB_NAME)) + self._dbc.execute("USE {}".format(DB_NAME)) + # Create the super table + self._sTable.drop(self._dbc, True) + self._sTable.create(self._dbc, + {'ts': TdDataType.TIMESTAMP, + 'temperature': TdDataType.INT, + 'pressure': TdDataType.INT, + 'notes': TdDataType.BINARY200 + }, + {'rack': TdDataType.INT, + 'shelf': TdDataType.INT, + 'barcode': TdDataType.BINARY16 + }) + + def execSql(self, sql): + try: + self._dbc.execute(sql) + except taos.error.ProgrammingError as err: + Logging.warning("SQL Error: 0x{:X}, {}, SQL: {}".format( + Helper.convertErrno(err.errno), err.msg, sql)) + raise + + def executeWrite(self): + # Sample: INSERT INTO t1 USING st TAGS(1) VALUES(now, 1) t2 USING st TAGS(2) VALUES(now, 2) + sqlPrefix = "INSERT INTO " + dataTemplate = "{} USING {} TAGS({},{},'barcode_{}') VALUES('{}',{},{},'{}') " + + stName = self._sTable.getName() + BATCH_SIZE = 2000 # number of items per request batch + ITEMS_PER_SHELF = 5 + + # rackSize = 10 # shelves per rack + # shelfSize = 100 # items per shelf + batchCount = self._loopCount // BATCH_SIZE + lastRack = 0 + for i in range(batchCount): + sql = sqlPrefix + for j in range(BATCH_SIZE): + n = i*BATCH_SIZE + j # serial number + # values first + # rtName = 'rt_' + str(n) # table name contains serial number, has info + temperature = 20 + (n % 10) + pressure = 70 + (n % 10) + # tags + shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF # shelf number + rack = n // (ITEMS_PER_SHELF * MAX_SHELF) # rack number + barcode = rack + shelf + # table name + tableName = "reg_" + str(rack) + '_' + str(shelf) + # now the SQL + sql += dataTemplate.format(tableName, stName,# table name + rack, shelf, barcode, # tags + Database.getNextTick(), temperature, pressure, 'xxx') # values + lastRack = rack + self.execSql(sql) + Logging.info("Last Rack: {}".format(lastRack)) + +class TaosWriteBenchmark(TaosBenchmark): + def execute(self): + self.executeWrite() + +class Taos100kWriteBenchmark(TaosWriteBenchmark): + def __init__(self): + super().__init__(100*1000) + +class Taos10kWriteBenchmark(TaosWriteBenchmark): + def __init__(self): + super().__init__(10*1000) + +class Taos1mWriteBenchmark(TaosWriteBenchmark): + def __init__(self): + super().__init__(1000*1000) + +class Taos5mWriteBenchmark(TaosWriteBenchmark): + def __init__(self): + super().__init__(5*1000*1000) + +class Taos1kQueryBenchmark(TaosBenchmark): + def __init__(self): + super().__init__(1000) + +class Taos1MCreationBenchmark(TaosBenchmark): + def __init__(self): + super().__init__(1000000) + + +class InfluxBenchmark(Benchmark): + def __init__(self, loopCount): + super().__init__('influx', loopCount) + # self._dbType = 'influx' + + + # self._client = InfluxDBClient(host='localhost', port=8086) + + # def _writeBatch(self, tblName): + # global value, tick + # data = [] + # for i in range(0, 100): + # line = "{},device={} value={} {}".format( + # TIME_SERIES_NAME, + # tblName, + # value, + # tick*1000000000) + # # print(line) + # data.append(line) + # value += 1 + # tick +=1 + + # self._client.write(data, {'db':DB_NAME}, protocol='line') + + def executeWrite(self): + global tick # influx tick #TODO refactor + + lineTemplate = TIME_SERIES_NAME + ",rack={},shelf={},barcode='barcode_{}' temperature={},pressure={} {}" + + batchCount = self._loopCount // BATCH_SIZE + for i in range(batchCount): + lineBatch = [] + for j in range(BATCH_SIZE): + n = i*BATCH_SIZE + j # serial number + # values first + # rtName = 'rt_' + str(n) # table name contains serial number, has info + temperature = 20 + (n % 10) + pressure = 70 + (n % 10) + # tags + shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF # shelf number + rack = n // (ITEMS_PER_SHELF * MAX_SHELF) # rack number + barcode = rack + shelf + # now the SQL + line = lineTemplate.format( + rack, shelf, barcode, # tags + temperature, pressure, # values + tick * 1000000000 ) + tick += 1 + lineBatch.append(line) + write_api = self._client.write_api(write_options=SYNCHRONOUS) + write_api.write(INFLUX_BUCKET, INFLUX_ORG, lineBatch) + # self._client.write(lineBatch, {'db':DB_NAME}, protocol='line') + + # def doIterate(self): + # tblName = Config.getConfig().target_table_name + # print("Benchmarking INFLUX database (1 pass) for: {}".format(tblName)) + + # for i in range(0, Config.getConfig().loop_count): + # self._writeBatch(tblName) + + def _getOrgIdByName(self, orgName): + """Find org by name. + + """ + orgApi = self._client.organizations_api() + orgs = orgApi.find_organizations() + for org in orgs: + if org.name == orgName: + return org.id + raise PerfGenError("Org not found with name: {}".format(orgName)) + + def _fetchAuth(self): + authApi = self._client.authorizations_api() + auths = authApi.find_authorizations() + for auth in auths: + if auth.token == INFLUX_TOKEN : + return auth + raise PerfGenError("No proper auth found") + + def _verifyPermissions(self, perms: list): + if list: + return #OK + raise PerfGenError("No permission found") + + def prepare(self): + self._client = InfluxDBClient( + url="http://127.0.0.1:8086", + token=INFLUX_TOKEN, + org=INFLUX_ORG) + + auth = self._fetchAuth() + + self._verifyPermissions(auth.permissions) + + bktApi = self._client.buckets_api() + # Delete + bkt = bktApi.find_bucket_by_name(INFLUX_BUCKET) + if bkt: + bktApi.delete_bucket(bkt) + # Recreate + + orgId = self._getOrgIdByName(INFLUX_ORG) + bktApi.create_bucket(bucket=None, bucket_name=INFLUX_BUCKET, org_id=orgId) + + # self._client.drop_database(DB_NAME) + # self._client.create_database(DB_NAME) + # self._client.switch_database(DB_NAME) + +class InfluxWriteBenchmark(InfluxBenchmark): + def execute(self): + return self.executeWrite() + +class Influx10kWriteBenchmark(InfluxWriteBenchmark): + def __init__(self): + super().__init__(10*1000) + +class Influx100kWriteBenchmark(InfluxWriteBenchmark): + def __init__(self): + super().__init__(100*1000) + +class Influx1mWriteBenchmark(InfluxWriteBenchmark): + def __init__(self): + super().__init__(1000*1000) + +class Influx5mWriteBenchmark(InfluxWriteBenchmark): + def __init__(self): + super().__init__(5*1000*1000) + +def _buildCmdLineParser(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=textwrap.dedent('''\ + TDengine Performance Benchmarking Tool + --------------------------------------------------------------------- + + ''')) + + parser.add_argument( + '-b', + '--benchmark-name', + action='store', + default='Taos1kQuery', + type=str, + help='Benchmark to use (default: Taos1kQuery)') + + parser.add_argument( + '-d', + '--debug', + action='store_true', + help='Turn on DEBUG mode for more logging (default: false)') + + parser.add_argument( + '-i', + '--iterate-directly', + action='store_true', + help='Execution operations directly without sub-process (default: false)') + + parser.add_argument( + '-l', + '--loop-count', + action='store', + default=1000, + type=int, + help='Number of loops to perform, 100 operations per loop. (default: 1000)') + + parser.add_argument( + '-n', + '--target-table-name', + action='store', + default=None, + type=str, + help='Regular table name in target DB (default: None)') + + parser.add_argument( + '-s', + '--subprocess-count', + action='store', + default=4, + type=int, + help='Number of sub processes to spawn. (default: 10)') + + parser.add_argument( + '-t', + '--target-database', + action='store', + default='taos', + type=str, + help='Benchmark target: taos, influx (default: taos)') + + return parser + +def main(): + parser = _buildCmdLineParser() + Config.init(parser) + Logging.clsInit(Config.getConfig().debug) + Dice.seed(0) # initial seeding of dice + + bName = Config.getConfig().benchmark_name + bClassName = bName + 'Benchmark' + x = globals() + if bClassName in globals(): + bClass = globals()[bClassName] + bm = bClass() # Benchmark object + bm.run() + else: + raise PerfGenError("No such benchmark: {}".format(bName)) + + # bm = Benchmark.create(Config.getConfig().target_database) + # bm.run() + +if __name__ == "__main__": + main() + + diff --git a/tests/pytest/perf_gen.sh b/tests/pytest/perf_gen.sh new file mode 100755 index 0000000000000000000000000000000000000000..fcedd2d407b7e5a5d4b8e9373afd17376ae60c31 --- /dev/null +++ b/tests/pytest/perf_gen.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +# This is the script for us to try to cause the TDengine server or client to crash +# +# PREPARATION +# +# 1. Build an compile the TDengine source code that comes with this script, in the same directory tree +# 2. Please follow the direction in our README.md, and build TDengine in the build/ directory +# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg +# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg +# 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above +# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils +# +# RUNNING THIS SCRIPT +# +# This script assumes the source code directory is intact, and that the binaries has been built in the +# build/ directory, as such, will will load the Python libraries in the directory tree, and also load +# the TDengine client shared library (so) file, in the build/directory, as evidenced in the env +# variables below. +# +# Running the script is simple, no parameter is needed (for now, but will change in the future). +# +# Happy Crashing... + + +# Due to the heavy path name assumptions/usage, let us require that the user be in the current directory +EXEC_DIR=`dirname "$0"` +if [[ $EXEC_DIR != "." ]] +then + echo "ERROR: Please execute `basename "$0"` in its own directory (for now anyway, pardon the dust)" + exit -1 +fi + +CURR_DIR=`pwd` +IN_TDINTERNAL="community" +if [[ "$CURR_DIR" == *"$IN_TDINTERNAL"* ]]; then + TAOS_DIR=$CURR_DIR/../../.. + TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1` + LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6,7|rev`/lib +else + TAOS_DIR=$CURR_DIR/../.. + TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1` + LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6|rev`/lib +fi + +# Now getting ready to execute Python +# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk +PYTHON_EXEC=python3.8 + +# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work. +export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd) + +# Then let us set up the library path so that our compiled SO file can be loaded by Python +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR + +# Now we are all let, and let's see if we can find a crash. Note we pass all params +PERF_GEN_EXEC=perf_gen.py +$PYTHON_EXEC $PERF_GEN_EXEC $@ + +