Commit 456ea712 authored by Steven Li

Refactored crash_gen tool with stronger typing

Parent 7e1b1b1f
# Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645 # Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645
from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
from crash_gen.settings import Settings
from crash_gen.types import DirPath
\ No newline at end of file
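The StackOverflow link kept at the top of the old __init__.py describes the re-export trick used here: keep `from crash_gen import X` working for existing callers while the implementation moves into sub-modules. In sketch form, mirroring the removed lines above:

```python
# crash_gen/__init__.py (old layout, sketch): re-export the public names so
# existing call sites can keep importing from the flat crash_gen namespace.
from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
```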
# -----!/usr/bin/python3.7 # -----!/usr/bin/python3.7
################################################################### ###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc. # Copyright (c) 2016-2021 by TAOS Technologies, Inc.
# All rights reserved. # All rights reserved.
# #
# This file is proprietary and confidential to TAOS Technologies. # This file is proprietary and confidential to TAOS Technologies.
...@@ -24,30 +24,34 @@ import textwrap ...@@ -24,30 +24,34 @@ import textwrap
import time import time
import datetime import datetime
import random import random
import logging
import threading import threading
import copy
import argparse import argparse
import getopt
import sys import sys
import os import os
import io import io
import signal import signal
import traceback import traceback
import resource import requests
# from guppy import hpy # from guppy import hpy
import gc import gc
import taos
from .shared.types import TdColumns, TdTags
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess # from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
from crash_gen import ServiceManager, Settings, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \ # from crash_gen import ServiceManager, Config, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \
CrashGenError, Progress, MyTDSql, \ # CrashGenError, Progress, MyTDSql, \
TdeInstance # TdeInstance
import taos from .service_manager import ServiceManager, TdeInstance
import requests
from .shared.config import Config
from .shared.db import DbConn, DbManager, DbConnNative, MyTDSql
from .shared.misc import Dice, Logging, Helper, Status, CrashGenError, Progress
from .shared.types import TdDataType
Settings.init() # Config.init()
# Require Python 3 # Require Python 3
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
...@@ -81,20 +85,20 @@ class WorkerThread: ...@@ -81,20 +85,20 @@ class WorkerThread:
self._stepGate = threading.Event() self._stepGate = threading.Event()
# Let us have a DB connection of our own # Let us have a DB connection of our own
if (Settings.getConfig().per_thread_db_connection): # type: ignore if (Config.getConfig().per_thread_db_connection): # type: ignore
# print("connector_type = {}".format(gConfig.connector_type)) # print("connector_type = {}".format(gConfig.connector_type))
tInst = gContainer.defTdeInstance tInst = gContainer.defTdeInstance
if Settings.getConfig().connector_type == 'native': if Config.getConfig().connector_type == 'native':
self._dbConn = DbConn.createNative(tInst.getDbTarget()) self._dbConn = DbConn.createNative(tInst.getDbTarget())
elif Settings.getConfig().connector_type == 'rest': elif Config.getConfig().connector_type == 'rest':
self._dbConn = DbConn.createRest(tInst.getDbTarget()) self._dbConn = DbConn.createRest(tInst.getDbTarget())
elif Settings.getConfig().connector_type == 'mixed': elif Config.getConfig().connector_type == 'mixed':
if Dice.throw(2) == 0: # 1/2 chance if Dice.throw(2) == 0: # 1/2 chance
self._dbConn = DbConn.createNative(tInst.getDbTarget()) self._dbConn = DbConn.createNative(tInst.getDbTarget())
else: else:
self._dbConn = DbConn.createRest(tInst.getDbTarget()) self._dbConn = DbConn.createRest(tInst.getDbTarget())
else: else:
raise RuntimeError("Unexpected connector type: {}".format(Settings.getConfig().connector_type)) raise RuntimeError("Unexpected connector type: {}".format(Config.getConfig().connector_type))
# self._dbInUse = False # if "use db" was executed already # self._dbInUse = False # if "use db" was executed already
...@@ -123,14 +127,14 @@ class WorkerThread: ...@@ -123,14 +127,14 @@ class WorkerThread:
# self.isSleeping = False # self.isSleeping = False
Logging.info("Starting to run thread: {}".format(self._tid)) Logging.info("Starting to run thread: {}".format(self._tid))
if (Settings.getConfig().per_thread_db_connection): # type: ignore if (Config.getConfig().per_thread_db_connection): # type: ignore
Logging.debug("Worker thread openning database connection") Logging.debug("Worker thread openning database connection")
self._dbConn.open() self._dbConn.open()
self._doTaskLoop() self._doTaskLoop()
# clean up # clean up
if (Settings.getConfig().per_thread_db_connection): # type: ignore if (Config.getConfig().per_thread_db_connection): # type: ignore
if self._dbConn.isOpen: #sometimes it is not open if self._dbConn.isOpen: #sometimes it is not open
self._dbConn.close() self._dbConn.close()
else: else:
...@@ -158,7 +162,7 @@ class WorkerThread: ...@@ -158,7 +162,7 @@ class WorkerThread:
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
try: try:
if (Settings.getConfig().per_thread_db_connection): # most likely TRUE if (Config.getConfig().per_thread_db_connection): # most likely TRUE
if not self._dbConn.isOpen: # might have been closed during server auto-restart if not self._dbConn.isOpen: # might have been closed during server auto-restart
self._dbConn.open() self._dbConn.open()
# self.useDb() # might encounter exceptions. TODO: catch # self.useDb() # might encounter exceptions. TODO: catch
...@@ -232,7 +236,7 @@ class WorkerThread: ...@@ -232,7 +236,7 @@ class WorkerThread:
return self.getDbConn().getQueryResult() return self.getDbConn().getQueryResult()
def getDbConn(self) -> DbConn : def getDbConn(self) -> DbConn :
if (Settings.getConfig().per_thread_db_connection): if (Config.getConfig().per_thread_db_connection):
return self._dbConn return self._dbConn
else: else:
return self._tc.getDbManager().getDbConn() return self._tc.getDbManager().getDbConn()
...@@ -283,7 +287,7 @@ class ThreadCoordinator: ...@@ -283,7 +287,7 @@ class ThreadCoordinator:
self._execStats.registerFailure("User Interruption") self._execStats.registerFailure("User Interruption")
def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
maxSteps = Settings.getConfig().max_steps # type: ignore maxSteps = Config.getConfig().max_steps # type: ignore
if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
return True return True
if self._runStatus != Status.STATUS_RUNNING: if self._runStatus != Status.STATUS_RUNNING:
...@@ -388,7 +392,7 @@ class ThreadCoordinator: ...@@ -388,7 +392,7 @@ class ThreadCoordinator:
hasAbortedTask = False hasAbortedTask = False
workerTimeout = False workerTimeout = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
if not Settings.getConfig().debug: # print this only if we are not in debug mode if not Config.getConfig().debug: # print this only if we are not in debug mode
Progress.emit(Progress.STEP_BOUNDARY) Progress.emit(Progress.STEP_BOUNDARY)
# print(".", end="", flush=True) # print(".", end="", flush=True)
# if (self._curStep % 2) == 0: # print memory usage once every 10 steps # if (self._curStep % 2) == 0: # print memory usage once every 10 steps
...@@ -512,18 +516,18 @@ class ThreadCoordinator: ...@@ -512,18 +516,18 @@ class ThreadCoordinator:
''' Initialize multiple databases, invoked at __init__() time ''' ''' Initialize multiple databases, invoked at __init__() time '''
self._dbs = [] # type: List[Database] self._dbs = [] # type: List[Database]
dbc = self.getDbManager().getDbConn() dbc = self.getDbManager().getDbConn()
if Settings.getConfig().max_dbs == 0: if Config.getConfig().max_dbs == 0:
self._dbs.append(Database(0, dbc)) self._dbs.append(Database(0, dbc))
else: else:
baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic
)*333) % 888 if Settings.getConfig().dynamic_db_table_names else 0 )*333) % 888 if Config.getConfig().dynamic_db_table_names else 0
for i in range(Settings.getConfig().max_dbs): for i in range(Config.getConfig().max_dbs):
self._dbs.append(Database(baseDbNumber + i, dbc)) self._dbs.append(Database(baseDbNumber + i, dbc))
def pickDatabase(self): def pickDatabase(self):
idxDb = 0 idxDb = 0
if Settings.getConfig().max_dbs != 0 : if Config.getConfig().max_dbs != 0 :
idxDb = Dice.throw(Settings.getConfig().max_dbs) # 0 to N-1 idxDb = Dice.throw(Config.getConfig().max_dbs) # 0 to N-1
db = self._dbs[idxDb] # type: Database db = self._dbs[idxDb] # type: Database
return db return db
...@@ -705,7 +709,7 @@ class AnyState: ...@@ -705,7 +709,7 @@ class AnyState:
def canDropDb(self): def canDropDb(self):
# If user requests to run up to a number of DBs, # If user requests to run up to a number of DBs,
# we'd then not do drop_db operations any more # we'd then not do drop_db operations any more
if Settings.getConfig().max_dbs > 0 or Settings.getConfig().use_shadow_db : if Config.getConfig().max_dbs > 0 or Config.getConfig().use_shadow_db :
return False return False
return self._info[self.CAN_DROP_DB] return self._info[self.CAN_DROP_DB]
...@@ -713,7 +717,7 @@ class AnyState: ...@@ -713,7 +717,7 @@ class AnyState:
return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
def canDropFixedSuperTable(self): def canDropFixedSuperTable(self):
if Settings.getConfig().use_shadow_db: # duplicate writes to shadow DB, in which case let's disable dropping s-table if Config.getConfig().use_shadow_db: # duplicate writes to shadow DB, in which case let's disable dropping s-table
return False return False
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
...@@ -1110,7 +1114,7 @@ class Database: ...@@ -1110,7 +1114,7 @@ class Database:
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp( t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above t3.timestamp() + elSec2) # see explanation above
Logging.info("Setting up TICKS to start from: {}".format(t4)) Logging.debug("Setting up TICKS to start from: {}".format(t4))
return t4 return t4
@classmethod @classmethod
...@@ -1126,7 +1130,7 @@ class Database: ...@@ -1126,7 +1130,7 @@ class Database:
cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
# if : # should be quite a bit into the future # if : # should be quite a bit into the future
if Settings.getConfig().mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick if Config.isSet('mix_oos_data') and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
return cls._lastLaggingTick return cls._lastLaggingTick
else: # regular else: # regular
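A condensed, standalone sketch of the lagging-tick behaviour above (Dice.throw(20) is treated as a uniform draw from 0..19, and the regular branch, which falls outside this hunk, is assumed to simply advance a monotonically increasing tick):

```python
import datetime
import random

def next_tick(state, mix_oos_data):
    # Sketch of Database.getNextTick(): when mix_oos_data is on, roughly 1 in 20
    # calls returns the next value of a sequence lagging ~2 minutes behind,
    # which produces out-of-order inserts downstream.
    if mix_oos_data and random.randrange(20) == 0:
        state['lagging'] += datetime.timedelta(seconds=1)
        return state['lagging']
    state['current'] += datetime.timedelta(seconds=1)   # regular, monotonically increasing tick
    return state['current']

t0 = datetime.datetime(2012, 1, 1)
state = {'current': t0, 'lagging': t0 - datetime.timedelta(minutes=2)}
ticks = [next_tick(state, mix_oos_data=True) for _ in range(10)]
```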
...@@ -1310,8 +1314,8 @@ class Task(): ...@@ -1310,8 +1314,8 @@ class Task():
# This case handled below already. # This case handled below already.
# elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service: # elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service:
# return True # We may get "network unavailable" when restarting service # return True # We may get "network unavailable" when restarting service
elif Settings.getConfig().ignore_errors: # something is specified on command line elif Config.getConfig().ignore_errors: # something is specified on command line
moreErrnos = [int(v, 0) for v in Settings.getConfig().ignore_errors.split(',')] moreErrnos = [int(v, 0) for v in Config.getConfig().ignore_errors.split(',')]
if errno in moreErrnos: if errno in moreErrnos:
return True return True
elif errno == 0x200 : # invalid SQL, we need to dig in a bit more elif errno == 0x200 : # invalid SQL, we need to dig in a bit more
...@@ -1347,7 +1351,7 @@ class Task(): ...@@ -1347,7 +1351,7 @@ class Task():
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
if (Settings.getConfig().continue_on_exception): # user choose to continue if (Config.getConfig().continue_on_exception): # user choose to continue
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format( self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql())) errno2, err, wt.getDbConn().getLastSql()))
self._err = err self._err = err
...@@ -1362,7 +1366,7 @@ class Task(): ...@@ -1362,7 +1366,7 @@ class Task():
self.__class__.__name__, self.__class__.__name__,
errno2, err, wt.getDbConn().getLastSql()) errno2, err, wt.getDbConn().getLastSql())
self.logDebug(errMsg) self.logDebug(errMsg)
if Settings.getConfig().debug: if Config.getConfig().debug:
# raise # so that we see full stack # raise # so that we see full stack
traceback.print_exc() traceback.print_exc()
print( print(
...@@ -1560,7 +1564,7 @@ class StateTransitionTask(Task): ...@@ -1560,7 +1564,7 @@ class StateTransitionTask(Task):
def getRegTableName(cls, i): def getRegTableName(cls, i):
if ( StateTransitionTask._baseTableNumber is None): # Set it one time if ( StateTransitionTask._baseTableNumber is None): # Set it one time
StateTransitionTask._baseTableNumber = Dice.throw( StateTransitionTask._baseTableNumber = Dice.throw(
999) if Settings.getConfig().dynamic_db_table_names else 0 999) if Config.getConfig().dynamic_db_table_names else 0
return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i) return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i)
def execute(self, wt: WorkerThread): def execute(self, wt: WorkerThread):
...@@ -1580,14 +1584,14 @@ class TaskCreateDb(StateTransitionTask): ...@@ -1580,14 +1584,14 @@ class TaskCreateDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# was: self.execWtSql(wt, "create database db") # was: self.execWtSql(wt, "create database db")
repStr = "" repStr = ""
if Settings.getConfig().num_replicas != 1: if Config.getConfig().num_replicas != 1:
# numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N # numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N
numReplica = Settings.getConfig().num_replicas # fixed, always numReplica = Config.getConfig().num_replicas # fixed, always
repStr = "replica {}".format(numReplica) repStr = "replica {}".format(numReplica)
updatePostfix = "update 1" if Settings.getConfig().verify_data else "" # allow update only when "verify data" is active updatePostfix = "update 1" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active
dbName = self._db.getName() dbName = self._db.getName()
self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) ) self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) )
if dbName == "db_0" and Settings.getConfig().use_shadow_db: if dbName == "db_0" and Config.getConfig().use_shadow_db:
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) ) self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
class TaskDropDb(StateTransitionTask): class TaskDropDb(StateTransitionTask):
...@@ -1620,10 +1624,11 @@ class TaskCreateSuperTable(StateTransitionTask): ...@@ -1620,10 +1624,11 @@ class TaskCreateSuperTable(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place # wt.execSql("use db") # should always be in place
sTable.create(wt.getDbConn(), sTable.create(wt.getDbConn(),
{'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}, {'b':'BINARY(200)', 'f':'FLOAT'}, {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, {
dropIfExists = True 'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT},
) dropIfExists=True
)
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that # No need to create the regular tables, INSERT will do that
# automatically # automatically
...@@ -1651,9 +1656,7 @@ class TdSuperTable: ...@@ -1651,9 +1656,7 @@ class TdSuperTable:
return dbc.existsSuperTable(self._stName) return dbc.existsSuperTable(self._stName)
# TODO: odd semantic, create() method is usually static? # TODO: odd semantic, create() method is usually static?
def create(self, dbc, cols: dict, tags: dict, def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False):
dropIfExists = False
):
'''Creating a super table''' '''Creating a super table'''
dbName = self._dbName dbName = self._dbName
...@@ -1664,17 +1667,17 @@ class TdSuperTable: ...@@ -1664,17 +1667,17 @@ class TdSuperTable:
dbc.execute("DROP TABLE {}".format(fullTableName)) dbc.execute("DROP TABLE {}".format(fullTableName))
else: # error else: # error
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName)) raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
# Now let's create # Now let's create
sql = "CREATE TABLE {} ({})".format( sql = "CREATE TABLE {} ({})".format(
fullTableName, fullTableName,
",".join(['%s %s'%(k,v) for (k,v) in cols.items()])) ",".join(['%s %s'%(k,v.value) for (k,v) in cols.items()]))
if tags is None : if tags :
sql += " TAGS (dummy int) "
else:
sql += " TAGS ({})".format( sql += " TAGS ({})".format(
",".join(['%s %s'%(k,v) for (k,v) in tags.items()]) ",".join(['%s %s'%(k,v.value) for (k,v) in tags.items()])
) )
else:
sql += " TAGS (dummy int) "
dbc.execute(sql) dbc.execute(sql)
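Worked through with the TdDataType maps that TaskCreateSuperTable passes in, the statement built by create() looks like this (sketch; the qualified table name is hypothetical and the enum is trimmed to the members used here):

```python
from enum import Enum

class TdDataType(Enum):          # trimmed copy of the enum added in shared/types.py
    TIMESTAMP = 'TIMESTAMP'
    INT = 'INT'
    FLOAT = 'FLOAT'
    BINARY16 = 'BINARY(16)'
    BINARY200 = 'BINARY(200)'

cols = {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}
tags = {'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT}
fullTableName = "db_0.fs_table"  # hypothetical db.super_table name

sql = "CREATE TABLE {} ({})".format(
    fullTableName,
    ",".join(['%s %s' % (k, v.value) for (k, v) in cols.items()]))
if tags:
    sql += " TAGS ({})".format(
        ",".join(['%s %s' % (k, v.value) for (k, v) in tags.items()]))
else:
    sql += " TAGS (dummy int) "
print(sql)
# CREATE TABLE db_0.fs_table (ts TIMESTAMP,speed INT,color BINARY(16)) TAGS (b BINARY(200),f FLOAT)
```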
def getRegTables(self, dbc: DbConn): def getRegTables(self, dbc: DbConn):
...@@ -1692,7 +1695,7 @@ class TdSuperTable: ...@@ -1692,7 +1695,7 @@ class TdSuperTable:
def hasRegTables(self, dbc: DbConn): def hasRegTables(self, dbc: DbConn):
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0 return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
def ensureTable(self, task: Task, dbc: DbConn, regTableName: str): def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
dbName = self._dbName dbName = self._dbName
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already if dbc.query(sql) >= 1 : # reg table exists already
...@@ -1700,7 +1703,7 @@ class TdSuperTable: ...@@ -1700,7 +1703,7 @@ class TdSuperTable:
# acquire a lock first, so as to be able to *verify*. More details in TD-1471 # acquire a lock first, so as to be able to *verify*. More details in TD-1471
fullTableName = dbName + '.' + regTableName fullTableName = dbName + '.' + regTableName
if task is not None: # optional lock if task is not None: # TODO: what happens if we don't lock the table
task.lockTable(fullTableName) task.lockTable(fullTableName)
Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
# print("(" + fullTableName[-3:] + ")", end="", flush=True) # print("(" + fullTableName[-3:] + ")", end="", flush=True)
...@@ -1892,7 +1895,7 @@ class TaskDropSuperTable(StateTransitionTask): ...@@ -1892,7 +1895,7 @@ class TaskDropSuperTable(StateTransitionTask):
if Dice.throw(2) == 0: if Dice.throw(2) == 0:
# print("_7_", end="", flush=True) # print("_7_", end="", flush=True)
tblSeq = list(range( tblSeq = list(range(
2 + (self.LARGE_NUMBER_OF_TABLES if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES))) 2 + (self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES)))
random.shuffle(tblSeq) random.shuffle(tblSeq)
tickOutput = False # if we have spit out a "d" character for "drop regular table" tickOutput = False # if we have spit out a "d" character for "drop regular table"
isSuccess = True isSuccess = True
...@@ -1958,13 +1961,13 @@ class TaskRestartService(StateTransitionTask): ...@@ -1958,13 +1961,13 @@ class TaskRestartService(StateTransitionTask):
@classmethod @classmethod
def canBeginFrom(cls, state: AnyState): def canBeginFrom(cls, state: AnyState):
if Settings.getConfig().auto_start_service: if Config.getConfig().auto_start_service:
return state.canDropFixedSuperTable() # Basically when we have the super table return state.canDropFixedSuperTable() # Basically when we have the super table
return False # don't run this otherwise return False # don't run this otherwise
CHANCE_TO_RESTART_SERVICE = 200 CHANCE_TO_RESTART_SERVICE = 200
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not Settings.getConfig().auto_start_service: # only execute when we are in -a mode if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
print("_a", end="", flush=True) print("_a", end="", flush=True)
return return
...@@ -1991,7 +1994,7 @@ class TaskAddData(StateTransitionTask): ...@@ -1991,7 +1994,7 @@ class TaskAddData(StateTransitionTask):
@classmethod @classmethod
def prepToRecordOps(cls): def prepToRecordOps(cls):
if Settings.getConfig().record_ops: if Config.getConfig().record_ops:
if (cls.fAddLogReady is None): if (cls.fAddLogReady is None):
Logging.info( Logging.info(
"Recording in a file operations to be performed...") "Recording in a file operations to be performed...")
...@@ -2009,7 +2012,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2009,7 +2012,7 @@ class TaskAddData(StateTransitionTask):
return state.canAddData() return state.canAddData()
def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor): def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
sql = "INSERT INTO {} VALUES ".format(fullTableName) sql = "INSERT INTO {} VALUES ".format(fullTableName)
...@@ -2021,13 +2024,13 @@ class TaskAddData(StateTransitionTask): ...@@ -2021,13 +2024,13 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql) dbc.execute(sql)
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
for j in range(numRecords): # number of records per table for j in range(numRecords): # number of records per table
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextTick = db.getNextTick() nextTick = db.getNextTick()
nextColor = db.getNextColor() nextColor = db.getNextColor()
if Settings.getConfig().record_ops: if Config.getConfig().record_ops:
self.prepToRecordOps() self.prepToRecordOps()
if self.fAddLogReady is None: if self.fAddLogReady is None:
raise CrashGenError("Unexpected empty fAddLogReady") raise CrashGenError("Unexpected empty fAddLogReady")
...@@ -2037,7 +2040,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2037,7 +2040,7 @@ class TaskAddData(StateTransitionTask):
# TODO: too ugly trying to lock the table reliably, refactor... # TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
if Settings.getConfig().verify_data: if Config.getConfig().verify_data:
self.lockTable(fullTableName) self.lockTable(fullTableName)
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
...@@ -2050,7 +2053,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2050,7 +2053,7 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql) dbc.execute(sql)
# Quick hack, attach an update statement here. TODO: create an "update" task # Quick hack, attach an update statement here. TODO: create an "update" task
if (not Settings.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shadow DB if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shadow DB
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextColor = db.getNextColor() nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
...@@ -2061,12 +2064,12 @@ class TaskAddData(StateTransitionTask): ...@@ -2061,12 +2064,12 @@ class TaskAddData(StateTransitionTask):
dbc.execute(sql) dbc.execute(sql)
except: # Any exception at all except: # Any exception at all
if Settings.getConfig().verify_data: if Config.getConfig().verify_data:
self.unlockTable(fullTableName) self.unlockTable(fullTableName)
raise raise
# Now read it back and verify, we might encounter an error if table is dropped # Now read it back and verify, we might encounter an error if table is dropped
if Settings.getConfig().verify_data: # only if command line asks for it if Config.getConfig().verify_data: # only if command line asks for it
try: try:
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
format(db.getName(), regTableName, nextTick)) format(db.getName(), regTableName, nextTick))
...@@ -2093,7 +2096,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2093,7 +2096,7 @@ class TaskAddData(StateTransitionTask):
# Successfully wrote the data into the DB, let's record it somehow # Successfully wrote the data into the DB, let's record it somehow
te.recordDataMark(nextInt) te.recordDataMark(nextInt)
if Settings.getConfig().record_ops: if Config.getConfig().record_ops:
if self.fAddLogDone is None: if self.fAddLogDone is None:
raise CrashGenError("Unexpected empty fAddLogDone") raise CrashGenError("Unexpected empty fAddLogDone")
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
...@@ -2104,8 +2107,8 @@ class TaskAddData(StateTransitionTask): ...@@ -2104,8 +2107,8 @@ class TaskAddData(StateTransitionTask):
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
db = self._db db = self._db
dbc = wt.getDbConn() dbc = wt.getDbConn()
numTables = self.LARGE_NUMBER_OF_TABLES if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES
numRecords = self.LARGE_NUMBER_OF_RECORDS if Settings.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
tblSeq = list(range(numTables )) tblSeq = list(range(numTables ))
random.shuffle(tblSeq) # now we have random sequence random.shuffle(tblSeq) # now we have random sequence
for i in tblSeq: for i in tblSeq:
...@@ -2120,7 +2123,7 @@ class TaskAddData(StateTransitionTask): ...@@ -2120,7 +2123,7 @@ class TaskAddData(StateTransitionTask):
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
fullTableName = dbName + '.' + regTableName fullTableName = dbName + '.' + regTableName
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked" # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
sTable.ensureTable(self, wt.getDbConn(), regTableName) # Ensure the table exists sTable.ensureRegTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
# self._unlockTable(fullTableName) # self._unlockTable(fullTableName)
if Dice.throw(1) == 0: # 1 in 2 chance if Dice.throw(1) == 0: # 1 in 2 chance
...@@ -2264,7 +2267,7 @@ class ClientManager: ...@@ -2264,7 +2267,7 @@ class ClientManager:
global gContainer global gContainer
tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance" tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"
cfg = Settings.getConfig() cfg = Config.getConfig()
dbManager = DbManager(cfg.connector_type, tInst.getDbTarget()) # Regular function dbManager = DbManager(cfg.connector_type, tInst.getDbTarget()) # Regular function
thPool = ThreadPool(cfg.num_threads, cfg.max_steps) thPool = ThreadPool(cfg.num_threads, cfg.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager) self.tc = ThreadCoordinator(thPool, dbManager)
...@@ -2280,7 +2283,7 @@ class ClientManager: ...@@ -2280,7 +2283,7 @@ class ClientManager:
# Release global variables # Release global variables
# gConfig = None # gConfig = None
Settings.clearConfig() Config.clearConfig()
gSvcMgr = None gSvcMgr = None
logger = None logger = None
...@@ -2331,7 +2334,7 @@ class MainExec: ...@@ -2331,7 +2334,7 @@ class MainExec:
def runClient(self): def runClient(self):
global gSvcMgr global gSvcMgr
if Settings.getConfig().auto_start_service: if Config.getConfig().auto_start_service:
gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
gSvcMgr.startTaosServices() # we start, don't run gSvcMgr.startTaosServices() # we start, don't run
...@@ -2346,20 +2349,12 @@ class MainExec: ...@@ -2346,20 +2349,12 @@ class MainExec:
def runService(self): def runService(self):
global gSvcMgr global gSvcMgr
gSvcMgr = self._svcMgr = ServiceManager(Settings.getConfig().num_dnodes) # save it in a global variable TODO: hack alert gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
gSvcMgr.run() # run to some end state gSvcMgr.run() # run to some end state
gSvcMgr = self._svcMgr = None gSvcMgr = self._svcMgr = None
def init(self): # TODO: refactor def _buildCmdLineParser(self):
global gContainer
gContainer = Container() # micky-mouse DI
global gSvcMgr # TODO: refactor away
gSvcMgr = None
# Super cool Python argument library:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent('''\ description=textwrap.dedent('''\
...@@ -2480,20 +2475,29 @@ class MainExec: ...@@ -2480,20 +2475,29 @@ class MainExec:
action='store_true', action='store_true',
help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)') help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
# global gConfig return parser
config = parser.parse_args()
Settings.setConfig(config) # TODO: fix this hack, consolidate this global var
def init(self): # TODO: refactor
global gContainer
gContainer = Container() # micky-mouse DI
global gSvcMgr # TODO: refactor away
gSvcMgr = None
parser = self._buildCmdLineParser()
Config.init(parser)
# Sanity check for arguments # Sanity check for arguments
if Settings.getConfig().use_shadow_db and Settings.getConfig().max_dbs>1 : if Config.getConfig().use_shadow_db and Config.getConfig().max_dbs>1 :
raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1") raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")
Logging.clsInit(Settings.getConfig()) Logging.clsInit(Config.getConfig().debug)
Dice.seed(0) # initial seeding of dice Dice.seed(0) # initial seeding of dice
def run(self): def run(self):
if Settings.getConfig().run_tdengine: # run server if Config.getConfig().run_tdengine: # run server
try: try:
self.runService() self.runService()
return 0 # success return 0 # success
......
...@@ -19,10 +19,15 @@ except: ...@@ -19,10 +19,15 @@ except:
sys.exit(-1) sys.exit(-1)
from queue import Queue, Empty from queue import Queue, Empty
from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status from .shared.config import Config
from crash_gen.db import DbConn, DbTarget from .shared.db import DbTarget, DbConn
from crash_gen.settings import Settings from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
from crash_gen.types import DirPath from .shared.types import DirPath
# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
# from crash_gen.db import DbConn, DbTarget
# from crash_gen.settings import Config
# from crash_gen.types import DirPath
class TdeInstance(): class TdeInstance():
""" """
...@@ -173,7 +178,7 @@ quorum 2 ...@@ -173,7 +178,7 @@ quorum 2
def getServiceCmdLine(self): # to start the instance def getServiceCmdLine(self): # to start the instance
cmdLine = [] cmdLine = []
if Settings.getConfig().track_memory_leaks: if Config.getConfig().track_memory_leaks:
Logging.info("Invoking VALGRIND on service...") Logging.info("Invoking VALGRIND on service...")
cmdLine = ['valgrind', '--leak-check=yes'] cmdLine = ['valgrind', '--leak-check=yes']
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
...@@ -789,22 +794,10 @@ class ServiceManagerThread: ...@@ -789,22 +794,10 @@ class ServiceManagerThread:
def stop(self): def stop(self):
# can be called from both main thread or signal handler # can be called from both main thread or signal handler
# Linux will send Control-C generated SIGINT to the TDengine process # Linux will send Control-C generated SIGINT to the TDengine process already, ref:
# already, ref:
# https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
# if not self._tdeSubProcess:
# raise RuntimeError("sub process object missing") self.join() # stop the thread, status change moved to TdeSubProcess
# self._status.set(Status.STATUS_STOPPING)
# TdeSubProcess.stop(self._tdeSubProcess) # must stop, no matter what
# self._tdeSubProcess = None
# if not self._tdeSubProcess.stop(): # everything withing
# if self._tdeSubProcess.isRunning(): # still running, should now never happen
# Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
# self._tdeSubProcess.getPid()))
# else:
# self._tdeSubProcess = None # not running any more
self.join() # stop the thread, change the status, etc.
# Check if it's really stopped # Check if it's really stopped
outputLines = 10 # for last output outputLines = 10 # for last output
......
from __future__ import annotations from __future__ import annotations
import argparse import argparse
from typing import Optional from typing import Optional
from crash_gen.misc import CrashGenError from .misc import CrashGenError
# from crash_gen.misc import CrashGenError
# gConfig: Optional[argparse.Namespace] # gConfig: Optional[argparse.Namespace]
class Settings: class Config:
_config = None # type Optional[argparse.Namespace] _config = None # type Optional[argparse.Namespace]
@classmethod @classmethod
def init(cls): def init(cls, parser: argparse.ArgumentParser):
cls._config = None if cls._config is not None:
raise CrashGenError("Config can only be initialized once")
cls._config = parser.parse_args()
# print(cls._config)
@classmethod @classmethod
def setConfig(cls, config: argparse.Namespace): def setConfig(cls, config: argparse.Namespace):
...@@ -26,4 +32,11 @@ class Settings: ...@@ -26,4 +32,11 @@ class Settings:
@classmethod @classmethod
def clearConfig(cls): def clearConfig(cls):
cls._config = None cls._config = None
\ No newline at end of file
@classmethod
def isSet(cls, cfgKey):
cfg = cls.getConfig()
if cfgKey not in cfg:
return False
return cfg.__getattribute__(cfgKey)
\ No newline at end of file
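A minimal usage sketch of the new Config wrapper (the import path and the two flags are assumptions for illustration; the real parser is assembled in MainExec._buildCmdLineParser):

```python
import argparse
from crash_gen.shared.config import Config   # assumed module path

parser = argparse.ArgumentParser()
parser.add_argument('--debug', action='store_true')          # illustrative flags only
parser.add_argument('--use-shadow-db', action='store_true')

Config.init(parser)                  # parses sys.argv; a second init() raises CrashGenError
cfg = Config.getConfig()             # a plain argparse.Namespace underneath
if Config.isSet('use_shadow_db'):    # False when the attribute is missing or falsy
    print("shadow DB duplication enabled")
print(cfg.debug)
```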
from __future__ import annotations from __future__ import annotations
import sys import sys
import os
import datetime
import time import time
import threading import threading
import requests import requests
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
from crash_gen.types import QueryResult
import taos import taos
from util.sql import * from util.sql import *
...@@ -13,13 +15,12 @@ from util.cases import * ...@@ -13,13 +15,12 @@ from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.log import * from util.log import *
from .misc import Logging, CrashGenError, Helper, Dice
import os
import datetime
import traceback import traceback
# from .service_manager import TdeInstance # from .service_manager import TdeInstance
from crash_gen.settings import Settings from .config import Config
from .misc import Logging, CrashGenError, Helper
from .types import QueryResult
class DbConn: class DbConn:
TYPE_NATIVE = "native-c" TYPE_NATIVE = "native-c"
...@@ -250,7 +251,13 @@ class MyTDSql: ...@@ -250,7 +251,13 @@ class MyTDSql:
def _execInternal(self, sql): def _execInternal(self, sql):
startTime = time.time() startTime = time.time()
# Logging.debug("Executing SQL: " + sql) # Logging.debug("Executing SQL: " + sql)
# ret = None # TODO: use strong type here
# try: # Let's not capture the error, and let taos.error.ProgrammingError pass through
ret = self._cursor.execute(sql) ret = self._cursor.execute(sql)
# except taos.error.ProgrammingError as err:
# Logging.warning("Taos SQL execution error: {}, SQL: {}".format(err.msg, sql))
# raise CrashGenError(err.msg)
# print("\nSQL success: {}".format(sql)) # print("\nSQL success: {}".format(sql))
queryTime = time.time() - startTime queryTime = time.time() - startTime
# Record the query time # Record the query time
...@@ -262,7 +269,7 @@ class MyTDSql: ...@@ -262,7 +269,7 @@ class MyTDSql:
cls.lqStartTime = startTime cls.lqStartTime = startTime
# Now write to the shadow database # Now write to the shadow database
if Settings.getConfig().use_shadow_db: if Config.isSet('use_shadow_db'):
if sql[:11] == "INSERT INTO": if sql[:11] == "INSERT INTO":
if sql[:16] == "INSERT INTO db_0": if sql[:16] == "INSERT INTO db_0":
sql2 = "INSERT INTO db_s" + sql[16:] sql2 = "INSERT INTO db_s" + sql[16:]
......
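The shadow-database mirroring above is a plain prefix rewrite on INSERT statements; applied to a made-up statement it does this:

```python
sql = "INSERT INTO db_0.reg_table_5 VALUES ('2012-01-01 00:00:01', 42, 'red')"   # made-up write
if sql[:11] == "INSERT INTO" and sql[:16] == "INSERT INTO db_0":
    sql2 = "INSERT INTO db_s" + sql[16:]   # replay the same write against the shadow DB
    print(sql2)
# INSERT INTO db_s.reg_table_5 VALUES ('2012-01-01 00:00:01', 42, 'red')
```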
...@@ -47,7 +47,7 @@ class Logging: ...@@ -47,7 +47,7 @@ class Logging:
return cls.logger return cls.logger
@classmethod @classmethod
def clsInit(cls, gConfig): # TODO: refactor away gConfig def clsInit(cls, debugMode: bool):
if cls.logger: if cls.logger:
return return
...@@ -62,12 +62,8 @@ class Logging: ...@@ -62,12 +62,8 @@ class Logging:
# print("setting logger variable") # print("setting logger variable")
# global logger # global logger
cls.logger = MyLoggingAdapter(_logger, {}) cls.logger = MyLoggingAdapter(_logger, {})
cls.logger.setLevel(logging.DEBUG if debugMode else logging.INFO) # default seems to be INFO
if (gConfig.debug):
cls.logger.setLevel(logging.DEBUG) # default seems to be INFO
else:
cls.logger.setLevel(logging.INFO)
@classmethod @classmethod
def info(cls, msg): def info(cls, msg):
cls.logger.info(msg) cls.logger.info(msg)
......
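The net effect of the new clsInit(debugMode) signature is only the level selection; roughly equivalent standalone code (a sketch using the standard logging module with a hypothetical logger name):

```python
import logging

def clsInit(debugMode: bool) -> logging.Logger:
    logging.basicConfig()
    logger = logging.getLogger("CrashGen")   # hypothetical logger name
    logger.setLevel(logging.DEBUG if debugMode else logging.INFO)
    return logger

logger = clsInit(debugMode=False)
logger.info("crash_gen logging initialized")
```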
from typing import Any, List, Dict, NewType
from enum import Enum
DirPath = NewType('DirPath', str)
QueryResult = NewType('QueryResult', List[List[Any]])
class TdDataType(Enum):
'''
Use a Python Enum types of represent all the data types in TDengine.
Ref: https://www.taosdata.com/cn/documentation/taos-sql#data-type
'''
TIMESTAMP = 'TIMESTAMP'
INT = 'INT'
BIGINT = 'BIGINT'
FLOAT = 'FLOAT'
DOUBLE = 'DOUBLE'
BINARY = 'BINARY'
BINARY16 = 'BINARY(16)' # TODO: get rid of this hack
BINARY200 = 'BINARY(200)'
SMALLINT = 'SMALLINT'
TINYINT = 'TINYINT'
BOOL = 'BOOL'
NCHAR = 'NCHAR'
TdColumns = Dict[str, TdDataType]
TdTags = Dict[str, TdDataType]
from typing import Any, List, NewType
DirPath = NewType('DirPath', str)
QueryResult = NewType('QueryResult', List[List[Any]])
\ No newline at end of file
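The DirPath/QueryResult NewType aliases cost nothing at runtime but let a type checker tell them apart from plain str and list values; a small self-contained sketch:

```python
from typing import Any, List, NewType

DirPath = NewType('DirPath', str)
QueryResult = NewType('QueryResult', List[List[Any]])

def countRows(result: QueryResult) -> int:
    return len(result)

logDir = DirPath("/tmp/crash_gen_logs")          # hypothetical path, still a str at runtime
rows = QueryResult([[1, 'a'], [2, 'b']])
print(countRows(rows), logDir)
```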