提交 5a92c415 编写于 作者: S Steven Li

Enhanced crash_gen tool to run clusters, with a new README file

上级 f7a0b6b8
<center><h1>User's Guide to the Crash_Gen Tool</h1></center>
# Introduction
To effectively test and debug our TDengine product, we have developed a simple tool to
exercise various functions of the system in a randomized fashion, hoping to expose
maximum number of problems, hopefully without a pre-determined scenario.
# Preparation
To run this tool, please ensure the followed preparation work is done first.
1. Fetch a copy of the TDengine source code, and build it successfully in the `build/`
directory
1. Ensure that the system has Python3.8 or above properly installed. We use
Ubuntu 20.04LTS as our own development environment, and suggest you also use such
an environment if possible.
# Simple Execution
To run the tool with the simplest method, follow the steps below:
1. Open a terminal window, start the `taosd` service in the `build/` directory
(or however you prefer to start the `taosd` service)
1. Open another terminal window, go into the `tests/pytest/` directory, and
run `./crash_gen.sh -p -t 3 -s 10` (change the two parameters here as you wish)
1. Watch the output to the end and see if you get a `SUCCESS` or `FAILURE`
That's it!
# Running Clusters
This tool also makes it easy to test/verify the clustering capabilities of TDengine. You
can start a cluster quite easily with the following command:
```
$ cd tests/pytest/
$ ./crash_gen.sh -e -o 3
```
The `-e` option above tells the tool to start the service, and do not run any tests, while
the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster.
Obviously you can adjust the the number here.
## Behind the Scenes
When the tool runs a cluster, it users a number of directories, each holding the information
for a single DNode, see:
```
$ ls build/cluster*
build/cluster_dnode_0:
cfg data log
build/cluster_dnode_1:
cfg data log
build/cluster_dnode_2:
cfg data log
```
Therefore, when something goes wrong and you want to reset everything with the cluster, simple
erase all the files:
```
$ rm -rf build/cluster_dnode_*
```
## Addresses and Ports
The DNodes in the cluster all binds the the `127.0.0.1` IP address (for now anyway), and
uses port 6030 for the first DNode, and 6130 for the 2nd one, and so on.
## Testing Against a Cluster
In a separate terminal window, you can invoke the tool in client mode and test against
a cluster, such as:
```
$ ./crash_gen.sh -p -t 10 -s 100 -i 3
```
Here the `-i` option tells the tool to always create tables with 3 replicas, and run
all tests against such tables.
# Additional Features
The exhaustive features of the tool is available through the `-h` option:
```
$ ./crash_gen.sh -h
usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS] [-i MAX_REPLICAS] [-l] [-n] [-o NUM_DNODES] [-p] [-r]
[-s MAX_STEPS] [-t NUM_THREADS] [-v] [-x]
TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
---------------------------------------------------------------------
1. You build TDengine in the top level ./build directory, as described in offical docs
2. You run the server there before this script: ./build/bin/taosd -c test/cfg
optional arguments:
-h, --help show this help message and exit
-a, --auto-start-service
Automatically start/stop the TDengine service (default: false)
-b MAX_DBS, --max-dbs MAX_DBS
Maximum number of DBs to keep, set to disable dropping DB. (default: 0)
-c CONNECTOR_TYPE, --connector-type CONNECTOR_TYPE
Connector type to use: native, rest, or mixed (default: 10)
-d, --debug Turn on DEBUG mode for more logging (default: false)
-e, --run-tdengine Run TDengine service in foreground (default: false)
-g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS
Ignore error codes, comma separated, 0x supported (default: None)
-i MAX_REPLICAS, --max-replicas MAX_REPLICAS
Maximum number of replicas to use, when testing against clusters. (default: 1)
-l, --larger-data Write larger amount of data during write operations (default: false)
-n, --dynamic-db-table-names
Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)
-o NUM_DNODES, --num-dnodes NUM_DNODES
Number of Dnodes to initialize, used with -e option. (default: 1)
-p, --per-thread-db-connection
Use a single shared db connection (default: false)
-r, --record-ops Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)
-s MAX_STEPS, --max-steps MAX_STEPS
Maximum number of steps to run (default: 100)
-t NUM_THREADS, --num-threads NUM_THREADS
Number of threads to run (default: 10)
-v, --verify-data Verify data written in a number of places by reading back (default: false)
-x, --continue-on-exception
Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)
```
...@@ -18,6 +18,7 @@ from __future__ import annotations ...@@ -18,6 +18,7 @@ from __future__ import annotations
from typing import Set from typing import Set
from typing import Dict from typing import Dict
from typing import List from typing import List
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
import textwrap import textwrap
import time import time
...@@ -62,9 +63,10 @@ gContainer: Container ...@@ -62,9 +63,10 @@ gContainer: Container
class WorkerThread: class WorkerThread:
def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
# te: TaskExecutor, """
): # note: main thread context! Note: this runs in the main thread context
"""
# self._curStep = -1 # self._curStep = -1
self._pool = pool self._pool = pool
self._tid = tid self._tid = tid
...@@ -1007,6 +1009,8 @@ class Database: ...@@ -1007,6 +1009,8 @@ class Database:
possibly in a cluster environment. possibly in a cluster environment.
For now we use it to manage state transitions in that database For now we use it to manage state transitions in that database
TODO: consider moving, but keep in mind it contains "StateMachine"
''' '''
_clsLock = threading.Lock() # class wide lock _clsLock = threading.Lock() # class wide lock
_lastInt = 101 # next one is initial integer _lastInt = 101 # next one is initial integer
...@@ -1182,7 +1186,7 @@ class Task(): ...@@ -1182,7 +1186,7 @@ class Task():
def __init__(self, execStats: ExecutionStats, db: Database): def __init__(self, execStats: ExecutionStats, db: Database):
self._workerThread = None self._workerThread = None
self._err = None # type: Exception self._err: Optional[Exception] = None
self._aborted = False self._aborted = False
self._curStep = None self._curStep = None
self._numRows = None # Number of rows affected self._numRows = None # Number of rows affected
...@@ -1318,10 +1322,11 @@ class Task(): ...@@ -1318,10 +1322,11 @@ class Task():
self._aborted = True self._aborted = True
traceback.print_exc() traceback.print_exc()
except BaseException: # TODO: what is this again??!! except BaseException: # TODO: what is this again??!!
self.logDebug( raise RuntimeError("Punt")
"[=] Unexpected exception, SQL: {}".format( # self.logDebug(
wt.getDbConn().getLastSql())) # "[=] Unexpected exception, SQL: {}".format(
raise # wt.getDbConn().getLastSql()))
# raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
self.logDebug("[X] task execution completed, {}, status: {}".format( self.logDebug("[X] task execution completed, {}, status: {}".format(
...@@ -1498,7 +1503,8 @@ class TaskCreateDb(StateTransitionTask): ...@@ -1498,7 +1503,8 @@ class TaskCreateDb(StateTransitionTask):
# was: self.execWtSql(wt, "create database db") # was: self.execWtSql(wt, "create database db")
repStr = "" repStr = ""
if gConfig.max_replicas != 1: if gConfig.max_replicas != 1:
numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
numReplica = gConfig.max_replicas # fixed, always
repStr = "replica {}".format(numReplica) repStr = "replica {}".format(numReplica)
self.execWtSql(wt, "create database {} {}" self.execWtSql(wt, "create database {} {}"
.format(self._db.getName(), repStr) ) .format(self._db.getName(), repStr) )
...@@ -2050,7 +2056,7 @@ class ClientManager: ...@@ -2050,7 +2056,7 @@ class ClientManager:
class MainExec: class MainExec:
def __init__(self): def __init__(self):
self._clientMgr = None self._clientMgr = None
self._svcMgr = None self._svcMgr = None # type: ServiceManager
signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler)
...@@ -2063,17 +2069,16 @@ class MainExec: ...@@ -2063,17 +2069,16 @@ class MainExec:
self._svcMgr.sigUsrHandler(signalNumber, frame) self._svcMgr.sigUsrHandler(signalNumber, frame)
def sigIntHandler(self, signalNumber, frame): def sigIntHandler(self, signalNumber, frame):
if self._svcMgr: if self._svcMgr:
self._svcMgr.sigIntHandler(signalNumber, frame) self._svcMgr.sigIntHandler(signalNumber, frame)
if self._clientMgr: if self._clientMgr:
self._clientMgr.sigIntHandler(signalNumber, frame) self._clientMgr.sigIntHandler(signalNumber, frame)
def runClient(self): def runClient(self):
global gSvcMgr global gSvcMgr
if gConfig.auto_start_service: if gConfig.auto_start_service:
self._svcMgr = ServiceManager() gSvcMgr = self._svcMgr = ServiceManager() # hack alert
gSvcMgr = self._svcMgr # hack alert gSvcMgr.startTaosService() # we start, don't run
self._svcMgr.startTaosService() # we start, don't run
self._clientMgr = ClientManager() self._clientMgr = ClientManager()
ret = None ret = None
...@@ -2086,12 +2091,10 @@ class MainExec: ...@@ -2086,12 +2091,10 @@ class MainExec:
def runService(self): def runService(self):
global gSvcMgr global gSvcMgr
self._svcMgr = ServiceManager() gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert
gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert
self._svcMgr.run() # run to some end state gSvcMgr.run() # run to some end state
self._svcMgr = None gSvcMgr = self._svcMgr = None
gSvcMgr = None
def init(self): # TODO: refactor def init(self): # TODO: refactor
global gContainer global gContainer
...@@ -2165,6 +2168,13 @@ class MainExec: ...@@ -2165,6 +2168,13 @@ class MainExec:
'--dynamic-db-table-names', '--dynamic-db-table-names',
action='store_true', action='store_true',
help='Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)') help='Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)')
parser.add_argument(
'-o',
'--num-dnodes',
action='store',
default=1,
type=int,
help='Number of Dnodes to initialize, used with -e option. (default: 1)')
parser.add_argument( parser.add_argument(
'-p', '-p',
'--per-thread-db-connection', '--per-thread-db-connection',
...@@ -2209,7 +2219,12 @@ class MainExec: ...@@ -2209,7 +2219,12 @@ class MainExec:
def run(self): def run(self):
if gConfig.run_tdengine: # run server if gConfig.run_tdengine: # run server
self.runService() try:
self.runService()
return 0 # success
except ConnectionError as err:
Logging.error("Failed to make DB connection, please check DB instance manually")
return -1 # failure
else: else:
return self.runClient() return self.runClient()
......
...@@ -12,7 +12,9 @@ from util.cases import * ...@@ -12,7 +12,9 @@ from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.log import * from util.log import *
from .misc import Logging, CrashGenError, Helper from .misc import Logging, CrashGenError, Helper, Dice
import os
import datetime
# from .service_manager import TdeInstance # from .service_manager import TdeInstance
class DbConn: class DbConn:
...@@ -44,6 +46,9 @@ class DbConn: ...@@ -44,6 +46,9 @@ class DbConn:
self._lastSql = None self._lastSql = None
self._dbTarget = dbTarget self._dbTarget = dbTarget
def __repr__(self):
return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget)
def getLastSql(self): def getLastSql(self):
return self._lastSql return self._lastSql
...@@ -54,7 +59,7 @@ class DbConn: ...@@ -54,7 +59,7 @@ class DbConn:
# below implemented by child classes # below implemented by child classes
self.openByType() self.openByType()
Logging.debug("[DB] data connection opened, type = {}".format(self._type)) Logging.debug("[DB] data connection opened: {}".format(self))
self.isOpen = True self.isOpen = True
def close(self): def close(self):
...@@ -277,15 +282,18 @@ class DbTarget: ...@@ -277,15 +282,18 @@ class DbTarget:
self.cfgPath = cfgPath self.cfgPath = cfgPath
self.hostAddr = hostAddr self.hostAddr = hostAddr
self.port = port self.port = port
def __repr__(self): def __repr__(self):
return "[DbTarget: cfgPath={}, host={}:{}]".format( return "[DbTarget: cfgPath={}, host={}:{}]".format(
self.cfgPath, self.hostAddr, self.port) Helper.getFriendlyPath(self.cfgPath), self.hostAddr, self.port)
def getEp(self):
return "{}:{}".format(self.hostAddr, self.port)
class DbConnNative(DbConn): class DbConnNative(DbConn):
# Class variables # Class variables
_lock = threading.Lock() _lock = threading.Lock()
_connInfoDisplayed = False # _connInfoDisplayed = False # TODO: find another way to display this
totalConnections = 0 # Not private totalConnections = 0 # Not private
def __init__(self, dbTarget): def __init__(self, dbTarget):
...@@ -304,9 +312,9 @@ class DbConnNative(DbConn): ...@@ -304,9 +312,9 @@ class DbConnNative(DbConn):
cls = self.__class__ # Get the class, to access class variables cls = self.__class__ # Get the class, to access class variables
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
dbTarget = self._dbTarget dbTarget = self._dbTarget
if not cls._connInfoDisplayed: # if not cls._connInfoDisplayed:
cls._connInfoDisplayed = True # updating CLASS variable # cls._connInfoDisplayed = True # updating CLASS variable
Logging.info("Initiating TAOS native connection to {}".format(dbTarget)) Logging.debug("Initiating TAOS native connection to {}".format(dbTarget))
# Make the connection # Make the connection
# self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable # self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
# self._cursor = self._conn.cursor() # self._cursor = self._conn.cursor()
...@@ -424,3 +432,4 @@ class DbManager(): ...@@ -424,3 +432,4 @@ class DbManager():
def cleanUp(self): def cleanUp(self):
self._dbConn.close() self._dbConn.close()
import threading import threading
import random import random
import logging import logging
import os
class CrashGenError(Exception): class CrashGenError(Exception):
...@@ -26,7 +27,7 @@ class LoggingFilter(logging.Filter): ...@@ -26,7 +27,7 @@ class LoggingFilter(logging.Filter):
class MyLoggingAdapter(logging.LoggerAdapter): class MyLoggingAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs): def process(self, msg, kwargs):
return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs return "[{}] {}".format(threading.get_ident() % 10000, msg), kwargs
# return '[%s] %s' % (self.extra['connid'], msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs
...@@ -71,12 +72,44 @@ class Logging: ...@@ -71,12 +72,44 @@ class Logging:
def warning(cls, msg): def warning(cls, msg):
cls.logger.warning(msg) cls.logger.warning(msg)
@classmethod
def error(cls, msg):
cls.logger.error(msg)
class Status: class Status:
STATUS_STARTING = 1 STATUS_STARTING = 1
STATUS_RUNNING = 2 STATUS_RUNNING = 2
STATUS_STOPPING = 3 STATUS_STOPPING = 3
STATUS_STOPPED = 4 STATUS_STOPPED = 4
def __init__(self, status):
self.set(status)
def __repr__(self):
return "[Status: v={}]".format(self._status)
def set(self, status):
self._status = status
def get(self):
return self._status
def isStarting(self):
return self._status == Status.STATUS_STARTING
def isRunning(self):
# return self._thread and self._thread.is_alive()
return self._status == Status.STATUS_RUNNING
def isStopping(self):
return self._status == Status.STATUS_STOPPING
def isStopped(self):
return self._status == Status.STATUS_STOPPED
def isStable(self):
return self.isRunning() or self.isStopped()
# Deterministic random number generator # Deterministic random number generator
class Dice(): class Dice():
seeded = False # static, uninitialized seeded = False # static, uninitialized
...@@ -118,14 +151,23 @@ class Helper: ...@@ -118,14 +151,23 @@ class Helper:
def convertErrno(cls, errno): def convertErrno(cls, errno):
return errno if (errno > 0) else 0x80000000 + errno return errno if (errno > 0) else 0x80000000 + errno
@classmethod
def getFriendlyPath(cls, path): # returns .../xxx/yyy
ht1 = os.path.split(path)
ht2 = os.path.split(ht1[0])
return ".../" + ht2[1] + '/' + ht1[1]
class Progress: class Progress:
STEP_BOUNDARY = 0 STEP_BOUNDARY = 0
BEGIN_THREAD_STEP = 1 BEGIN_THREAD_STEP = 1
END_THREAD_STEP = 2 END_THREAD_STEP = 2
SERVICE_HEART_BEAT= 3
tokens = { tokens = {
STEP_BOUNDARY: '.', STEP_BOUNDARY: '.',
BEGIN_THREAD_STEP: '[', BEGIN_THREAD_STEP: '[',
END_THREAD_STEP: '] ' END_THREAD_STEP: '] ',
SERVICE_HEART_BEAT: '.Y.'
} }
@classmethod @classmethod
......
...@@ -7,7 +7,7 @@ import logging ...@@ -7,7 +7,7 @@ import logging
import time import time
import subprocess import subprocess
from typing import IO from typing import IO, List
try: try:
import psutil import psutil
...@@ -17,7 +17,7 @@ except: ...@@ -17,7 +17,7 @@ except:
from queue import Queue, Empty from queue import Queue, Empty
from .misc import Logging, Status, CrashGenError, Dice from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from .db import DbConn, DbTarget from .db import DbConn, DbTarget
class TdeInstance(): class TdeInstance():
...@@ -47,12 +47,15 @@ class TdeInstance(): ...@@ -47,12 +47,15 @@ class TdeInstance():
.format(selfPath, projPath)) .format(selfPath, projPath))
return buildPath return buildPath
def __init__(self, subdir='test', port=6030, fepPort=6030): def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030):
self._buildDir = self._getBuildPath() self._buildDir = self._getBuildPath()
self._subdir = '/' + subdir # TODO: tolerate "/" self._subdir = '/' + subdir # TODO: tolerate "/"
self._port = port # TODO: support different IP address too self._port = port # TODO: support different IP address too
self._fepPort = fepPort self._fepPort = fepPort
self._tInstNum = tInstNum
self._smThread = ServiceManagerThread()
def getDbTarget(self): def getDbTarget(self):
return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port) return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)
...@@ -60,7 +63,8 @@ class TdeInstance(): ...@@ -60,7 +63,8 @@ class TdeInstance():
return self._port return self._port
def __repr__(self): def __repr__(self):
return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir) return "[TdeInstance: {}, subdir={}]".format(
self._buildDir, Helper.getFriendlyPath(self._subdir))
def generateCfgFile(self): def generateCfgFile(self):
# print("Logger = {}".format(logger)) # print("Logger = {}".format(logger))
...@@ -146,8 +150,52 @@ walLevel 1 ...@@ -146,8 +150,52 @@ walLevel 1
def getHostAddr(self): def getHostAddr(self):
return "127.0.0.1" return "127.0.0.1"
def getServiceCommand(self): # to start the instance def getServiceCmdLine(self): # to start the instance
return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
def _getDnodes(self, dbc):
dbc.query("show dnodes")
cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason
return {c[1]:c[4] for c in cols} # {'xxx:6030':'ready', 'xxx:6130':'ready'}
def createDnode(self, dbt: DbTarget):
"""
With a connection to the "first" EP, let's create a dnode for someone else who
wants to join.
"""
dbc = DbConn.createNative(self.getDbTarget())
dbc.open()
if dbt.getEp() in self._getDnodes(dbc):
Logging.info("Skipping DNode creation for: {}".format(dbt))
dbc.close()
return
sql = "CREATE DNODE \"{}\"".format(dbt.getEp())
dbc.execute(sql)
dbc.close()
def getStatus(self):
return self._smThread.getStatus()
def getSmThread(self):
return self._smThread
def start(self):
if not self.getStatus().isStopped():
raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus()))
Logging.info("Starting TDengine instance: {}".format(self))
self.generateCfgFile() # service side generates config file, client does not
self.rotateLogs()
self._smThread.start(self.getServiceCmdLine())
def stop(self):
self._smThread.stop()
def isFirst(self):
return self._tInstNum == 0
class TdeSubProcess: class TdeSubProcess:
...@@ -159,11 +207,15 @@ class TdeSubProcess: ...@@ -159,11 +207,15 @@ class TdeSubProcess:
"a sub process runs an instance". "a sub process runs an instance".
""" """
def __init__(self, tInst : TdeInstance): # RET_ALREADY_STOPPED = -1
# RET_TIME_OUT = -3
# RET_SUCCESS = -4
def __init__(self):
self.subProcess = None self.subProcess = None
if tInst is None: # if tInst is None:
raise CrashGenError("Empty instance not allowed in TdeSubProcess") # raise CrashGenError("Empty instance not allowed in TdeSubProcess")
self._tInst = tInst # Default create at ServiceManagerThread # self._tInst = tInst # Default create at ServiceManagerThread
def getStdOut(self): def getStdOut(self):
return self.subProcess.stdout return self.subProcess.stdout
...@@ -177,38 +229,15 @@ class TdeSubProcess: ...@@ -177,38 +229,15 @@ class TdeSubProcess:
def getPid(self): def getPid(self):
return self.subProcess.pid return self.subProcess.pid
# Repalced by TdeInstance class def start(self, cmdLine):
# def getBuildPath(self):
# selfPath = os.path.dirname(os.path.realpath(__file__))
# if ("community" in selfPath):
# projPath = selfPath[:selfPath.find("communit")]
# else:
# projPath = selfPath[:selfPath.find("tests")]
# for root, dirs, files in os.walk(projPath):
# if ("taosd" in files):
# rootRealPath = os.path.dirname(os.path.realpath(root))
# if ("packaging" not in rootRealPath):
# buildPath = root[:len(root) - len("/build/bin")]
# break
# return buildPath
def start(self):
ON_POSIX = 'posix' in sys.builtin_module_names ON_POSIX = 'posix' in sys.builtin_module_names
# Sanity check # Sanity check
if self.subProcess: # already there if self.subProcess: # already there
raise RuntimeError("Corrupt process state") raise RuntimeError("Corrupt process state")
# global gContainer
# tInst = gContainer.defTdeInstance = TdeInstance('test3') # creae the instance
self._tInst.generateCfgFile() # service side generates config file, client does not
self._tInst.rotateLogs()
print("Starting TDengine instance: {}".format(self._tInst))
self.subProcess = subprocess.Popen( self.subProcess = subprocess.Popen(
self._tInst.getServiceCommand(), cmdLine,
shell=False, shell=False,
# svcCmdSingle, shell=True, # capture core dump? # svcCmdSingle, shell=True, # capture core dump?
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
...@@ -218,31 +247,50 @@ class TdeSubProcess: ...@@ -218,31 +247,50 @@ class TdeSubProcess:
) # had text=True, which interferred with reading EOF ) # had text=True, which interferred with reading EOF
def stop(self): def stop(self):
"""
Stop a sub process, and try to return a meaningful return code.
Common POSIX signal values (from man -7 signal):
SIGHUP 1
SIGINT 2
SIGQUIT 3
SIGILL 4
SIGTRAP 5
SIGABRT 6
SIGIOT 6
SIGBUS 7
SIGEMT -
SIGFPE 8
SIGKILL 9
SIGUSR1 10
SIGSEGV 11
SIGUSR2 12
"""
if not self.subProcess: if not self.subProcess:
print("Sub process already stopped") print("Sub process already stopped")
return -1 return # -1
retCode = self.subProcess.poll() # contains real sub process return code retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
if retCode: # valid return code, process ended if retCode: # valid return code, process ended
retCode = -retCode # only if valid
Logging.warning("TSP.stop(): process ended itself")
self.subProcess = None self.subProcess = None
else: # process still alive, let's interrupt it return retCode
print(
"Sub process is running, sending SIG_INT and waiting for it to terminate...") # process still alive, let's interrupt it
# sub process should end, then IPC queue should end, causing IO print("Terminate running process, send SIG_INT and wait...")
# thread to end # sub process should end, then IPC queue should end, causing IO thread to end
self.subProcess.send_signal(signal.SIGINT) self.subProcess.send_signal(signal.SIGINT)
try: self.subProcess.wait(20)
self.subProcess.wait(10) retCode = self.subProcess.returncode # should always be there
retCode = self.subProcess.returncode # May throw subprocess.TimeoutExpired exception above, therefore
except subprocess.TimeoutExpired as err: # The process is guranteed to have ended by now
print("Time out waiting for TDengine service process to exit") self.subProcess = None
retCode = -3 if retCode != 0: # != (- signal.SIGINT):
else: Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG_INT, retCode={}".format(retCode))
print("TDengine service process terminated successfully from SIG_INT") else:
retCode = -4 Logging.info("TSP.stop(): sub proc successfully terminated with SIG_INT")
self.subProcess = None return - retCode
return retCode
class ServiceManager: class ServiceManager:
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
...@@ -259,19 +307,25 @@ class ServiceManager: ...@@ -259,19 +307,25 @@ class ServiceManager:
# self._status = MainExec.STATUS_RUNNING # set inside # self._status = MainExec.STATUS_RUNNING # set inside
# _startTaosService() # _startTaosService()
self._runCluster = (numDnodes >= 1) self._runCluster = (numDnodes >= 1)
self.svcMgrThreads = [] # type: List[ServiceManagerThread] self._tInsts : List[TdeInstance] = []
for i in range(0, numDnodes): for i in range(0, numDnodes):
self.svcMgrThreads.append(ServiceManagerThread(i)) ti = self._createTdeInstance(i) # construct tInst
self._tInsts.append(ti)
# self.svcMgrThreads : List[ServiceManagerThread] = []
# for i in range(0, numDnodes):
# thread = self._createThread(i) # construct tInst
# self.svcMgrThreads.append(thread)
def _createThread(self, dnIndex): def _createTdeInstance(self, dnIndex):
if not self._runCluster: # single instance # if not self._runCluster: # single instance
return ServiceManagerThread(0) # return ServiceManagerThread(0)
# Create all threads in a cluster # Create all threads in a cluster
subdir = 'cluster_dnode_{}'.format(dnIndex) subdir = 'cluster_dnode_{}'.format(dnIndex)
fepPort= 6030 # firstEP Port fepPort= 6030 # firstEP Port
port = fepPort + dnIndex * 100 port = fepPort + dnIndex * 100
ti = TdeInstance(subdir, port, fepPort) return TdeInstance(subdir, dnIndex, port, fepPort)
return ServiceManagerThread(dnIndex, ti) # return ServiceManagerThread(dnIndex, ti)
def _doMenu(self): def _doMenu(self):
choice = "" choice = ""
...@@ -336,8 +390,8 @@ class ServiceManager: ...@@ -336,8 +390,8 @@ class ServiceManager:
Determine if the service/cluster is active at all, i.e. at least Determine if the service/cluster is active at all, i.e. at least
one thread is not "stopped". one thread is not "stopped".
""" """
for thread in self.svcMgrThreads: for ti in self._tInsts:
if not thread.isStopped(): if not ti.getStatus().isStopped():
return True return True
return False return False
...@@ -356,28 +410,31 @@ class ServiceManager: ...@@ -356,28 +410,31 @@ class ServiceManager:
Determine if the service/cluster is "stable", i.e. all of the Determine if the service/cluster is "stable", i.e. all of the
threads are in "stable" status. threads are in "stable" status.
""" """
for thread in self.svcMgrThreads: for ti in self._tInsts:
if not thread.isStable(): if not ti.isStable():
return False return False
return True return True
def _procIpcAll(self): def _procIpcAll(self):
while self.isActive(): while self.isActive():
for thread in self.svcMgrThreads: # all thread objects should always be valid Progress.emit(Progress.SERVICE_HEART_BEAT)
for ti in self._tInsts: # all thread objects should always be valid
# while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here # while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
if thread.isRunning(): status = ti.getStatus()
thread.procIpcBatch() # regular processing, if status.isRunning():
if thread.isStopped(): th = ti.getSmThread()
thread.procIpcBatch() # one last time? th.procIpcBatch() # regular processing,
if status.isStopped():
th.procIpcBatch() # one last time?
# self._updateThreadStatus() # self._updateThreadStatus()
elif thread.isRetarting():
print("Service restarting...")
# else this thread is stopped
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
# raise CrashGenError("dummy") # raise CrashGenError("dummy")
print("Service Manager Thread (with subprocess) ended, main thread exiting...") print("Service Manager Thread (with subprocess) ended, main thread exiting...")
def _getFirstInstance(self):
return self._tInsts[0]
def startTaosServices(self): def startTaosServices(self):
with self._lock: with self._lock:
if self.isActive(): if self.isActive():
...@@ -386,15 +443,19 @@ class ServiceManager: ...@@ -386,15 +443,19 @@ class ServiceManager:
# Find if there's already a taosd service, and then kill it # Find if there's already a taosd service, and then kill it
for proc in psutil.process_iter(): for proc in psutil.process_iter():
if proc.name() == 'taosd': if proc.name() == 'taosd':
print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe") print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
time.sleep(2.0) time.sleep(2.0)
proc.kill() proc.kill()
# print("Process: {}".format(proc.name())) # print("Process: {}".format(proc.name()))
# self.svcMgrThread = ServiceManagerThread() # create the object # self.svcMgrThread = ServiceManagerThread() # create the object
for thread in self.svcMgrThreads:
thread.start() for ti in self._tInsts:
thread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines ti.start()
if not ti.isFirst():
tFirst = self._getFirstInstance()
tFirst.createDnode(ti.getDbTarget())
ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
def stopTaosServices(self): def stopTaosServices(self):
with self._lock: with self._lock:
...@@ -402,8 +463,8 @@ class ServiceManager: ...@@ -402,8 +463,8 @@ class ServiceManager:
Logging.warning("Cannot stop TAOS service(s), already not active") Logging.warning("Cannot stop TAOS service(s), already not active")
return return
for thread in self.svcMgrThreads: for ti in self._tInsts:
thread.stop() ti.stop()
def run(self): def run(self):
self.startTaosServices() self.startTaosServices()
...@@ -412,7 +473,7 @@ class ServiceManager: ...@@ -412,7 +473,7 @@ class ServiceManager:
self.stopTaosServices() # should have started already self.stopTaosServices() # should have started already
def restart(self): def restart(self):
if not self.isStable(): if not self.getStatus().isStable():
Logging.warning("Cannot restart service/cluster, when not stable") Logging.warning("Cannot restart service/cluster, when not stable")
return return
...@@ -440,42 +501,27 @@ class ServiceManagerThread: ...@@ -440,42 +501,27 @@ class ServiceManagerThread:
""" """
MAX_QUEUE_SIZE = 10000 MAX_QUEUE_SIZE = 10000
def __init__(self, tInstNum = 0, tInst : TdeInstance = None): def __init__(self):
# Set the sub process # Set the sub process
self._tdeSubProcess = None # type: TdeSubProcess self._tdeSubProcess = None # type: TdeSubProcess
# Arrange the TDengine instance # Arrange the TDengine instance
self._tInstNum = tInstNum # instance serial number in cluster, ZERO based # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
self._tInst = tInst or TdeInstance() # Need an instance # self._tInst = tInst or TdeInstance() # Need an instance
self._thread = None # The actual thread, # type: threading.Thread self._thread = None # The actual thread, # type: threading.Thread
self._status = Status.STATUS_STOPPED # The status of the underlying service, actually. self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
def __repr__(self): def __repr__(self):
return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum) return "[SvcMgrThread: status={}, subProc={}]".format(
self.getStatus(), self._tdeSubProcess)
def getStatus(self): def getStatus(self):
return self._status return self._status
def isStarting(self):
return self._status == Status.STATUS_STARTING
def isRunning(self):
# return self._thread and self._thread.is_alive()
return self._status == Status.STATUS_RUNNING
def isStopping(self):
return self._status == Status.STATUS_STOPPING
def isStopped(self):
return self._status == Status.STATUS_STOPPED
def isStable(self):
return self.isRunning() or self.isStopped()
# Start the thread (with sub process), and wait for the sub service # Start the thread (with sub process), and wait for the sub service
# to become fully operational # to become fully operational
def start(self): def start(self, cmdLine):
if self._thread: if self._thread:
raise RuntimeError("Unexpected _thread") raise RuntimeError("Unexpected _thread")
if self._tdeSubProcess: if self._tdeSubProcess:
...@@ -483,9 +529,9 @@ class ServiceManagerThread: ...@@ -483,9 +529,9 @@ class ServiceManagerThread:
Logging.info("Attempting to start TAOS service: {}".format(self)) Logging.info("Attempting to start TAOS service: {}".format(self))
self._status = Status.STATUS_STARTING self._status.set(Status.STATUS_STARTING)
self._tdeSubProcess = TdeSubProcess(self._tInst) self._tdeSubProcess = TdeSubProcess()
self._tdeSubProcess.start() self._tdeSubProcess.start(cmdLine)
self._ipcQueue = Queue() self._ipcQueue = Queue()
self._thread = threading.Thread( # First thread captures server OUTPUT self._thread = threading.Thread( # First thread captures server OUTPUT
...@@ -505,10 +551,11 @@ class ServiceManagerThread: ...@@ -505,10 +551,11 @@ class ServiceManagerThread:
time.sleep(1.0) time.sleep(1.0)
# self.procIpcBatch() # don't pump message during start up # self.procIpcBatch() # don't pump message during start up
print("_zz_", end="", flush=True) print("_zz_", end="", flush=True)
if self._status == Status.STATUS_RUNNING: if self._status.isRunning():
Logging.info("[] TDengine service READY to process requests") Logging.info("[] TDengine service READY to process requests")
Logging.info("[] TAOS service started: {}".format(self)) Logging.info("[] TAOS service started: {}".format(self))
self._verifyDnode(self._tInst) # query and ensure dnode is ready # self._verifyDnode(self._tInst) # query and ensure dnode is ready
# Logging.debug("[] TAOS Dnode verified: {}".format(self))
return # now we've started return # now we've started
# TODO: handle failure-to-start better? # TODO: handle failure-to-start better?
self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
...@@ -523,25 +570,27 @@ class ServiceManagerThread: ...@@ -523,25 +570,27 @@ class ServiceManagerThread:
# ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type # ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
isValid = False isValid = False
for col in cols: for col in cols:
print("col = {}".format(col)) # print("col = {}".format(col))
ep = col[1].split(':') # 10.1.30.2:6030 ep = col[1].split(':') # 10.1.30.2:6030
print("ep={}".format(ep)) print("Found ep={}".format(ep))
if tInst.getPort() == int(ep[1]): # That's us if tInst.getPort() == int(ep[1]): # That's us
print("Valid Dnode matched!") # print("Valid Dnode matched!")
isValid = True # now we are valid isValid = True # now we are valid
break break
if not isValid: if not isValid:
raise RuntimeError("Failed to start Dnode, port = {}, expected: {}". print("Failed to start dnode, sleep for a while")
format(ep[1], tInst.getPort())) time.sleep(600)
raise RuntimeError("Failed to start Dnode, expected port not found: {}".
format(tInst.getPort()))
dbc.close() dbc.close()
def stop(self): def stop(self):
# can be called from both main thread or signal handler # can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...") print("Terminating TDengine service running as the sub process...")
if self.isStopped(): if self.getStatus().isStopped():
print("Service already stopped") print("Service already stopped")
return return
if self.isStopping(): if self.getStatus().isStopping():
print("Service is already being stopped") print("Service is already being stopped")
return return
# Linux will send Control-C generated SIGINT to the TDengine process # Linux will send Control-C generated SIGINT to the TDengine process
...@@ -550,39 +599,42 @@ class ServiceManagerThread: ...@@ -550,39 +599,42 @@ class ServiceManagerThread:
if not self._tdeSubProcess: if not self._tdeSubProcess:
raise RuntimeError("sub process object missing") raise RuntimeError("sub process object missing")
self._status = Status.STATUS_STOPPING self._status.set(Status.STATUS_STOPPING)
retCode = self._tdeSubProcess.stop() # retCode = self._tdeSubProcess.stop()
print("Attempted to stop sub process, got return code: {}".format(retCode)) try:
if (retCode==-11): # SGV retCode = self._tdeSubProcess.stop()
Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)") # print("Attempted to stop sub process, got return code: {}".format(retCode))
if retCode == signal.SIGSEGV : # SGV
if self._tdeSubProcess.isRunning(): # still running Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
print("FAILED to stop sub process, it is still running... pid = {}".format( except subprocess.TimeoutExpired as err:
print("Time out waiting for TDengine service process to exit")
else:
if self._tdeSubProcess.isRunning(): # still running, should now never happen
print("FAILED to stop sub process, it is still running... pid = {}".format(
self._tdeSubProcess.getPid())) self._tdeSubProcess.getPid()))
else: else:
self._tdeSubProcess = None # not running any more self._tdeSubProcess = None # not running any more
self.join() # stop the thread, change the status, etc. self.join() # stop the thread, change the status, etc.
# Check if it's really stopped # Check if it's really stopped
outputLines = 20 # for last output outputLines = 10 # for last output
if self.isStopped(): if self.getStatus().isStopped():
self.procIpcBatch(outputLines) # one last time self.procIpcBatch(outputLines) # one last time
print("End of TDengine Service Output: {}".format(self)) Logging.debug("End of TDengine Service Output: {}".format(self))
print("----- TDengine Service (managed by SMT) is now terminated -----\n") Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n")
else: else:
print("WARNING: SMT did not terminate as expected: {}".format(self)) print("WARNING: SMT did not terminate as expected: {}".format(self))
def join(self): def join(self):
# TODO: sanity check # TODO: sanity check
if not self.isStopping(): if not self.getStatus().isStopping():
raise RuntimeError( raise RuntimeError(
"Unexpected status when ending svc mgr thread: {}".format( "SMT.Join(): Unexpected status: {}".format(self._status))
self._status))
if self._thread: if self._thread:
self._thread.join() self._thread.join()
self._thread = None self._thread = None
self._status = Status.STATUS_STOPPED self._status.set(Status.STATUS_STOPPED)
# STD ERR thread # STD ERR thread
self._thread2.join() self._thread2.join()
self._thread2 = None self._thread2 = None
...@@ -651,25 +703,27 @@ class ServiceManagerThread: ...@@ -651,25 +703,27 @@ class ServiceManagerThread:
queue.put(line) queue.put(line)
self._printProgress("_i") self._printProgress("_i")
if self._status == Status.STATUS_STARTING: # we are starting, let's see if we have started if self._status.isStarting(): # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1: # found if line.find(self.TD_READY_MSG) != -1: # found
Logging.info("Waiting for the service to become FULLY READY") Logging.info("Waiting for the service to become FULLY READY")
time.sleep(1.0) # wait for the server to truly start. TODO: remove this time.sleep(1.0) # wait for the server to truly start. TODO: remove this
Logging.info("Service instance #{} is now FULLY READY".format(self._tInstNum)) Logging.info("Service is now FULLY READY") # TODO: more ID info here?
self._status = Status.STATUS_RUNNING self._status.set(Status.STATUS_RUNNING)
# Trim the queue if necessary: TODO: try this 1 out of 10 times # Trim the queue if necessary: TODO: try this 1 out of 10 times
self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
if self.isStopping(): # TODO: use thread status instead if self._status.isStopping(): # TODO: use thread status instead
# WAITING for stopping sub process to finish its outptu # WAITING for stopping sub process to finish its outptu
print("_w", end="", flush=True) print("_w", end="", flush=True)
# queue.put(line) # queue.put(line)
# meaning sub process must have died # meaning sub process must have died
print("\nNo more output from IO thread managing TDengine service") Logging.info("\nEnd of stream detected for TDengine STDOUT: {}".format(self))
out.close() out.close()
def svcErrorReader(self, err: IO, queue): def svcErrorReader(self, err: IO, queue):
for line in iter(err.readline, b''): for line in iter(err.readline, b''):
print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line)) print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
Logging.info("\nEnd of stream detected for TDengine STDERR: {}".format(self))
err.close()
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册