提交 dc72a1a6 编写于 作者: S Steven Li

Split crash_gen tool into different functional files/modules

上级 e011827f
...@@ -19,17 +19,15 @@ from util.sql import * ...@@ -19,17 +19,15 @@ from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.log import * from util.log import *
from queue import Queue, Empty
from typing import IO
from typing import Set from typing import Set
from typing import Dict from typing import Dict
from typing import List from typing import List
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
import textwrap import textwrap
import datetime
import logging
import time import time
import datetime
import random import random
import logging
import threading import threading
import requests import requests
import copy import copy
...@@ -38,19 +36,14 @@ import getopt ...@@ -38,19 +36,14 @@ import getopt
import sys import sys
import os import os
import io
import signal import signal
import traceback import traceback
import resource import resource
from guppy import hpy from guppy import hpy
import gc import gc
import subprocess
try: from .service_manager import ServiceManager, TdeInstance
import psutil from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
except:
print("Psutil module needed, please install: sudo pip3 install psutil")
sys.exit(-1)
# Require Python 3 # Require Python 3
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
...@@ -62,19 +55,12 @@ if sys.version_info[0] < 3: ...@@ -62,19 +55,12 @@ if sys.version_info[0] < 3:
# ConfigNameSpace = argparse.Namespace # ConfigNameSpace = argparse.Namespace
gConfig: argparse.Namespace gConfig: argparse.Namespace
gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection
logger: logging.Logger # logger: logging.Logger
gContainer: Container gContainer: Container
# def runThread(wt: WorkerThread): # def runThread(wt: WorkerThread):
# wt.run() # wt.run()
class CrashGenError(Exception):
    """Error raised by the crash_gen tool itself.

    Attributes:
        msg: description of the failure, or None when none was supplied.
        errno: optional numeric error code, or None.
    """

    def __init__(self, msg=None, errno=None):
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str: handing back a None (or otherwise
        # non-string) msg would make str(exc) itself raise TypeError and
        # hide the real error from logs and tracebacks.
        if self.msg is None:
            return ""
        return str(self.msg)
class WorkerThread: class WorkerThread:
def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator,
...@@ -107,10 +93,10 @@ class WorkerThread: ...@@ -107,10 +93,10 @@ class WorkerThread:
# self._dbInUse = False # if "use db" was executed already # self._dbInUse = False # if "use db" was executed already
def logDebug(self, msg): def logDebug(self, msg):
logger.debug(" TRD[{}] {}".format(self._tid, msg)) Logging.debug(" TRD[{}] {}".format(self._tid, msg))
def logInfo(self, msg): def logInfo(self, msg):
logger.info(" TRD[{}] {}".format(self._tid, msg)) Logging.info(" TRD[{}] {}".format(self._tid, msg))
# def dbInUse(self): # def dbInUse(self):
# return self._dbInUse # return self._dbInUse
...@@ -129,10 +115,10 @@ class WorkerThread: ...@@ -129,10 +115,10 @@ class WorkerThread:
def run(self): def run(self):
# initialization after thread starts, in the thread context # initialization after thread starts, in the thread context
# self.isSleeping = False # self.isSleeping = False
logger.info("Starting to run thread: {}".format(self._tid)) Logging.info("Starting to run thread: {}".format(self._tid))
if (gConfig.per_thread_db_connection): # type: ignore if (gConfig.per_thread_db_connection): # type: ignore
logger.debug("Worker thread openning database connection") Logging.debug("Worker thread openning database connection")
self._dbConn.open() self._dbConn.open()
self._doTaskLoop() self._doTaskLoop()
...@@ -142,7 +128,7 @@ class WorkerThread: ...@@ -142,7 +128,7 @@ class WorkerThread:
if self._dbConn.isOpen: #sometimes it is not open if self._dbConn.isOpen: #sometimes it is not open
self._dbConn.close() self._dbConn.close()
else: else:
logger.warning("Cleaning up worker thread, dbConn already closed") Logging.warning("Cleaning up worker thread, dbConn already closed")
def _doTaskLoop(self): def _doTaskLoop(self):
# while self._curStep < self._pool.maxSteps: # while self._curStep < self._pool.maxSteps:
...@@ -153,15 +139,15 @@ class WorkerThread: ...@@ -153,15 +139,15 @@ class WorkerThread:
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
except threading.BrokenBarrierError as err: # main thread timed out except threading.BrokenBarrierError as err: # main thread timed out
print("_bto", end="") print("_bto", end="")
logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out") Logging.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
break break
logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid)) Logging.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
self.crossStepGate() # then per-thread gate, after being tapped self.crossStepGate() # then per-thread gate, after being tapped
logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid)) Logging.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
if not self._tc.isRunning(): if not self._tc.isRunning():
print("_wts", end="") print("_wts", end="")
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
...@@ -180,15 +166,15 @@ class WorkerThread: ...@@ -180,15 +166,15 @@ class WorkerThread:
raise raise
# Fetch a task from the Thread Coordinator # Fetch a task from the Thread Coordinator
logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid)) Logging.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask() task = tc.fetchTask()
# Execute such a task # Execute such a task
logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format( Logging.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
self._tid, task.__class__.__name__)) self._tid, task.__class__.__name__))
task.execute(self) task.execute(self)
tc.saveExecutedTask(task) tc.saveExecutedTask(task)
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) Logging.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
# self._dbInUse = False # there may be changes between steps # self._dbInUse = False # there may be changes between steps
# print("_wtd", end=None) # worker thread died # print("_wtd", end=None) # worker thread died
...@@ -211,7 +197,7 @@ class WorkerThread: ...@@ -211,7 +197,7 @@ class WorkerThread:
self.verifyThreadSelf() # only allowed by ourselves self.verifyThreadSelf() # only allowed by ourselves
# Wait again at the "gate", waiting to be "tapped" # Wait again at the "gate", waiting to be "tapped"
logger.debug( Logging.debug(
"[TRD] Worker thread {} about to cross the step gate".format( "[TRD] Worker thread {} about to cross the step gate".format(
self._tid)) self._tid))
self._stepGate.wait() self._stepGate.wait()
...@@ -224,7 +210,7 @@ class WorkerThread: ...@@ -224,7 +210,7 @@ class WorkerThread:
self.verifyThreadMain() # only allowed for main thread self.verifyThreadMain() # only allowed for main thread
if self._thread.is_alive(): if self._thread.is_alive():
logger.debug("[TRD] Tapping worker thread {}".format(self._tid)) Logging.debug("[TRD] Tapping worker thread {}".format(self._tid))
self._stepGate.set() # wake up! self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit time.sleep(0) # let the released thread run a bit
else: else:
...@@ -269,7 +255,7 @@ class ThreadCoordinator: ...@@ -269,7 +255,7 @@ class ThreadCoordinator:
self._stepBarrier = threading.Barrier( self._stepBarrier = threading.Barrier(
self._pool.numThreads + 1) # one barrier for all threads self._pool.numThreads + 1) # one barrier for all threads
self._execStats = ExecutionStats() self._execStats = ExecutionStats()
self._runStatus = MainExec.STATUS_RUNNING self._runStatus = Status.STATUS_RUNNING
self._initDbs() self._initDbs()
def getTaskExecutor(self): def getTaskExecutor(self):
...@@ -282,14 +268,14 @@ class ThreadCoordinator: ...@@ -282,14 +268,14 @@ class ThreadCoordinator:
self._stepBarrier.wait(timeout) self._stepBarrier.wait(timeout)
def requestToStop(self): def requestToStop(self):
self._runStatus = MainExec.STATUS_STOPPING self._runStatus = Status.STATUS_STOPPING
self._execStats.registerFailure("User Interruption") self._execStats.registerFailure("User Interruption")
def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout): def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
maxSteps = gConfig.max_steps # type: ignore maxSteps = gConfig.max_steps # type: ignore
if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9 if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
return True return True
if self._runStatus != MainExec.STATUS_RUNNING: if self._runStatus != Status.STATUS_RUNNING:
return True return True
if transitionFailed: if transitionFailed:
return True return True
...@@ -310,7 +296,7 @@ class ThreadCoordinator: ...@@ -310,7 +296,7 @@ class ThreadCoordinator:
def _releaseAllWorkerThreads(self, transitionFailed): def _releaseAllWorkerThreads(self, transitionFailed):
self._curStep += 1 # we are about to get into next step. TODO: race condition here! self._curStep += 1 # we are about to get into next step. TODO: race condition here!
# Now not all threads had time to go to sleep # Now not all threads had time to go to sleep
logger.debug( Logging.debug(
"--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
# A new TE for the new step # A new TE for the new step
...@@ -318,7 +304,7 @@ class ThreadCoordinator: ...@@ -318,7 +304,7 @@ class ThreadCoordinator:
if not transitionFailed: # only if not failed if not transitionFailed: # only if not failed
self._te = TaskExecutor(self._curStep) self._te = TaskExecutor(self._curStep)
logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format( Logging.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
self._curStep)) # Now not all threads had time to go to sleep self._curStep)) # Now not all threads had time to go to sleep
# Worker threads will wake up at this point, and each execute it's own task # Worker threads will wake up at this point, and each execute it's own task
self.tapAllThreads() # release all worker thread from their "gates" self.tapAllThreads() # release all worker thread from their "gates"
...@@ -327,10 +313,10 @@ class ThreadCoordinator: ...@@ -327,10 +313,10 @@ class ThreadCoordinator:
# Now main thread (that's us) is ready to enter a step # Now main thread (that's us) is ready to enter a step
# let other threads go past the pool barrier, but wait at the # let other threads go past the pool barrier, but wait at the
# thread gate # thread gate
logger.debug("[TRD] Main thread about to cross the barrier") Logging.debug("[TRD] Main thread about to cross the barrier")
self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT) self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
self._stepBarrier.reset() # Other worker threads should now be at the "gate" self._stepBarrier.reset() # Other worker threads should now be at the "gate"
logger.debug("[TRD] Main thread finished crossing the barrier") Logging.debug("[TRD] Main thread finished crossing the barrier")
def _doTransition(self): def _doTransition(self):
transitionFailed = False transitionFailed = False
...@@ -338,11 +324,11 @@ class ThreadCoordinator: ...@@ -338,11 +324,11 @@ class ThreadCoordinator:
for x in self._dbs: for x in self._dbs:
db = x # type: Database db = x # type: Database
sm = db.getStateMachine() sm = db.getStateMachine()
logger.debug("[STT] starting transitions for DB: {}".format(db.getName())) Logging.debug("[STT] starting transitions for DB: {}".format(db.getName()))
# at end of step, transition the DB state # at end of step, transition the DB state
tasksForDb = db.filterTasks(self._executedTasks) tasksForDb = db.filterTasks(self._executedTasks)
sm.transition(tasksForDb, self.getDbManager().getDbConn()) sm.transition(tasksForDb, self.getDbManager().getDbConn())
logger.debug("[STT] transition ended for DB: {}".format(db.getName())) Logging.debug("[STT] transition ended for DB: {}".format(db.getName()))
# Due to limitation (or maybe not) of the TD Python library, # Due to limitation (or maybe not) of the TD Python library,
# we cannot share connections across threads # we cannot share connections across threads
...@@ -350,14 +336,14 @@ class ThreadCoordinator: ...@@ -350,14 +336,14 @@ class ThreadCoordinator:
# Moving below to task loop # Moving below to task loop
# if sm.hasDatabase(): # if sm.hasDatabase():
# for t in self._pool.threadList: # for t in self._pool.threadList:
# logger.debug("[DB] use db for all worker threads") # Logging.debug("[DB] use db for all worker threads")
# t.useDb() # t.useDb()
# t.execSql("use db") # main thread executing "use # t.execSql("use db") # main thread executing "use
# db" on behalf of every worker thread # db" on behalf of every worker thread
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
if (err.msg == 'network unavailable'): # broken DB connection if (err.msg == 'network unavailable'): # broken DB connection
logger.info("DB connection broken, execution failed") Logging.info("DB connection broken, execution failed")
traceback.print_stack() traceback.print_stack()
transitionFailed = True transitionFailed = True
self._te = None # Not running any more self._te = None # Not running any more
...@@ -370,7 +356,7 @@ class ThreadCoordinator: ...@@ -370,7 +356,7 @@ class ThreadCoordinator:
self.resetExecutedTasks() # clear the tasks after we are done self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step # Get ready for next step
logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed)) Logging.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
return transitionFailed return transitionFailed
def run(self): def run(self):
...@@ -384,8 +370,9 @@ class ThreadCoordinator: ...@@ -384,8 +370,9 @@ class ThreadCoordinator:
hasAbortedTask = False hasAbortedTask = False
workerTimeout = False workerTimeout = False
while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout): while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
if not gConfig.debug: # print this only if we are not in debug mode if not gConfig.debug: # print this only if we are not in debug mode
print(".", end="", flush=True) Progress.emit(Progress.STEP_BOUNDARY)
# print(".", end="", flush=True)
# if (self._curStep % 2) == 0: # print memory usage once every 10 steps # if (self._curStep % 2) == 0: # print memory usage once every 10 steps
# memUsage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss # memUsage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print("[m:{}]".format(memUsage), end="", flush=True) # print memory usage # print("[m:{}]".format(memUsage), end="", flush=True) # print memory usage
...@@ -397,8 +384,9 @@ class ThreadCoordinator: ...@@ -397,8 +384,9 @@ class ThreadCoordinator:
try: try:
self._syncAtBarrier() # For now just cross the barrier self._syncAtBarrier() # For now just cross the barrier
Progress.emit(Progress.END_THREAD_STEP)
except threading.BrokenBarrierError as err: except threading.BrokenBarrierError as err:
logger.info("Main loop aborted, caused by worker thread time-out") Logging.info("Main loop aborted, caused by worker thread time-out")
self._execStats.registerFailure("Aborted due to worker thread timeout") self._execStats.registerFailure("Aborted due to worker thread timeout")
print("\n\nWorker Thread time-out detected, important thread info:") print("\n\nWorker Thread time-out detected, important thread info:")
ts = ThreadStacks() ts = ThreadStacks()
...@@ -411,7 +399,7 @@ class ThreadCoordinator: ...@@ -411,7 +399,7 @@ class ThreadCoordinator:
# threads are QUIET. # threads are QUIET.
hasAbortedTask = self._hasAbortedTask() # from previous step hasAbortedTask = self._hasAbortedTask() # from previous step
if hasAbortedTask: if hasAbortedTask:
logger.info("Aborted task encountered, exiting test program") Logging.info("Aborted task encountered, exiting test program")
self._execStats.registerFailure("Aborted Task Encountered") self._execStats.registerFailure("Aborted Task Encountered")
break # do transition only if tasks are error free break # do transition only if tasks are error free
...@@ -422,29 +410,30 @@ class ThreadCoordinator: ...@@ -422,29 +410,30 @@ class ThreadCoordinator:
transitionFailed = True transitionFailed = True
errno2 = Helper.convertErrno(err.errno) # correct error scheme errno2 = Helper.convertErrno(err.errno) # correct error scheme
errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err) errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
logger.info(errMsg) Logging.info(errMsg)
traceback.print_exc() traceback.print_exc()
self._execStats.registerFailure(errMsg) self._execStats.registerFailure(errMsg)
# Then we move on to the next step # Then we move on to the next step
Progress.emit(Progress.BEGIN_THREAD_STEP)
self._releaseAllWorkerThreads(transitionFailed) self._releaseAllWorkerThreads(transitionFailed)
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
logger.debug("Abnormal ending of main thraed") Logging.debug("Abnormal ending of main thraed")
elif workerTimeout: elif workerTimeout:
logger.debug("Abnormal ending of main thread, due to worker timeout") Logging.debug("Abnormal ending of main thread, due to worker timeout")
else: # regular ending, workers waiting at "barrier" else: # regular ending, workers waiting at "barrier"
logger.debug("Regular ending, main thread waiting for all worker threads to stop...") Logging.debug("Regular ending, main thread waiting for all worker threads to stop...")
self._syncAtBarrier() self._syncAtBarrier()
self._te = None # No more executor, time to end self._te = None # No more executor, time to end
logger.debug("Main thread tapping all threads one last time...") Logging.debug("Main thread tapping all threads one last time...")
self.tapAllThreads() # Let the threads run one last time self.tapAllThreads() # Let the threads run one last time
logger.debug("\r\n\n--> Main thread ready to finish up...") Logging.debug("\r\n\n--> Main thread ready to finish up...")
logger.debug("Main thread joining all threads") Logging.debug("Main thread joining all threads")
self._pool.joinAll() # Get all threads to finish self._pool.joinAll() # Get all threads to finish
logger.info("\nAll worker threads finished") Logging.info("\nAll worker threads finished")
self._execStats.endExec() self._execStats.endExec()
def cleanup(self): # free resources def cleanup(self): # free resources
...@@ -476,7 +465,7 @@ class ThreadCoordinator: ...@@ -476,7 +465,7 @@ class ThreadCoordinator:
wakeSeq.append(i) wakeSeq.append(i)
else: else:
wakeSeq.insert(0, i) wakeSeq.insert(0, i)
logger.debug( Logging.debug(
"[TRD] Main thread waking up worker threads: {}".format( "[TRD] Main thread waking up worker threads: {}".format(
str(wakeSeq))) str(wakeSeq)))
# TODO: set dice seed to a deterministic value # TODO: set dice seed to a deterministic value
...@@ -524,13 +513,6 @@ class ThreadCoordinator: ...@@ -524,13 +513,6 @@ class ThreadCoordinator:
with self._lock: with self._lock:
self._executedTasks.append(task) self._executedTasks.append(task)
# We define a class to run a number of threads in locking steps.
class Helper:
    """Namespace for small stateless helper routines."""

    @classmethod
    def convertErrno(cls, errno):
        """Map an error number onto the scheme used when reporting errors.

        Positive values are returned unchanged; zero and negative values
        are shifted up by 0x80000000.
        """
        if errno > 0:
            return errno
        return 0x80000000 + errno
class ThreadPool: class ThreadPool:
def __init__(self, numThreads, maxSteps): def __init__(self, numThreads, maxSteps):
self.numThreads = numThreads self.numThreads = numThreads
...@@ -548,7 +530,7 @@ class ThreadPool: ...@@ -548,7 +530,7 @@ class ThreadPool:
def joinAll(self): def joinAll(self):
for workerThread in self.threadList: for workerThread in self.threadList:
logger.debug("Joining thread...") Logging.debug("Joining thread...")
workerThread._thread.join() workerThread._thread.join()
def cleanup(self): def cleanup(self):
...@@ -605,7 +587,7 @@ class LinearQueue(): ...@@ -605,7 +587,7 @@ class LinearQueue():
def allocate(self, i): def allocate(self, i):
with self._lock: with self._lock:
# logger.debug("LQ allocating item {}".format(i)) # Logging.debug("LQ allocating item {}".format(i))
if (i in self.inUse): if (i in self.inUse):
raise RuntimeError( raise RuntimeError(
"Cannot re-use same index in queue: {}".format(i)) "Cannot re-use same index in queue: {}".format(i))
...@@ -613,7 +595,7 @@ class LinearQueue(): ...@@ -613,7 +595,7 @@ class LinearQueue():
def release(self, i): def release(self, i):
with self._lock: with self._lock:
# logger.debug("LQ releasing item {}".format(i)) # Logging.debug("LQ releasing item {}".format(i))
self.inUse.remove(i) # KeyError possible, TODO: why? self.inUse.remove(i) # KeyError possible, TODO: why?
def size(self): def size(self):
...@@ -673,9 +655,12 @@ class DbConn: ...@@ -673,9 +655,12 @@ class DbConn:
# below implemented by child classes # below implemented by child classes
self.openByType() self.openByType()
logger.debug("[DB] data connection opened, type = {}".format(self._type)) Logging.debug("[DB] data connection opened, type = {}".format(self._type))
self.isOpen = True self.isOpen = True
def close(self):
    """Close the connection; concrete connection classes must override this.

    Raises:
        NotImplementedError: always, when the base implementation is
            reached. NotImplementedError is a subclass of RuntimeError, so
            callers that catch RuntimeError keep working.
    """
    raise NotImplementedError("Unexpected execution, should be overriden")
def queryScalar(self, sql) -> int: def queryScalar(self, sql) -> int:
return self._queryAny(sql) return self._queryAny(sql)
...@@ -755,7 +740,7 @@ class DbConnRest(DbConn): ...@@ -755,7 +740,7 @@ class DbConnRest(DbConn):
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError("Cannot clean up database until connection is open") raise RuntimeError("Cannot clean up database until connection is open")
# Do nothing for REST # Do nothing for REST
logger.debug("[DB] REST Database connection closed") Logging.debug("[DB] REST Database connection closed")
self.isOpen = False self.isOpen = False
def _doSql(self, sql): def _doSql(self, sql):
...@@ -793,9 +778,9 @@ class DbConnRest(DbConn): ...@@ -793,9 +778,9 @@ class DbConnRest(DbConn):
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError( raise RuntimeError(
"Cannot execute database commands until connection is open") "Cannot execute database commands until connection is open")
logger.debug("[SQL-REST] Executing SQL: {}".format(sql)) Logging.debug("[SQL-REST] Executing SQL: {}".format(sql))
nRows = self._doSql(sql) nRows = self._doSql(sql)
logger.debug( Logging.debug(
"[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows return nRows
...@@ -884,127 +869,6 @@ class MyTDSql: ...@@ -884,127 +869,6 @@ class MyTDSql:
raise raise
return self.affectedRows return self.affectedRows
class TdeInstance():
    """
    A class to capture the *static* information of a TDengine instance,
    including the location of the various files/directories, and basic
    configuration.
    """

    @classmethod
    def _getBuildPath(cls):
        """Locate the build directory by searching for the "taosd" binary.

        The project root is derived from this file's real path (cut at
        "communit" when the path contains "community", otherwise at
        "tests"). The tree below it is walked for a directory holding a file
        named "taosd"; hits whose parent path contains "packaging" are
        skipped.

        Returns:
            The matching directory path with a "/build/bin"-sized suffix
            removed.

        Raises:
            RuntimeError: if no suitable "taosd" file is found.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("communit")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

        buildPath = None
        for root, _dirs, files in os.walk(projPath):
            if ("taosd" in files):
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    buildPath = root[:len(root) - len("/build/bin")]
                    break
        if buildPath is None:
            raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
                               .format(selfPath, projPath))
        return buildPath

    def __init__(self, subdir='test'):
        """Resolve the build directory and remember the instance sub-directory.

        Args:
            subdir: name of the directory, below the build directory, that
                holds this instance's cfg/log/data files.
        """
        self._buildDir = self._getBuildPath()
        self._subdir = '/' + subdir  # TODO: tolerate "/"

    def __repr__(self):
        return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir)

    def generateCfgFile(self):
        """Write "taos.cfg" into the cfg directory unless it already exists.

        An existing regular file is left untouched (a warning is logged).
        The cfg directory is created when missing.

        Raises:
            CrashGenError: if the cfg file path exists but is not a regular
                file, or the cfg directory path exists but is not a
                directory.
        """
        # buildPath = self.getBuildPath()
        # taosdPath = self._buildPath + "/build/bin/taosd"

        cfgDir = self.getCfgDir()
        cfgFile = cfgDir + "/taos.cfg"  # TODO: inquire if this is fixed
        if os.path.exists(cfgFile):
            if os.path.isfile(cfgFile):
                logger.warning("Config file exists already, skip creation: {}".format(cfgFile))
                return  # cfg file already exists, nothing to do
            else:
                raise CrashGenError("Invalid config file: {}".format(cfgFile))
        # Now that the cfg file doesn't exist
        if os.path.exists(cfgDir):
            if not os.path.isdir(cfgDir):
                raise CrashGenError("Invalid config dir: {}".format(cfgDir))
            # else: good path
        else:
            os.makedirs(cfgDir, exist_ok=True)  # like "mkdir -p"
        # Now we have a good cfg dir
        cfgValues = {
            'runDir': self.getRunDir(),
            'ip': '127.0.0.1',  # TODO: change to a network addressable ip
            'port': 6030,
        }
        cfgTemplate = """
dataDir {runDir}/data
logDir {runDir}/log
charset UTF-8
firstEp {ip}:{port}
fqdn {ip}
serverPort {port}
# was all 135 below
dDebugFlag 135
cDebugFlag 135
rpcDebugFlag 135
qDebugFlag 135
# httpDebugFlag 143
# asyncLog 0
# tables 10
maxtablesPerVnode 10
rpcMaxTime 101
# cache 2
keep 36500
# walLevel 2
walLevel 1
#
# maxConnections 100
"""
        cfgContent = cfgTemplate.format_map(cfgValues)
        # "with" guarantees the handle is closed even if write() raises
        with open(cfgFile, "w") as f:
            f.write(cfgContent)

    def rotateLogs(self):
        """Move an existing log directory aside under a timestamped name."""
        logPath = self.getLogDir()
        # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
        if os.path.exists(logPath):
            logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
            logger.info("Saving old log files to: {}".format(logPathSaved))
            os.rename(logPath, logPathSaved)
        # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms

    def getExecFile(self):  # .../taosd
        return self._buildDir + "/build/bin/taosd"

    def getRunDir(self):  # TODO: rename to "root dir" ?!
        return self._buildDir + self._subdir

    def getCfgDir(self):  # path, not file
        return self.getRunDir() + "/cfg"

    def getLogDir(self):
        return self.getRunDir() + "/log"

    def getHostAddr(self):
        return "127.0.0.1"

    def getServiceCommand(self):  # to start the instance
        # argument list form, as used with subprocess.Popen()
        return [self.getExecFile(), '-c', self.getCfgDir()]
class DbConnNative(DbConn): class DbConnNative(DbConn):
# Class variables # Class variables
_lock = threading.Lock() _lock = threading.Lock()
...@@ -1028,7 +892,7 @@ class DbConnNative(DbConn): ...@@ -1028,7 +892,7 @@ class DbConnNative(DbConn):
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
if not cls._connInfoDisplayed: if not cls._connInfoDisplayed:
cls._connInfoDisplayed = True # updating CLASS variable cls._connInfoDisplayed = True # updating CLASS variable
logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath)) Logging.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
# Make the connection # Make the connection
# self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable # self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
# self._cursor = self._conn.cursor() # self._cursor = self._conn.cursor()
...@@ -1052,16 +916,16 @@ class DbConnNative(DbConn): ...@@ -1052,16 +916,16 @@ class DbConnNative(DbConn):
with cls._lock: with cls._lock:
cls.totalConnections -= 1 cls.totalConnections -= 1
logger.debug("[DB] Database connection closed") Logging.debug("[DB] Database connection closed")
self.isOpen = False self.isOpen = False
def execute(self, sql): def execute(self, sql):
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError("Cannot execute database commands until connection is open") raise RuntimeError("Cannot execute database commands until connection is open")
logger.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
nRows = self._tdSql.execute(sql) nRows = self._tdSql.execute(sql)
logger.debug( Logging.debug(
"[SQL] Execution Result, nRows = {}, SQL = {}".format( "[SQL] Execution Result, nRows = {}, SQL = {}".format(
nRows, sql)) nRows, sql))
return nRows return nRows
...@@ -1070,10 +934,10 @@ class DbConnNative(DbConn): ...@@ -1070,10 +934,10 @@ class DbConnNative(DbConn):
if (not self.isOpen): if (not self.isOpen):
raise RuntimeError( raise RuntimeError(
"Cannot query database until connection is open") "Cannot query database until connection is open")
logger.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
nRows = self._tdSql.query(sql) nRows = self._tdSql.query(sql)
logger.debug( Logging.debug(
"[SQL] Query Result, nRows = {}, SQL = {}".format( "[SQL] Query Result, nRows = {}, SQL = {}".format(
nRows, sql)) nRows, sql))
return nRows return nRows
...@@ -1337,7 +1201,7 @@ class StateMechine: ...@@ -1337,7 +1201,7 @@ class StateMechine:
def init(self, dbc: DbConn): # late initailization, don't save the dbConn def init(self, dbc: DbConn): # late initailization, don't save the dbConn
self._curState = self._findCurrentState(dbc) # starting state self._curState = self._findCurrentState(dbc) # starting state
logger.debug("Found Starting State: {}".format(self._curState)) Logging.debug("Found Starting State: {}".format(self._curState))
# TODO: seems no longer used, remove? # TODO: seems no longer used, remove?
def getCurrentState(self): def getCurrentState(self):
...@@ -1375,7 +1239,7 @@ class StateMechine: ...@@ -1375,7 +1239,7 @@ class StateMechine:
raise RuntimeError( raise RuntimeError(
"No suitable task types found for state: {}".format( "No suitable task types found for state: {}".format(
self._curState)) self._curState))
logger.debug( Logging.debug(
"[OPS] Tasks found for state {}: {}".format( "[OPS] Tasks found for state {}: {}".format(
self._curState, self._curState,
typesToStrings(taskTypes))) typesToStrings(taskTypes)))
...@@ -1385,27 +1249,27 @@ class StateMechine: ...@@ -1385,27 +1249,27 @@ class StateMechine:
ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state
dbName =self._db.getName() dbName =self._db.getName()
if not dbc.existsDatabase(dbName): # dbc.hasDatabases(): # no database?! if not dbc.existsDatabase(dbName): # dbc.hasDatabases(): # no database?!
logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time())) Logging.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
return StateEmpty() return StateEmpty()
# did not do this when opening connection, and this is NOT the worker # did not do this when opening connection, and this is NOT the worker
# thread, which does this on their own # thread, which does this on their own
dbc.use(dbName) dbc.use(dbName)
if not dbc.hasTables(): # no tables if not dbc.hasTables(): # no tables
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly() return StateDbOnly()
sTable = self._db.getFixedSuperTable() sTable = self._db.getFixedSuperTable()
if sTable.hasRegTables(dbc, dbName): # no regular tables if sTable.hasRegTables(dbc, dbName): # no regular tables
logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateSuperTableOnly() return StateSuperTableOnly()
else: # has actual tables else: # has actual tables
logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
return StateHasData() return StateHasData()
# We transition the system to a new state by examining the current state itself # We transition the system to a new state by examining the current state itself
def transition(self, tasks, dbc: DbConn): def transition(self, tasks, dbc: DbConn):
if (len(tasks) == 0): # before 1st step, or otherwise empty if (len(tasks) == 0): # before 1st step, or otherwise empty
logger.debug("[STT] Starting State: {}".format(self._curState)) Logging.debug("[STT] Starting State: {}".format(self._curState))
return # do nothing return # do nothing
# this should show up in the server log, separating steps # this should show up in the server log, separating steps
...@@ -1441,7 +1305,7 @@ class StateMechine: ...@@ -1441,7 +1305,7 @@ class StateMechine:
# Nothing for sure # Nothing for sure
newState = self._findCurrentState(dbc) newState = self._findCurrentState(dbc)
logger.debug("[STT] New DB state determined: {}".format(newState)) Logging.debug("[STT] New DB state determined: {}".format(newState))
# can old state move to new state through the tasks? # can old state move to new state through the tasks?
self._curState.verifyTasksToState(tasks, newState) self._curState.verifyTasksToState(tasks, newState)
self._curState = newState self._curState = newState
...@@ -1459,7 +1323,7 @@ class StateMechine: ...@@ -1459,7 +1323,7 @@ class StateMechine:
# read data task, default to 10: TODO: change to a constant # read data task, default to 10: TODO: change to a constant
weights.append(10) weights.append(10)
i = self._weighted_choice_sub(weights) i = self._weighted_choice_sub(weights)
# logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) # Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
return taskTypes[i] return taskTypes[i]
# ref: # ref:
...@@ -1538,7 +1402,7 @@ class Database: ...@@ -1538,7 +1402,7 @@ class Database:
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp( t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above t3.timestamp() + elSec2) # see explanation above
logger.info("Setting up TICKS to start from: {}".format(t4)) Logging.info("Setting up TICKS to start from: {}".format(t4))
return t4 return t4
@classmethod @classmethod
...@@ -1689,10 +1553,10 @@ class TaskExecutor(): ...@@ -1689,10 +1553,10 @@ class TaskExecutor():
self._boundedList.add(n) self._boundedList.add(n)
# def logInfo(self, msg): # def logInfo(self, msg):
# logger.info(" T[{}.x]: ".format(self._curStep) + msg) # Logging.info(" T[{}.x]: ".format(self._curStep) + msg)
# def logDebug(self, msg): # def logDebug(self, msg):
# logger.debug(" T[{}.x]: ".format(self._curStep) + msg) # Logging.debug(" T[{}.x]: ".format(self._curStep) + msg)
class Task(): class Task():
...@@ -1705,7 +1569,7 @@ class Task(): ...@@ -1705,7 +1569,7 @@ class Task():
@classmethod
def allocTaskNum(cls):
    """Reserve the next task serial number and return it."""
    # IMPORTANT: go through Task explicitly rather than cls, since each
    # sub class would otherwise end up with its own copy of the counter.
    nextSn = Task.taskSn + 1
    Task.taskSn = nextSn
    # Logging.debug("Allocating taskSN: {}".format(Task.taskSn))
    return nextSn
def __init__(self, execStats: ExecutionStats, db: Database): def __init__(self, execStats: ExecutionStats, db: Database):
...@@ -1717,7 +1581,7 @@ class Task(): ...@@ -1717,7 +1581,7 @@ class Task():
# Assign an incremental task serial number # Assign an incremental task serial number
self._taskNum = self.allocTaskNum() self._taskNum = self.allocTaskNum()
# logger.debug("Creating new task {}...".format(self._taskNum)) # Logging.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats self._execStats = execStats
self._db = db # A task is always associated/for a specific DB self._db = db # A task is always associated/for a specific DB
...@@ -1781,7 +1645,7 @@ class Task(): ...@@ -1781,7 +1645,7 @@ class Task():
elif msg.find("duplicated column names") != -1: # also alter table tag issues elif msg.find("duplicated column names") != -1: # also alter table tag issues
return True return True
elif gSvcMgr and (not gSvcMgr.isStable()): # We are managing service, and ... elif gSvcMgr and (not gSvcMgr.isStable()): # We are managing service, and ...
logger.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg)) Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
return True return True
return False # Not an acceptable error return False # Not an acceptable error
...@@ -1922,13 +1786,13 @@ class ExecutionStats: ...@@ -1922,13 +1786,13 @@ class ExecutionStats:
self._failureReason = reason self._failureReason = reason
def printStats(self): def printStats(self):
logger.info( Logging.info(
"----------------------------------------------------------------------") "----------------------------------------------------------------------")
logger.info( Logging.info(
"| Crash_Gen test {}, with the following stats:". format( "| Crash_Gen test {}, with the following stats:". format(
"FAILED (reason: {})".format( "FAILED (reason: {})".format(
self._failureReason) if self._failed else "SUCCEEDED")) self._failureReason) if self._failed else "SUCCEEDED"))
logger.info("| Task Execution Times (success/total):") Logging.info("| Task Execution Times (success/total):")
execTimesAny = 0.001 # avoid div by zero execTimesAny = 0.001 # avoid div by zero
for k, n in self._execTimes.items(): for k, n in self._execTimes.items():
execTimesAny += n[0] execTimesAny += n[0]
...@@ -1939,28 +1803,28 @@ class ExecutionStats: ...@@ -1939,28 +1803,28 @@ class ExecutionStats:
errStrs = ["0x{:X}:{}".format(eno, n) for (eno, n) in errors.items()] errStrs = ["0x{:X}:{}".format(eno, n) for (eno, n) in errors.items()]
# print("error strings = {}".format(errStrs)) # print("error strings = {}".format(errStrs))
errStr = ", ".join(errStrs) errStr = ", ".join(errStrs)
logger.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr)) Logging.info("| {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))
logger.info( Logging.info(
"| Total Tasks Executed (success or not): {} ".format(execTimesAny)) "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
logger.info( Logging.info(
"| Total Tasks In Progress at End: {}".format( "| Total Tasks In Progress at End: {}".format(
self._tasksInProgress)) self._tasksInProgress))
logger.info( Logging.info(
"| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format( "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
self._accRunTime)) self._accRunTime))
logger.info( Logging.info(
"| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime / execTimesAny)) "| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime / execTimesAny))
logger.info( Logging.info(
"| Total Elapsed Time (from wall clock): {:.3f} seconds".format( "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
self._elapsedTime)) self._elapsedTime))
logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList())) Logging.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
logger.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections)) Logging.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
logger.info("| Longest native query time: {:.3f} seconds, started: {}". Logging.info("| Longest native query time: {:.3f} seconds, started: {}".
format(MyTDSql.longestQueryTime, format(MyTDSql.longestQueryTime,
time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) ) time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) )
logger.info("| Longest native query: {}".format(MyTDSql.longestQuery)) Logging.info("| Longest native query: {}".format(MyTDSql.longestQuery))
logger.info( Logging.info(
"----------------------------------------------------------------------") "----------------------------------------------------------------------")
...@@ -2030,7 +1894,7 @@ class TaskDropDb(StateTransitionTask): ...@@ -2030,7 +1894,7 @@ class TaskDropDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
    """Drop this task's database through the worker thread, then log when it happened."""
    dropSql = "drop database {}".format(self._db.getName())
    self.execWtSql(wt, dropSql)
    droppedAt = time.time()
    Logging.debug("[OPS] database dropped at {}".format(droppedAt))
class TaskCreateSuperTable(StateTransitionTask): class TaskCreateSuperTable(StateTransitionTask):
@classmethod @classmethod
...@@ -2043,7 +1907,7 @@ class TaskCreateSuperTable(StateTransitionTask): ...@@ -2043,7 +1907,7 @@ class TaskCreateSuperTable(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not self._db.exists(wt.getDbConn()): if not self._db.exists(wt.getDbConn()):
logger.debug("Skipping task, no DB yet") Logging.debug("Skipping task, no DB yet")
return return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable sTable = self._db.getFixedSuperTable() # type: TdSuperTable
...@@ -2078,7 +1942,7 @@ class TdSuperTable: ...@@ -2078,7 +1942,7 @@ class TdSuperTable:
dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
raise raise
qr = dbc.getQueryResult() qr = dbc.getQueryResult()
...@@ -2193,7 +2057,7 @@ class TaskReadData(StateTransitionTask): ...@@ -2193,7 +2057,7 @@ class TaskReadData(StateTransitionTask):
dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName())) dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
raise raise
class TaskDropSuperTable(StateTransitionTask): class TaskDropSuperTable(StateTransitionTask):
...@@ -2224,7 +2088,7 @@ class TaskDropSuperTable(StateTransitionTask): ...@@ -2224,7 +2088,7 @@ class TaskDropSuperTable(StateTransitionTask):
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
if (errno2 in [0x362]): # mnode invalid table name if (errno2 in [0x362]): # mnode invalid table name
isSuccess = False isSuccess = False
logger.debug("[DB] Acceptable error when dropping a table") Logging.debug("[DB] Acceptable error when dropping a table")
continue # try to delete next regular table continue # try to delete next regular table
if (not tickOutput): if (not tickOutput):
...@@ -2304,20 +2168,19 @@ class TaskAddData(StateTransitionTask): ...@@ -2304,20 +2168,19 @@ class TaskAddData(StateTransitionTask):
# Track which table is being actively worked on # Track which table is being actively worked on
activeTable: Set[int] = set() activeTable: Set[int] = set()
# We use these two files to record operations to DB, useful for power-off # We use these two files to record operations to DB, useful for power-off tests
# tests fAddLogReady = None # type: TextIOWrapper
fAddLogReady = None fAddLogDone = None # type: TextIOWrapper
fAddLogDone = None
@classmethod
def prepToRecordOps(cls):
    """
    Open the two operation-record files, each at most once.

    Does nothing unless gConfig.record_ops is set. The handles are kept
    on the class (fAddLogReady / fAddLogDone).
    """
    if not gConfig.record_ops:
        return  # recording not requested

    if cls.fAddLogReady is None:
        Logging.info(
            "Recording in a file operations to be performed...")
        cls.fAddLogReady = open("add_log_ready.txt", "w")

    if cls.fAddLogDone is None:
        Logging.info("Recording in a file operations completed...")
        cls.fAddLogDone = open("add_log_done.txt", "w")
@classmethod @classmethod
...@@ -2393,553 +2256,8 @@ class TaskAddData(StateTransitionTask): ...@@ -2393,553 +2256,8 @@ class TaskAddData(StateTransitionTask):
self.activeTable.discard(i) # not raising an error, unlike remove self.activeTable.discard(i) # not raising an error, unlike remove
# Deterministic random number generator
class Dice():
    """
    Dice backed by the global `random` module.

    seed() must be called exactly once before throw()/throwRange() may be
    used; choice() performs no such check.
    """
    seeded = False  # class-level flag, flipped by seed()

    @classmethod
    def seed(cls, s):
        """Seed the global RNG with `s`; raises RuntimeError on a second call."""
        if cls.seeded:
            raise RuntimeError(
                "Cannot seed the random generator more than once")
        cls.verifyRNG()  # sanity check first (it re-seeds with 0 on its own)
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):
        """Raise RuntimeError unless seed 0 yields the expected first three draws."""
        random.seed(0)
        draws = [random.randrange(0, 1000) for _ in range(3)]
        if draws != [864, 394, 776]:
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):
        """Return a random integer from 0 up to stop-1."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):
        """Return a random integer from start up to stop-1; requires prior seeding."""
        if cls.seeded:
            return random.randrange(start, stop)
        raise RuntimeError("Cannot throw dice before seeding it")

    @classmethod
    def choice(cls, cList):
        """Return a random element of `cList`."""
        return random.choice(cList)
class LoggingFilter(logging.Filter):
    """Filter attached to the CrashGen logger; currently lets every record through."""

    def filter(self, record: logging.LogRecord):
        """Return True (keep the record) regardless of its level."""
        if record.levelno < logging.INFO:
            # Below INFO is where message-prefix filtering (e.g. dropping
            # "[TRD]" lines) used to live; it is disabled, so keep these too.
            return True
        return True  # info or above always log
class MyLoggingAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes every message with a short thread tag."""

    def process(self, msg, kwargs):
        """Return (tagged message, kwargs); the tag is the thread ident modulo 10000."""
        threadTag = threading.get_ident() % 10000
        taggedMsg = "[{}]{}".format(threadTag, msg)
        return taggedMsg, kwargs
class ServiceManager:
    """
    Manages the TDengine service as a set of ServiceManagerThread objects,
    one per dnode (more than one dnode means a cluster), which are started
    and stopped together.
    """
    PAUSE_BETWEEN_IPC_CHECK = 1.2  # seconds between checks on STDOUT of sub process

    def __init__(self, numDnodes = 1):
        """Create `numDnodes` service manager threads (not yet started)."""
        logger.info("TDengine Service Manager (TSM) created")
        self._numDnodes = numDnodes # >1 means we have a cluster
        # NOTE: the signal handlers below are registered by MainExec, not here
        self.inSigHandler = False
        self.svcMgrThreads = [] # type: List[ServiceManagerThread]
        for i in range(0, numDnodes):
            self.svcMgrThreads.append(ServiceManagerThread(i))
        self._lock = threading.Lock()  # serializes start/stop of the services

    def _doMenu(self):
        """Prompt on the console until the user enters "1", "2" or "3"; return it."""
        choice = ""
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the if range below
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        """SIGUSR1 handler: show the menu and resume, terminate or restart."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.inSigHandler = True

        choice = self._doMenu()
        if choice == "1":
            self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
        elif choice == "2":
            self.stopTaosServices()
        elif choice == "3": # Restart
            self.restart()
        else:
            raise RuntimeError("Invalid menu choice: {}".format(choice))

        self.inSigHandler = False

    def sigIntHandler(self, signalNumber, frame):
        """SIGINT handler: stop all services, ignoring re-entrant signals."""
        print("ServiceManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True

        self.stopTaosServices()
        print("ServiceManager: INT Signal Handler returning...")
        self.inSigHandler = False

    def sigHandlerResume(self):
        """Menu choice "Resume": nothing to do beyond telling the user."""
        print("Resuming TDengine service manager (main thread)...\n\n")

    def isActive(self):
        """
        Determine if the service/cluster is active at all, i.e. at least
        one thread is not "stopped".
        """
        for thread in self.svcMgrThreads:
            if not thread.isStopped():
                return True
        return False

    def isStable(self):
        """
        Determine if the service/cluster is "stable", i.e. all of the
        threads are in "stable" status.
        """
        for thread in self.svcMgrThreads:
            if not thread.isStable():
                return False
        return True

    def _procIpcAll(self):
        """Pump the output of every thread until no thread is active any more."""
        while self.isActive():
            for thread in self.svcMgrThreads: # all thread objects should always be valid
                if thread.isRunning():
                    thread.procIpcBatch()  # regular processing,
                    if thread.isStopped():
                        thread.procIpcBatch() # one last time?
                elif thread.isStarting() or thread.isStopping():
                    # In transition. This used to call a non-existent
                    # isRetarting() method, which raised AttributeError.
                    print("Service restarting...")
                # else this thread is stopped

            time.sleep(self.PAUSE_BETWEEN_IPC_CHECK)  # pause, before next round
        print("Service Manager Thread (with subprocess) ended, main thread exiting...")

    def startTaosServices(self):
        """
        Kill any pre-existing `taosd` process, then start every thread.

        Raises RuntimeError if any of our threads is still active.
        """
        with self._lock:
            if self.isActive():
                raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")

            # Find if there's already a taosd service, and then kill it
            for proc in psutil.process_iter():
                if proc.name() == 'taosd':
                    print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
                    time.sleep(2.0)
                    proc.kill()

            for thread in self.svcMgrThreads:
                thread.start()
                thread.procIpcBatch(trimToTarget=10, forceOutput=True)  # for printing 10 lines

    def stopTaosServices(self):
        """Stop every thread; only logs a warning if nothing is active."""
        with self._lock:
            if not self.isActive():
                logger.warning("Cannot stop TAOS service(s), already not active")
                return

            for thread in self.svcMgrThreads:
                thread.stop()

    def run(self):
        """Start the services and pump their output until they all end."""
        self.startTaosServices()
        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
        if self.isActive():  # if sig handler hasn't destroyed it by now
            self.stopTaosServices()  # should have started already

    def restart(self):
        """Stop (if active) and start the services; refused when not stable."""
        if not self.isStable():
            logger.warning("Cannot restart service/cluster, when not stable")
            return

        if self.isActive():
            self.stopTaosServices()
        else:
            logger.warning("Service not active when restart requested")

        # Was self.startTaosService(), a method that does not exist.
        self.startTaosServices()
class ServiceManagerThread:
    """
    A class representing a dedicated thread which manages the "sub process"
    of the TDengine service, interacting with its STDOUT/ERR.

    It takes a TdeInstance parameter at creation time, or create a default
    """
    MAX_QUEUE_SIZE = 10000

    def __init__(self, tInstNum = 0, tInst : "TdeInstance" = None):
        # Set the sub process
        self._tdeSubProcess = None # type: TdeSubProcess

        # Arrange the TDengine instance
        self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
        self._tInst    = tInst or TdeInstance() # Need an instance

        self._thread = None # The actual thread, # type: threading.Thread
        self._status = MainExec.STATUS_STOPPED # The status of the underlying service, actually.

    def __repr__(self):
        return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum)

    def getStatus(self):
        return self._status

    def isStarting(self):
        return self._status == MainExec.STATUS_STARTING

    def isRunning(self):
        return self._status == MainExec.STATUS_RUNNING

    def isStopping(self):
        return self._status == MainExec.STATUS_STOPPING

    def isStopped(self):
        return self._status == MainExec.STATUS_STOPPED

    def isStable(self):
        """A thread is stable when it is not in transition (starting/stopping)."""
        return self.isRunning() or self.isStopped()

    def start(self):
        """
        Start the sub process plus its two reader threads, then wait (up to
        100 one-second polls) for the service to report itself ready.

        Raises RuntimeError if already started, or if the service does not
        become ready in time.
        """
        if self._thread:
            raise RuntimeError("Unexpected _thread")
        if self._tdeSubProcess:
            raise RuntimeError("TDengine sub process already created/running")

        logger.info("Attempting to start TAOS service: {}".format(self))

        self._status = MainExec.STATUS_STARTING
        self._tdeSubProcess = TdeSubProcess(self._tInst)
        self._tdeSubProcess.start()

        self._ipcQueue = Queue()
        self._thread = threading.Thread( # First thread captures server OUTPUT
            target=self.svcOutputReader,
            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
        self._thread.daemon = True  # thread dies with the program
        self._thread.start()

        self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
            target=self.svcErrorReader,
            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
        self._thread2.daemon = True  # thread dies with the program
        self._thread2.start()

        # wait for service to start; svcOutputReader flips the status to
        # RUNNING once it sees TD_READY_MSG
        for i in range(0, 100):
            time.sleep(1.0)
            # don't pump message during start up
            print("_zz_", end="", flush=True)
            if self._status == MainExec.STATUS_RUNNING:
                logger.info("[] TDengine service READY to process requests")
                logger.info("[] TAOS service started: {}".format(self))
                return  # now we've started
        # TODO: handle failure-to-start better?
        self.procIpcBatch(100, True) # display output before giving up, trim to last 100 msgs, force output
        raise RuntimeError("TDengine service did not start successfully: {}".format(self))

    def stop(self):
        """
        Stop the sub process and join the reader threads.

        Can be called from both main thread or signal handler. Raises
        RuntimeError if there is no sub process object to stop.
        """
        print("Terminating TDengine service running as the sub process...")
        if self.isStopped():
            print("Service already stopped")
            return
        if self.isStopping():
            print("Service is already being stopped")
            return
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self._tdeSubProcess:
            raise RuntimeError("sub process object missing")

        self._status = MainExec.STATUS_STOPPING
        retCode = self._tdeSubProcess.stop()
        print("Attempted to stop sub process, got return code: {}".format(retCode))
        if (retCode==-11): # SEGV
            logger.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")

        if self._tdeSubProcess.isRunning():  # still running
            print("FAILED to stop sub process, it is still running... pid = {}".format(
                    self._tdeSubProcess.getPid()))
        else:
            self._tdeSubProcess = None  # not running any more
            self.join()  # stop the thread, change the status, etc.

        # Check if it's really stopped
        outputLines = 20 # for last output
        if self.isStopped():
            self.procIpcBatch(outputLines)  # one last time
            print("End of TDengine Service Output: {}".format(self))
            print("----- TDengine Service (managed by SMT) is now terminated -----\n")
        else:
            print("WARNING: SMT did not terminate as expected: {}".format(self))

    def join(self):
        """Join both reader threads and mark the service STOPPED."""
        # TODO: sanity check
        if not self.isStopping():
            raise RuntimeError(
                "Unexpected status when ending svc mgr thread: {}".format(
                    self._status))

        if self._thread:
            self._thread.join()
            self._thread = None
            self._status = MainExec.STATUS_STOPPED
            # STD ERR thread
            self._thread2.join()
            self._thread2 = None
        else:
            print("Joining empty thread, doing nothing")

    def _trimQueue(self, targetSize):
        """Discard the oldest queued lines until about `targetSize` remain."""
        if targetSize <= 0:
            return  # do nothing
        q = self._ipcQueue
        if (q.qsize() <= targetSize):  # no need to trim
            return

        logger.debug("Triming IPC queue to target size: {}".format(targetSize))
        itemsToTrim = q.qsize() - targetSize
        for i in range(0, itemsToTrim):
            try:
                q.get_nowait()
            except Empty:
                break  # break out of for loop, no more trimming

    TD_READY_MSG = "TDengine is initialized successfully"

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        """
        Drain the lines currently queued by the reader thread and log them,
        at INFO level when `forceOutput` is set and at DEBUG level otherwise.
        A positive `trimToTarget` first trims the queue to that size.
        """
        self._trimQueue(trimToTarget) # trim if necessary
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        print("<", end="", flush=True)
        while True:
            try:
                line = self._ipcQueue.get_nowait()  # getting output at fast speed
                self._printProgress("_o")
            except Empty:
                # no more output
                print(".>", end="", flush=True)
                return  # we are done with THIS BATCH
            else:  # got line, printing out
                if forceOutput:
                    logger.info(line)
                else:
                    logger.debug(line)

    _ProgressBars = ["--", "//", "||", "\\\\"]

    def _printProgress(self, msg): # TODO: assuming 2 chars
        print(msg, end="", flush=True)
        pBar = self._ProgressBars[Dice.throw(4)]
        print(pBar, end="", flush=True)
        print('\b\b\b\b', end="", flush=True)

    def svcOutputReader(self, out: IO, queue):
        """
        Reader-thread body: forward each line of the service's STDOUT to
        `queue`, and flip the status to RUNNING when the ready message shows up.
        """
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        for line in iter(out.readline, b''):
            try:
                line = line.decode("utf-8").rstrip()
            except UnicodeError:
                print("\nNon-UTF8 server output: {}\n".format(line))
                # Still turn it into text: the code below (and the consumers
                # of the queue) expect str, and bytes.find(str) would raise.
                line = line.decode("utf-8", errors="replace").rstrip()

            # This might block, and then causing "out" buffer to block
            queue.put(line)
            self._printProgress("_i")

            if self._status == MainExec.STATUS_STARTING:  # we are starting, let's see if we have started
                if line.find(self.TD_READY_MSG) != -1:  # found
                    logger.info("Waiting for the service to become FULLY READY")
                    time.sleep(1.0) # wait for the server to truly start. TODO: remove this
                    logger.info("Service instance #{} is now FULLY READY".format(self._tInstNum))
                    self._status = MainExec.STATUS_RUNNING

            # Trim the queue if necessary: TODO: try this 1 out of 10 times
            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

            if self.isStopping():  # TODO: use thread status instead
                # WAITING for stopping sub process to finish its output
                print("_w", end="", flush=True)

        # meaning sub process must have died
        print("\nNo more output from IO thread managing TDengine service")
        out.close()

    def svcErrorReader(self, err: IO, queue):
        """Reader-thread body: echo each line of the service's STDERR to the console."""
        for line in iter(err.readline, b''):
            print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
        err.close()  # EOF reached; release the pipe like svcOutputReader does
class TdeSubProcess:
    """
    A class to represent the actual sub process that is the run-time
    of a TDengine instance.

    It takes a TdeInstance object as its parameter, with the rationale being
    "a sub process runs an instance".
    """

    def __init__(self, tInst : "TdeInstance"):
        """Remember the instance to run; raises CrashGenError if it is None."""
        self.subProcess = None
        if tInst is None:
            raise CrashGenError("Empty instance not allowed in TdeSubProcess")
        self._tInst = tInst # Default create at ServiceManagerThread

    def getStdOut(self):
        return self.subProcess.stdout

    def getStdErr(self):
        return self.subProcess.stderr

    def isRunning(self):
        """True while we still hold a sub process object (see stop())."""
        return self.subProcess is not None

    def getPid(self):
        return self.subProcess.pid

    def start(self):
        """
        Generate the instance's config file, rotate its logs and launch the
        service command with STDOUT/STDERR captured through pipes.

        Raises RuntimeError if a sub process is already attached.
        """
        ON_POSIX = 'posix' in sys.builtin_module_names

        # Sanity check
        if self.subProcess:  # already there
            raise RuntimeError("Corrupt process state")

        self._tInst.generateCfgFile() # service side generates config file, client does not
        self._tInst.rotateLogs()

        print("Starting TDengine instance: {}".format(self._tInst))
        self.subProcess = subprocess.Popen(
            self._tInst.getServiceCommand(),
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # bufsize=1, # not supported in binary mode
            close_fds=ON_POSIX
            )  # had text=True, which interfered with reading EOF

    def stop(self):
        """
        Stop the sub process and return a code describing the outcome:

          -1  no sub process was attached
          -3  SIGINT was sent but the process did not exit within 10 seconds
          -4  the process exited after our SIGINT
          any other value: the exit code of a process that had already ended
        """
        if not self.subProcess:
            print("Sub process already stopped")
            return -1

        retCode = self.subProcess.poll() # contains real sub process return code
        # poll() returns None while the process is alive; 0 is a perfectly
        # valid exit code, so test against None rather than truthiness.
        if retCode is not None:  # valid return code, process ended
            self.subProcess = None
        else:  # process still alive, let's interrupt it
            print(
                "Sub process is running, sending SIG_INT and waiting for it to terminate...")
            # sub process should end, then IPC queue should end, causing IO
            # thread to end
            self.subProcess.send_signal(signal.SIGINT)
            try:
                self.subProcess.wait(10)
                retCode = self.subProcess.returncode
            except subprocess.TimeoutExpired:
                print("Time out waiting for TDengine service process to exit")
                retCode = -3
            else:
                print("TDengine service process terminated successfully from SIG_INT")
                retCode = -4
            self.subProcess = None
        return retCode
class ThreadStacks: # stack info for all threads class ThreadStacks: # stack info for all threads
def __init__(self): def __init__(self):
...@@ -2976,17 +2294,17 @@ class ClientManager: ...@@ -2976,17 +2294,17 @@ class ClientManager:
# signal.signal(signal.SIGTERM, self.sigIntHandler) # signal.signal(signal.SIGTERM, self.sigIntHandler)
# signal.signal(signal.SIGINT, self.sigIntHandler) # signal.signal(signal.SIGINT, self.sigIntHandler)
self._status = MainExec.STATUS_RUNNING self._status = Status.STATUS_RUNNING
self.tc = None self.tc = None
self.inSigHandler = False self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame): def sigIntHandler(self, signalNumber, frame):
if self._status != MainExec.STATUS_RUNNING: if self._status != Status.STATUS_RUNNING:
print("Repeated SIGINT received, forced exit...") print("Repeated SIGINT received, forced exit...")
# return # do nothing if it's already not running # return # do nothing if it's already not running
sys.exit(-1) sys.exit(-1)
self._status = MainExec.STATUS_STOPPING # immediately set our status self._status = Status.STATUS_STOPPING # immediately set our status
print("ClientManager: Terminating program...") print("ClientManager: Terminating program...")
self.tc.requestToStop() self.tc.requestToStop()
...@@ -3110,11 +2428,6 @@ class ClientManager: ...@@ -3110,11 +2428,6 @@ class ClientManager:
self.tc.printStats() self.tc.printStats()
class MainExec: class MainExec:
STATUS_STARTING = 1
STATUS_RUNNING = 2
STATUS_STOPPING = 3
STATUS_STOPPED = 4
def __init__(self): def __init__(self):
self._clientMgr = None self._clientMgr = None
self._svcMgr = None self._svcMgr = None
...@@ -3147,7 +2460,7 @@ class MainExec: ...@@ -3147,7 +2460,7 @@ class MainExec:
try: try:
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
except requests.exceptions.ConnectionError as err: except requests.exceptions.ConnectionError as err:
logger.warning("Failed to open REST connection to DB: {}".format(err.getMessage())) Logging.warning("Failed to open REST connection to DB: {}".format(err.getMessage()))
# don't raise # don't raise
return ret return ret
...@@ -3255,20 +2568,7 @@ class MainExec: ...@@ -3255,20 +2568,7 @@ class MainExec:
global gConfig global gConfig
gConfig = parser.parse_args() gConfig = parser.parse_args()
# Logging Stuff Logging.clsInit(gConfig)
global logger
_logger = logging.getLogger('CrashGen') # real logger
_logger.addFilter(LoggingFilter())
ch = logging.StreamHandler()
_logger.addHandler(ch)
# Logging adapter, to be used as a logger
logger = MyLoggingAdapter(_logger, [])
if (gConfig.debug):
logger.setLevel(logging.DEBUG) # default seems to be INFO
else:
logger.setLevel(logging.INFO)
Dice.seed(0) # initial seeding of dice Dice.seed(0) # initial seeding of dice
......
import threading
import random
import logging
class CrashGenError(Exception):
    """
    Exception raised by the crash_gen tool for its own error conditions.

    Attributes:
        msg:   description of the problem, or None when not supplied
        errno: optional numeric error code supplied by the raiser, or None
    """

    def __init__(self, msg=None, errno=None):
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must always return a str: msg defaults to None (and may be
        # any object), which used to make str(exc) raise a TypeError.
        if self.msg is None:
            return ""
        return str(self.msg)
class LoggingFilter(logging.Filter):
    """Filter attached to the 'CrashGen' logger; it currently accepts every record."""

    def filter(self, record: logging.LogRecord):
        if record.levelno < logging.INFO:
            # Below INFO: prefix-based suppression (e.g. "[TRD]" messages) is
            # switched off for now, so these records are accepted too.
            return True
        return True  # info or above always log
class MyLoggingAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes every message with a short thread tag."""

    def process(self, msg, kwargs):
        # Thread identifier reduced modulo 10000 to keep the prefix short
        threadTag = threading.get_ident() % 10000
        tagged = f"[{threadTag}]{msg}"
        return tagged, kwargs
class Logging:
    """
    Class-level facade over the shared 'CrashGen' logger.

    clsInit() must be called once before any of the logging helpers are
    used; until then ``logger`` is None.
    """

    logger = None  # the MyLoggingAdapter instance, set by clsInit()

    @classmethod
    def getLogger(cls):
        """Return the shared logger adapter (None before clsInit())."""
        # Was "return logger", a global that does not exist in this module
        return cls.logger

    @classmethod
    def clsInit(cls, gConfig):  # TODO: refactor away gConfig
        """
        Create and configure the shared logger; later calls are no-ops.

        gConfig only needs a boolean ``debug`` attribute: when true the level
        is set to DEBUG, otherwise to INFO.
        """
        if cls.logger:
            return  # already initialized, don't add a second handler

        # Logging Stuff
        _logger = logging.getLogger('CrashGen')  # real logger
        _logger.addFilter(LoggingFilter())
        ch = logging.StreamHandler()
        _logger.addHandler(ch)

        # Logging adapter, to be used as a logger
        print("setting logger variable")
        cls.logger = MyLoggingAdapter(_logger, [])
        if gConfig.debug:
            cls.logger.setLevel(logging.DEBUG)  # default seems to be INFO
        else:
            cls.logger.setLevel(logging.INFO)

    @classmethod
    def info(cls, msg):
        """Log msg at INFO level."""
        cls.logger.info(msg)

    @classmethod
    def debug(cls, msg):
        """Log msg at DEBUG level."""
        cls.logger.debug(msg)

    @classmethod
    def warning(cls, msg):
        """Log msg at WARNING level."""
        cls.logger.warning(msg)

    @classmethod
    def error(cls, msg):
        """Log msg at ERROR level."""
        cls.logger.error(msg)
class Status:
    """Integer codes for the four service life-cycle states (1 through 4)."""

    (STATUS_STARTING,
     STATUS_RUNNING,
     STATUS_STOPPING,
     STATUS_STOPPED) = range(1, 5)
# Seed-once wrapper around the module-level ``random`` generator
class Dice():
    """
    Random number source that has to be seeded exactly once before numbers
    are drawn through throw()/throwRange().
    """

    seeded = False  # class-wide flag, flipped by seed()

    @classmethod
    def seed(cls, s):
        """Check the RNG, then seed it with s; a second call raises RuntimeError."""
        if not cls.seeded:
            cls.verifyRNG()
            random.seed(s)
            cls.seeded = True  # TODO: protect against multi-threading
            return
        raise RuntimeError(
            "Cannot seed the random generator more than once")

    @classmethod
    def verifyRNG(cls):
        """Raise RuntimeError unless seed 0 yields the three expected draws."""
        random.seed(0)
        draws = tuple(random.randrange(0, 1000) for _ in range(3))
        if draws != (864, 394, 776):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):
        """Return a random integer from 0 to stop-1."""
        return cls.throwRange(start=0, stop=stop)

    @classmethod
    def throwRange(cls, start, stop):
        """Return a random integer from start to stop-1; requires prior seeding."""
        if cls.seeded:
            return random.randrange(start, stop)
        raise RuntimeError("Cannot throw dice before seeding it")

    @classmethod
    def choice(cls, cList):
        """Return a random element of cList (no seeding check is made here)."""
        picked = random.choice(cList)
        return picked
class Helper:
    """Small stateless utilities."""

    @classmethod
    def convertErrno(cls, errno):
        """Return errno unchanged when positive, otherwise 0x80000000 + errno."""
        if errno > 0:
            return errno
        return 0x80000000 + errno
class Progress:
    """Prints short progress markers on standard output, without a newline."""

    STEP_BOUNDARY, BEGIN_THREAD_STEP, END_THREAD_STEP = 0, 1, 2

    # Marker text printed for each token code
    tokens = {
        STEP_BOUNDARY: '.',
        BEGIN_THREAD_STEP: '[',
        END_THREAD_STEP: '] ',
    }

    @classmethod
    def emit(cls, token):
        """Print the marker registered for token and flush immediately."""
        marker = cls.tokens[token]
        print(marker, end="", flush=True)
import os
import io
import sys
import threading
import signal
import logging
import time
import subprocess
from typing import IO
try:
import psutil
except:
print("Psutil module needed, please install: sudo pip3 install psutil")
sys.exit(-1)
from queue import Queue, Empty
from .misc import Logging, Status, CrashGenError, Dice
class TdeInstance():
    """
    A class to capture the *static* information of a TDengine instance,
    including the location of the various files/directories, and basic
    configuration.
    """

    @classmethod
    def _getBuildPath(cls):
        """
        Locate the build directory by walking the project tree for "taosd".

        The project root is the part of this file's real path that precedes
        "communit" (when the path contains "community") or "tests". The first
        directory holding a "taosd" file whose real parent path does not
        contain "packaging" is taken to be ".../build/bin", and that suffix is
        stripped from it.

        Raises:
            RuntimeError: if no suitable "taosd" file is found.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        marker = "communit" if "community" in selfPath else "tests"
        # partition() returns the whole path when the marker is absent; the
        # former slice with find() == -1 silently dropped the last character.
        projPath = selfPath.partition(marker)[0]

        buildPath = None
        for root, dirs, files in os.walk(projPath):
            if "taosd" in files:
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if "packaging" not in rootRealPath:
                    buildPath = root[:len(root) - len("/build/bin")]
                    break
        if buildPath is None:
            raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
                               .format(selfPath, projPath))
        return buildPath

    def __init__(self, subdir='test'):
        """
        Args:
            subdir: name of the run directory below the build directory; any
                leading "/" is ignored.
        """
        self._buildDir = self._getBuildPath()
        self._subdir = '/' + subdir.lstrip('/')

    def __repr__(self):
        return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir)

    def generateCfgFile(self):
        """
        Write <cfgDir>/taos.cfg unless it is already there.

        The config directory is created when missing.

        Raises:
            CrashGenError: if the config file path exists but is not a regular
                file, or the config dir path exists but is not a directory.
        """
        cfgDir = self.getCfgDir()
        cfgFile = cfgDir + "/taos.cfg"  # TODO: inquire if this is fixed
        if os.path.exists(cfgFile):
            if os.path.isfile(cfgFile):
                Logging.warning("Config file exists already, skip creation: {}".format(cfgFile))
                return  # cfg file already exists, nothing to do
            raise CrashGenError("Invalid config file: {}".format(cfgFile))

        # Now that the cfg file doesn't exist
        if os.path.exists(cfgDir):
            if not os.path.isdir(cfgDir):
                raise CrashGenError("Invalid config dir: {}".format(cfgDir))
            # else: good path
        else:
            os.makedirs(cfgDir, exist_ok=True)  # like "mkdir -p"

        # Now we have a good cfg dir
        cfgValues = {
            'runDir': self.getRunDir(),
            'ip': '127.0.0.1',  # TODO: change to a network addressable ip
            'port': 6030,
        }
        cfgTemplate = """
dataDir {runDir}/data
logDir {runDir}/log
charset UTF-8
firstEp {ip}:{port}
fqdn {ip}
serverPort {port}
# was all 135 below
dDebugFlag 135
cDebugFlag 135
rpcDebugFlag 135
qDebugFlag 135
# httpDebugFlag 143
# asyncLog 0
# tables 10
maxtablesPerVnode 10
rpcMaxTime 101
# cache 2
keep 36500
# walLevel 2
walLevel 1
#
# maxConnections 100
"""
        cfgContent = cfgTemplate.format_map(cfgValues)
        # Context manager closes the file even if the write fails
        with open(cfgFile, "w") as f:
            f.write(cfgContent)

    def rotateLogs(self):
        """Rename an existing log directory by appending "_<timestamp>" to its name."""
        logPath = self.getLogDir()
        if os.path.exists(logPath):
            logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
            Logging.info("Saving old log files to: {}".format(logPathSaved))
            os.rename(logPath, logPathSaved)
        # The directory is not re-created here; the original author notes that
        # TDengine will auto-create it with proper perms.

    def getExecFile(self):  # .../taosd
        """Return the path of the taosd executable."""
        return self._buildDir + "/build/bin/taosd"

    def getRunDir(self):  # TODO: rename to "root dir" ?!
        """Return the run directory: build dir plus the instance subdir."""
        return self._buildDir + self._subdir

    def getCfgDir(self):  # path, not file
        """Return the config directory inside the run directory."""
        return self.getRunDir() + "/cfg"

    def getLogDir(self):
        """Return the log directory inside the run directory."""
        return self.getRunDir() + "/log"

    def getHostAddr(self):
        """Return the host address of the instance (fixed to loopback)."""
        return "127.0.0.1"

    def getServiceCommand(self):  # to start the instance
        """Return the argument list that starts the instance, for subprocess.Popen()."""
        return [self.getExecFile(), '-c', self.getCfgDir()]
class TdeSubProcess:
    """
    A class to represent the actual sub process that is the run-time
    of a TDengine instance.

    It takes a TdeInstance object as its parameter, with the rationale being
    "a sub process runs an instance".
    """

    def __init__(self, tInst: "TdeInstance"):
        """
        Args:
            tInst: the instance this sub process will run.

        Raises:
            CrashGenError: if tInst is None.
        """
        self.subProcess = None  # the subprocess.Popen object, once started
        if tInst is None:
            raise CrashGenError("Empty instance not allowed in TdeSubProcess")
        self._tInst = tInst  # Default create at ServiceManagerThread

    def getStdOut(self):
        """Return the sub process' stdout pipe (only valid after start())."""
        return self.subProcess.stdout

    def getStdErr(self):
        """Return the sub process' stderr pipe (only valid after start())."""
        return self.subProcess.stderr

    def isRunning(self):
        """Return True while a sub process object is being held."""
        return self.subProcess is not None

    def getPid(self):
        """Return the pid of the sub process (only valid after start())."""
        return self.subProcess.pid

    def start(self):
        """
        Generate the config file, rotate old logs and launch the service.

        Raises:
            RuntimeError: if a sub process object is already held.
        """
        ON_POSIX = 'posix' in sys.builtin_module_names

        # Sanity check
        if self.subProcess:  # already there
            raise RuntimeError("Corrupt process state")

        self._tInst.generateCfgFile()  # service side generates config file, client does not
        self._tInst.rotateLogs()

        print("Starting TDengine instance: {}".format(self._tInst))
        self.subProcess = subprocess.Popen(
            self._tInst.getServiceCommand(),
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # bufsize=1, # not supported in binary mode
            close_fds=ON_POSIX
        )  # had text=True, which interfered with reading EOF

    def stop(self):
        """
        Stop the sub process, interrupting it with SIGINT if still alive.

        Returns:
            -1 if there was no sub process to stop,
            the real return code if the process had already ended,
            -4 if it terminated within 10 seconds of the SIGINT,
            -3 if it did not terminate in time (the process object is then
               kept, so isRunning() stays True).
        """
        if not self.subProcess:
            print("Sub process already stopped")
            return -1

        retCode = self.subProcess.poll()  # None while alive, else real return code
        # Compare against None: an exit status of 0 is falsy, and used to be
        # mistaken for "still running" (SIGINT sent, -4 reported).
        if retCode is not None:  # valid return code, process ended
            self.subProcess = None
            return retCode

        # process still alive, let's interrupt it
        print(
            "Sub process is running, sending SIG_INT and waiting for it to terminate...")
        # sub process should end, then IPC queue should end, causing IO
        # thread to end
        self.subProcess.send_signal(signal.SIGINT)
        try:
            self.subProcess.wait(10)
        except subprocess.TimeoutExpired:
            print("Time out waiting for TDengine service process to exit")
            retCode = -3
        else:
            print("TDengine service process terminated successfully from SIG_INT")
            retCode = -4
            self.subProcess = None
        return retCode
class ServiceManager:
    """
    Manages one ServiceManagerThread per dnode: starts and stops them,
    pumps their output, and offers signal handlers for interactive control.
    """

    PAUSE_BETWEEN_IPC_CHECK = 1.2  # seconds between checks on STDOUT of sub process

    def __init__(self, numDnodes = 1):
        """
        Args:
            numDnodes: number of service threads to create; >1 means a cluster.
        """
        Logging.info("TDengine Service Manager (TSM) created")
        self._numDnodes = numDnodes  # >1 means we have a cluster
        # Signal handlers are registered elsewhere (moved to MainExec)
        self.inSigHandler = False  # re-entrancy guard for the signal handlers

        self.svcMgrThreads = [
            ServiceManagerThread(i) for i in range(numDnodes)
        ]  # type: List[ServiceManagerThread]

        self._lock = threading.Lock()  # held while starting/stopping services

    def _doMenu(self):
        """Prompt on the console until "1", "2" or "3" is entered; return it."""
        choice = ""
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the if range below
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        """SIGUSR1 handler: show the menu and resume, terminate or restart."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                self.sigHandlerResume()  # TODO: can the sub-process be blocked due to us not reading from queue?
            elif choice == "2":
                self.stopTaosServices()
            elif choice == "3":  # Restart
                self.restart()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Always clear the guard, otherwise one failure would make every
            # later signal be ignored as "repeated".
            self.inSigHandler = False

    def sigIntHandler(self, signalNumber, frame):
        """SIGINT/SIGTERM handler: stop all services."""
        print("ServiceManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True
        try:
            self.stopTaosServices()
            print("ServiceManager: INT Signal Handler returning...")
        finally:
            self.inSigHandler = False  # see sigUsrHandler

    def sigHandlerResume(self):
        """Menu choice "Resume": nothing to do except tell the user."""
        print("Resuming TDengine service manager (main thread)...\n\n")

    def isActive(self):
        """
        Determine if the service/cluster is active at all, i.e. at least
        one thread is not "stopped".
        """
        return any(not thread.isStopped() for thread in self.svcMgrThreads)

    def isStable(self):
        """
        Determine if the service/cluster is "stable", i.e. all of the
        threads are in "stable" status.
        """
        return all(thread.isStable() for thread in self.svcMgrThreads)

    def _procIpcAll(self):
        """Pump the output of every running thread until none is active."""
        while self.isActive():
            for thread in self.svcMgrThreads:  # all thread objects should always be valid
                if thread.isRunning():
                    thread.procIpcBatch()  # regular processing,
                    if thread.isStopped():
                        thread.procIpcBatch()  # one last time?
                elif not thread.isStable():
                    # Neither running nor stopped: in transition. (The former
                    # call to "isRetarting" named a method that does not exist.)
                    print("Service restarting...")
                # else this thread is stopped
            time.sleep(self.PAUSE_BETWEEN_IPC_CHECK)  # pause, before next round
        print("Service Manager Thread (with subprocess) ended, main thread exiting...")

    def startTaosServices(self):
        """
        Kill any existing "taosd" process, then start every service thread.

        Raises:
            RuntimeError: if some service thread is not stopped.
        """
        with self._lock:
            if self.isActive():
                raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")

            # Find if there's already a taosd service, and then kill it
            for proc in psutil.process_iter():
                try:
                    procName = proc.name()
                except psutil.NoSuchProcess:
                    continue  # process exited while we were iterating
                if procName == 'taosd':
                    print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
                    time.sleep(2.0)
                    proc.kill()

            for thread in self.svcMgrThreads:
                thread.start()
                thread.procIpcBatch(trimToTarget=10, forceOutput=True)  # for printing 10 lines

    def stopTaosServices(self):
        """Stop every service thread; only warns when nothing is active."""
        with self._lock:
            if not self.isActive():
                Logging.warning("Cannot stop TAOS service(s), already not active")
                return

            for thread in self.svcMgrThreads:
                thread.stop()

    def run(self):
        """Start the services, pump their output until they end, then make sure they are stopped."""
        self.startTaosServices()
        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
        if self.isActive():  # if sig handler hasn't destroyed it by now
            self.stopTaosServices()  # should have started already

    def restart(self):
        """Stop (if active) and start the services again; refused while not stable."""
        if not self.isStable():
            Logging.warning("Cannot restart service/cluster, when not stable")
            return

        if self.isActive():
            self.stopTaosServices()
        else:
            Logging.warning("Service not active when restart requested")

        # Was self.startTaosService(), a method that does not exist
        self.startTaosServices()
class ServiceManagerThread:
    """
    A class representing a dedicated thread which manages the "sub process"
    of the TDengine service, interacting with its STDOUT/ERR.

    It takes a TdeInstance parameter at creation time, or creates a default
    one.
    """

    MAX_QUEUE_SIZE = 10000

    def __init__(self, tInstNum = 0, tInst : "TdeInstance" = None):
        """
        Args:
            tInstNum: instance serial number in the cluster, zero based.
            tInst: the instance to run; a default TdeInstance() when omitted.
        """
        # Set the sub process
        self._tdeSubProcess = None  # type: TdeSubProcess

        # Arrange the TDengine instance
        self._tInstNum = tInstNum  # instance serial number in cluster, ZERO based
        self._tInst = tInst or TdeInstance()  # Need an instance

        self._thread = None   # reader thread for the service's STDOUT
        self._thread2 = None  # reader thread for the service's STDERR
        # Created here (and replaced in start()) so that the queue helpers
        # never hit a missing attribute before the first start().
        self._ipcQueue = Queue()
        self._status = Status.STATUS_STOPPED  # The status of the underlying service, actually.

    def __repr__(self):
        return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum)

    def getStatus(self):
        """Return the current Status code."""
        return self._status

    def isStarting(self):
        return self._status == Status.STATUS_STARTING

    def isRunning(self):
        return self._status == Status.STATUS_RUNNING

    def isStopping(self):
        return self._status == Status.STATUS_STOPPING

    def isStopped(self):
        return self._status == Status.STATUS_STOPPED

    def isStable(self):
        """Return True when not in transition, i.e. running or stopped."""
        return self.isRunning() or self.isStopped()

    def start(self):
        """
        Start the sub process and its two reader threads, then wait (up to
        100 one-second polls) for the status to become RUNNING.

        Raises:
            RuntimeError: if a thread or sub process is already present, or
                the service does not report readiness in time.
        """
        if self._thread:
            raise RuntimeError("Unexpected _thread")
        if self._tdeSubProcess:
            raise RuntimeError("TDengine sub process already created/running")

        Logging.info("Attempting to start TAOS service: {}".format(self))

        self._status = Status.STATUS_STARTING
        self._tdeSubProcess = TdeSubProcess(self._tInst)
        self._tdeSubProcess.start()

        self._ipcQueue = Queue()
        self._thread = threading.Thread(  # First thread captures server OUTPUT
            target=self.svcOutputReader,
            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
        self._thread.daemon = True  # thread dies with the program
        self._thread.start()

        self._thread2 = threading.Thread(  # 2nd thread captures server ERRORs
            target=self.svcErrorReader,
            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
        self._thread2.daemon = True  # thread dies with the program
        self._thread2.start()

        # wait for service to start; the status is flipped to RUNNING by
        # svcOutputReader() once it sees TD_READY_MSG
        for i in range(0, 100):
            time.sleep(1.0)
            # don't pump message during start up
            print("_zz_", end="", flush=True)
            if self._status == Status.STATUS_RUNNING:
                Logging.info("[] TDengine service READY to process requests")
                Logging.info("[] TAOS service started: {}".format(self))
                return  # now we've started
        # TODO: handle failure-to-start better?
        self.procIpcBatch(100, True)  # display output before giving up: trim to last 100 msgs, force output
        raise RuntimeError("TDengine service did not start successfully: {}".format(self))

    def stop(self):
        """
        Stop the sub process and, if it really ended, join the reader
        threads. Can be called from both main thread or signal handler.

        Raises:
            RuntimeError: if there is no sub process object to stop.
        """
        print("Terminating TDengine service running as the sub process...")
        if self.isStopped():
            print("Service already stopped")
            return
        if self.isStopping():
            print("Service is already being stopped")
            return
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self._tdeSubProcess:
            raise RuntimeError("sub process object missing")

        self._status = Status.STATUS_STOPPING
        retCode = self._tdeSubProcess.stop()
        print("Attempted to stop sub process, got return code: {}".format(retCode))
        if (retCode==-11):  # SGV
            Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")

        if self._tdeSubProcess.isRunning():  # still running
            print("FAILED to stop sub process, it is still running... pid = {}".format(
                self._tdeSubProcess.getPid()))
        else:
            self._tdeSubProcess = None  # not running any more
            self.join()  # stop the thread, change the status, etc.

        # Check if it's really stopped
        outputLines = 20  # for last output
        if self.isStopped():
            self.procIpcBatch(outputLines)  # one last time
            print("End of TDengine Service Output: {}".format(self))
            print("----- TDengine Service (managed by SMT) is now terminated -----\n")
        else:
            print("WARNING: SMT did not terminate as expected: {}".format(self))

    def join(self):
        """
        Join both reader threads and mark the service STOPPED.

        Raises:
            RuntimeError: if the status is not STOPPING.
        """
        # TODO: sanity check
        if not self.isStopping():
            raise RuntimeError(
                "Unexpected status when ending svc mgr thread: {}".format(
                    self._status))

        if self._thread:
            self._thread.join()
            self._thread = None
            self._status = Status.STATUS_STOPPED
            # STD ERR thread
            if self._thread2:
                self._thread2.join()
                self._thread2 = None
        else:
            print("Joining empty thread, doing nothing")

    def _trimQueue(self, targetSize):
        """Drop the oldest queued lines until at most targetSize remain; <= 0 disables trimming."""
        if targetSize <= 0:
            return  # do nothing
        q = self._ipcQueue
        if (q.qsize() <= targetSize):  # no need to trim
            return

        Logging.debug("Triming IPC queue to target size: {}".format(targetSize))
        itemsToTrim = q.qsize() - targetSize
        for i in range(0, itemsToTrim):
            try:
                q.get_nowait()
            except Empty:
                break  # break out of for loop, no more trimming

    TD_READY_MSG = "TDengine is initialized successfully"

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        """
        Log every line currently queued by the reader thread.

        Args:
            trimToTarget: when > 0, first drop old lines so that at most this
                many are left.
            forceOutput: log at INFO level instead of DEBUG.
        """
        self._trimQueue(trimToTarget)  # trim if necessary
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        print("<", end="", flush=True)
        while True:
            try:
                line = self._ipcQueue.get_nowait()  # getting output at fast speed
                self._printProgress("_o")
            except Empty:
                # no more output
                print(".>", end="", flush=True)
                return  # we are done with THIS BATCH
            else:  # got line, printing out
                if forceOutput:
                    Logging.info(line)
                else:
                    Logging.debug(line)

    _ProgressBars = ["--", "//", "||", "\\\\"]

    def _printProgress(self, msg):  # TODO: assuming 2 chars
        """Print msg and a randomly picked bar, then four backspaces."""
        print(msg, end="", flush=True)
        pBar = self._ProgressBars[Dice.throw(4)]
        print(pBar, end="", flush=True)
        print('\b\b\b\b', end="", flush=True)

    def svcOutputReader(self, out: IO, queue):
        """
        Thread body: read the service's STDOUT line by line into queue until
        EOF, and switch the status to RUNNING once TD_READY_MSG shows up
        during start-up.
        """
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        for line in iter(out.readline, b''):
            try:
                line = line.decode("utf-8").rstrip()
            except UnicodeError:
                print("\nNon-UTF8 server output: {}\n".format(line))
                # Keep going with a lossy str: leaving bytes here made the
                # str search below raise TypeError and end this thread.
                line = line.decode("utf-8", errors="replace").rstrip()

            # This might block, and then causing "out" buffer to block
            queue.put(line)
            self._printProgress("_i")

            if self._status == Status.STATUS_STARTING:  # we are starting, let's see if we have started
                if line.find(self.TD_READY_MSG) != -1:  # found
                    Logging.info("Waiting for the service to become FULLY READY")
                    time.sleep(1.0)  # wait for the server to truly start. TODO: remove this
                    Logging.info("Service instance #{} is now FULLY READY".format(self._tInstNum))
                    self._status = Status.STATUS_RUNNING

            # Trim the queue if necessary: TODO: try this 1 out of 10 times
            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

            if self.isStopping():  # TODO: use thread status instead
                # WAITING for stopping sub process to finish its output
                print("_w", end="", flush=True)

        # EOF on the pipe, meaning sub process must have died
        print("\nNo more output from IO thread managing TDengine service")
        out.close()

    def svcErrorReader(self, err: IO, queue):
        """Thread body: print every line the service writes to STDERR, until EOF."""
        for line in iter(err.readline, b''):
            print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册