Unverified commit 8e389188, authored by S Shengliang Guan, committed by GitHub

Merge pull request #2570 from taosdata/feature/crash_gen

Feature/crash gen
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
import threading
# querySeqNum = 0
......@@ -37,6 +38,7 @@ class TDengineCursor(object):
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None:
self._connection = connection
......@@ -103,6 +105,12 @@ class TDengineCursor(object):
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
# if threading.get_ident() != self._threadId:
# info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
if not operation:
return None
......@@ -188,6 +196,11 @@ class TDengineCursor(object):
def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
# if threading.get_ident() != self._threadId:
# info ="[WARNING] Cursor fetchall:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
......@@ -232,6 +245,12 @@ class TDengineCursor(object):
def _handle_result(self):
"""Handle the return result from query.
"""
# if threading.get_ident() != self._threadId:
# info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
self._description = []
for ele in self._fields:
self._description.append(
......
#!/usr/bin/python3.7
#-----!/usr/bin/python3.7
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
......@@ -15,6 +15,8 @@ from __future__ import annotations # For type hinting before definition, ref: h
import sys
import os
import io
import signal
import traceback
# Require Python 3
if sys.version_info[0] < 3:
......@@ -23,6 +25,7 @@ if sys.version_info[0] < 3:
import getopt
import argparse
import copy
import requests
import threading
import random
......@@ -30,10 +33,14 @@ import time
import logging
import datetime
import textwrap
import requests
from requests.auth import HTTPBasicAuth
from typing import List
from typing import Dict
from typing import Set
from typing import IO
from queue import Queue, Empty
from util.log import *
from util.dnodes import *
......@@ -76,7 +83,10 @@ class WorkerThread:
# Let us have a DB connection of our own
if ( gConfig.per_thread_db_connection ): # type: ignore
self._dbConn = DbConn()
# print("connector_type = {}".format(gConfig.connector_type))
self._dbConn = DbConn.createNative() if (gConfig.connector_type == 'native') else DbConn.createRest()
self._dbInUse = False # if "use db" was executed already
def logDebug(self, msg):
logger.debug(" TRD[{}] {}".format(self._tid, msg))
......@@ -84,7 +94,14 @@ class WorkerThread:
def logInfo(self, msg):
logger.info(" TRD[{}] {}".format(self._tid, msg))
def dbInUse(self):
return self._dbInUse
def useDb(self):
if ( not self._dbInUse ):
self.execSql("use db")
self._dbInUse = True
def getTaskExecutor(self):
return self._tc.getTaskExecutor()
......@@ -97,6 +114,7 @@ class WorkerThread:
logger.info("Starting to run thread: {}".format(self._tid))
if ( gConfig.per_thread_db_connection ): # type: ignore
logger.debug("Worker thread openning database connection")
self._dbConn.open()
self._doTaskLoop()
......@@ -118,12 +136,17 @@ class WorkerThread:
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break
# Fetch a task from the Thread Coordinator
logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
task = tc.fetchTask()
# Execute such a task
logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__))
task.execute(self)
tc.saveExecutedTask(task)
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
self._dbInUse = False # there may be changes between steps
def verifyThreadSelf(self): # ensure we are called from our own thread
if ( threading.get_ident() != self._thread.ident ):
......@@ -163,6 +186,18 @@ class WorkerThread:
else:
return self._tc.getDbManager().getDbConn().execute(sql)
def querySql(self, sql): # TODO: expose DbConn directly
if ( gConfig.per_thread_db_connection ):
return self._dbConn.query(sql)
else:
return self._tc.getDbManager().getDbConn().query(sql)
def getQueryResult(self):
if ( gConfig.per_thread_db_connection ):
return self._dbConn.getQueryResult()
else:
return self._tc.getDbManager().getDbConn().getQueryResult()
def getDbConn(self):
if ( gConfig.per_thread_db_connection ):
return self._dbConn
......@@ -175,8 +210,9 @@ class WorkerThread:
# else:
# return self._tc.getDbState().getDbConn().query(sql)
# The coordinator of all worker threads, mostly running in main thread
class ThreadCoordinator:
def __init__(self, pool, dbManager):
def __init__(self, pool: ThreadPool, dbManager):
self._curStep = -1 # first step is 0
self._pool = pool
# self._wd = wd
......@@ -187,6 +223,7 @@ class ThreadCoordinator:
self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
self._execStats = ExecutionStats()
self._runStatus = MainExec.STATUS_RUNNING
def getTaskExecutor(self):
return self._te
......@@ -197,6 +234,10 @@ class ThreadCoordinator:
def crossStepBarrier(self):
self._stepBarrier.wait()
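# Editor's note on the stepping protocol (our reading of the code): the barrier is
# created with numThreads + 1 parties, so crossStepBarrier() only releases once every
# worker thread AND the main thread have arrived, giving lock-step execution between
# steps. A minimal sketch of the same pattern:
#
#   import threading
#   barrier = threading.Barrier(3)              # 2 workers + 1 coordinator
#   def worker():
#       barrier.wait()                           # blocks until all 3 parties arrive
#   for _ in range(2):
#       threading.Thread(target=worker).start()
#   barrier.wait()                               # coordinator arrives, everyone proceeds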
def requestToStop(self):
self._runStatus = MainExec.STATUS_STOPPING
self._execStats.registerFailure("User Interruption")
def run(self):
self._pool.createAndStartThreads(self)
......@@ -204,48 +245,73 @@ class ThreadCoordinator:
self._curStep = -1 # not started yet
maxSteps = gConfig.max_steps # type: ignore
self._execStats.startExec() # start the stop watch
failed = False
while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9
transitionFailed = False
hasAbortedTask = False
while(self._curStep < maxSteps-1 and
(not transitionFailed) and
(self._runStatus==MainExec.STATUS_RUNNING) and
(not hasAbortedTask)): # maxStep==10, last curStep should be 9
if not gConfig.debug:
print(".", end="", flush=True) # print this only if we are not in debug mode
logger.debug("[TRD] Main thread going to sleep")
# Now ready to enter a step
# Now main thread (that's us) is ready to enter a step
self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
# At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
try:
self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transition the DB state
except taos.error.ProgrammingError as err:
if ( err.msg == 'network unavailable' ): # broken DB connection
logger.info("DB connection broken, execution failed")
traceback.print_stack()
failed = True
self._te = None # Not running any more
self._execStats.registerFailure("Broken DB Connection")
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop
else:
raise
finally:
pass
# We use this period to do house keeping work, when all worker threads are QUIET.
hasAbortedTask = False
for task in self._executedTasks :
if task.isAborted() :
print("Task aborted: {}".format(task))
hasAbortedTask = True
break
if hasAbortedTask : # do transition only if tasks are error free
self._execStats.registerFailure("Aborted Task Encountered")
else:
try:
sm = self._dbManager.getStateMachine()
logger.debug("[STT] starting transitions")
sm.transition(self._executedTasks) # at end of step, transition the DB state
logger.debug("[STT] transition ended")
# Due to a limitation (or maybe not) of the Python library, we cannot share connections across threads
if sm.hasDatabase() :
for t in self._pool.threadList:
logger.debug("[DB] use db for all worker threads")
t.useDb()
# t.execSql("use db") # main thread executing "use db" on behalf of every worker thread
except taos.error.ProgrammingError as err:
if ( err.msg == 'network unavailable' ): # broken DB connection
logger.info("DB connection broken, execution failed")
traceback.print_stack()
transitionFailed = True
self._te = None # Not running any more
self._execStats.registerFailure("Broken DB Connection")
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop
else:
raise
# finally:
# pass
self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step
logger.debug("<-- Step {} finished".format(self._curStep))
self._curStep += 1 # we are about to get into next step. TODO: race condition here!
logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
# A new TE for the new step
if not failed: # only if not failed
if not transitionFailed: # only if not failed
self._te = TaskExecutor(self._curStep)
logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
self.tapAllThreads()
self.tapAllThreads() # Worker threads will wake up at this point, and each executes its own task
logger.debug("Main thread ready to finish up...")
if not failed: # only in regular situations
if not transitionFailed: # only in regular situations
self.crossStepBarrier() # Cross it one last time, after all threads finish
self._stepBarrier.reset()
logger.debug("Main thread in exclusive zone...")
......@@ -255,11 +321,17 @@ class ThreadCoordinator:
logger.debug("Main thread joining all threads")
self._pool.joinAll() # Get all threads to finish
logger.info("All worker thread finished")
logger.info("\nAll worker threads finished")
self._execStats.endExec()
def logStats(self):
self._execStats.logStats()
def printStats(self):
self._execStats.printStats()
def isFailed(self):
return self._execStats.isFailed()
def getExecStats(self):
return self._execStats
def tapAllThreads(self): # in a deterministic manner
wakeSeq = []
......@@ -268,7 +340,7 @@ class ThreadCoordinator:
wakeSeq.append(i)
else:
wakeSeq.insert(0, i)
logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq)))
logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq)))
# TODO: set dice seed to a deterministic value
for i in wakeSeq:
self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
......@@ -306,7 +378,7 @@ class ThreadPool:
self.maxSteps = maxSteps
# Internal class variables
self.curStep = 0
self.threadList = []
self.threadList = [] # type: List[WorkerThread]
# starting to run all the threads, in locking steps
def createAndStartThreads(self, tc: ThreadCoordinator):
......@@ -397,26 +469,39 @@ class LinearQueue():
return ret
class DbConn:
TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api"
TYPE_INVALID = "invalid"
@classmethod
def create(cls, connType):
if connType == cls.TYPE_NATIVE:
return DbConnNative()
elif connType == cls.TYPE_REST:
return DbConnRest()
else:
raise RuntimeError("Unexpected connection type: {}".format(connType))
@classmethod
def createNative(cls):
return cls.create(cls.TYPE_NATIVE)
@classmethod
def createRest(cls):
return cls.create(cls.TYPE_REST)
def __init__(self):
self._conn = None
self._cursor = None
self.isOpen = False
def open(self): # Open connection
self._type = self.TYPE_INVALID
def open(self):
if ( self.isOpen ):
raise RuntimeError("Cannot re-open an existing DB connection")
cfgPath = "../../build/test/cfg"
self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
# below implemented by child classes
self.openByType()
# Get the connection/cursor ready
self._cursor.execute('reset query cache')
# self._cursor.execute('use db') # note we do this in _findCurrenState
# Open connection
self._tdSql = TDSql()
self._tdSql.init(self._cursor)
logger.debug("[DB] data connection opened, type = {}".format(self._type))
self.isOpen = True
def resetDb(self): # reset the whole database, etc.
......@@ -424,17 +509,128 @@ class DbConn:
raise RuntimeError("Cannot reset database until connection is open")
# self._tdSql.prepare() # Recreate database, etc.
self._cursor.execute('drop database if exists db')
self.execute('drop database if exists db')
logger.debug("Resetting DB, dropped database")
# self._cursor.execute('create database db')
# self._cursor.execute('use db')
# tdSql.execute('show databases')
def queryScalar(self, sql) -> int :
return self._queryAny(sql)
def queryString(self, sql) -> str :
return self._queryAny(sql)
def _queryAny(self, sql) : # return the single value of a 1-row, 1-column result
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1 :
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0]
def execute(self, sql):
raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getQueryResult(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultCols(self):
raise RuntimeError("Unexpected execution, should be overriden")
# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
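# Based on how _doSql() below parses the reply, a successful REST response is assumed
# to look roughly like (values illustrative only):
#   {"status": "succ", "rows": 1, "data": [["db", ...]]}
# and an error response like:
#   {"status": "error", "code": 1000, "desc": "..."}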
class DbConnRest(DbConn):
def __init__(self):
super().__init__()
self._type = self.TYPE_REST
self._url = "http://localhost:6020/rest/sql" # fixed for now
self._result = None
def openByType(self): # Open connection
pass # do nothing, always open
def close(self):
if ( not self.isOpen ):
raise RuntimeError("Cannot clean up database until connection is open")
# Do nothing for REST
logger.debug("[DB] REST Database connection closed")
self.isOpen = False
def _doSql(self, sql):
r = requests.post(self._url,
data = sql,
auth = HTTPBasicAuth('root', 'taosdata'))
rj = r.json()
# Sanity check for the "Json Result"
if (not 'status' in rj):
raise RuntimeError("No status in REST response")
if rj['status'] == 'error': # clearly reported error
if (not 'code' in rj): # error without code
raise RuntimeError("REST error return without code")
errno = rj['code'] # May need to massage this in the future
# print("Raising programming error with REST return: {}".format(rj))
raise taos.error.ProgrammingError(rj['desc'], errno) # todo: check existence of 'desc'
if rj['status'] != 'succ': # better be this
raise RuntimeError("Unexpected REST return status: {}".format(rj['status']))
nRows = rj['rows'] if ('rows' in rj) else 0
self._result = rj
return nRows
def execute(self, sql):
if ( not self.isOpen ):
raise RuntimeError("Cannot execute database commands until connection is open")
logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
nRows = self._doSql(sql)
logger.debug("[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql) : # return rows affected
return self.execute(sql)
def getQueryResult(self):
return self._result['data']
def getResultRows(self):
print(self._result)
raise RuntimeError("TBD")
# return self._tdSql.queryRows
def getResultCols(self):
print(self._result)
raise RuntimeError("TBD")
class DbConnNative(DbConn):
def __init__(self):
super().__init__()
self._type = self.TYPE_NATIVE
self._conn = None
self._cursor = None
def openByType(self): # Open connection
cfgPath = "../../build/test/cfg"
self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
# Get the connection/cursor ready
self._cursor.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every step
# Open connection
self._tdSql = TDSql()
self._tdSql.init(self._cursor)
def close(self):
if ( not self.isOpen ):
raise RuntimeError("Cannot clean up database until connection is open")
self._tdSql.close()
logger.debug("[DB] Database connection closed")
self.isOpen = False
def execute(self, sql):
......@@ -450,29 +646,19 @@ class DbConn:
raise RuntimeError("Cannot query database until connection is open")
logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.query(sql)
logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
# results are in: return self._tdSql.queryResult
def getQueryResult(self):
return self._tdSql.queryResult
def _queryAny(self, sql) : # actual query result as an int
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
tSql = self._tdSql
nRows = tSql.query(sql)
if nRows != 1 :
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
if tSql.queryRows != 1 or tSql.queryCols != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return tSql.queryResult[0][0]
def getResultRows(self):
return self._tdSql.queryRows
def queryScalar(self, sql) -> int :
return self._queryAny(sql)
def getResultCols(self):
return self._tdSql.queryCols
def queryString(self, sql) -> str :
return self._queryAny(sql)
class AnyState:
STATE_INVALID = -1
......@@ -620,10 +806,10 @@ class StateDbOnly(AnyState):
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
# self.assertAtMostOneSuccess(tasks, DropDbTask)
# self._state = self.STATE_EMPTY
if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
if ( not self.hasTask(tasks, TaskDropSuperTable) ):
self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
# if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
# # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
# if ( not self.hasTask(tasks, TaskDropSuperTable) ):
# self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
# self.assertNoTask(tasks, DropDbTask) # should not have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail
......@@ -648,7 +834,9 @@ class StateSuperTableOnly(AnyState):
def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table
self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
#self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
self.hasSuccess(tasks, TaskCreateSuperTable) # we must have recreated it
# self._state = self.STATE_DB_ONLY
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
# self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases
......@@ -680,18 +868,19 @@ class StateHasData(AnyState):
self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task
self.hasSuccess(tasks, TaskDropSuperTable)
# self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
self.assertNoTask(tasks, TaskDropDb)
self.assertNoTask(tasks, TaskDropSuperTable)
self.assertNoTask(tasks, TaskAddData)
# elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
# self.assertNoTask(tasks, TaskDropDb)
# self.assertNoTask(tasks, TaskDropSuperTable)
# self.assertNoTask(tasks, TaskAddData)
# self.hasSuccess(tasks, DeleteDataTasks)
else: # should be STATE_HAS_DATA
self.assertNoTask(tasks, TaskDropDb)
if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one
self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it
if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table
self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it
# self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
class StateMechine :
class StateMechine:
def __init__(self, dbConn):
self._dbConn = dbConn
self._curState = self._findCurrentState() # starting state
......@@ -700,8 +889,17 @@ class StateMechine :
def getCurrentState(self):
return self._curState
def hasDatabase(self):
return self._curState.canDropDb() # ha, can drop DB means it has one
# May be slow, use cautiously...
def getTaskTypes(self): # those that can run (directly/indirectly) from the current state
def typesToStrings(types):
ss = []
for t in types:
ss.append(t.__name__)
return ss
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
firstTaskTypes = []
for tc in allTaskClasses:
......@@ -720,7 +918,7 @@ class StateMechine :
if len(taskTypes) <= 0:
raise RuntimeError("No suitable task types found for state: {}".format(self._curState))
logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes))
logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, typesToStrings(taskTypes)))
return taskTypes
def _findCurrentState(self):
......@@ -730,7 +928,7 @@ class StateMechine :
# logger.debug("Found EMPTY state")
logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
return StateEmpty()
dbc.execute("use db") # did not do this when openning connection
dbc.execute("use db") # did not do this when openning connection, and this is NOT the worker thread, which does this on their own
if dbc.query("show tables") == 0 : # no tables
# logger.debug("Found DB ONLY state")
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
......@@ -746,6 +944,7 @@ class StateMechine :
def transition(self, tasks):
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
logger.debug("[STT] Starting State: {}".format(self._curState))
return # do nothing
self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
......@@ -807,14 +1006,14 @@ class DbManager():
self._lock = threading.RLock()
# self.openDbServerConnection()
self._dbConn = DbConn()
self._dbConn = DbConn.createNative() if (gConfig.connector_type=='native') else DbConn.createRest()
try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if ( err.msg == 'client disconnected' ): # cannot open DB connection
print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit()
sys.exit(2)
else:
raise
except:
......@@ -829,7 +1028,7 @@ class DbManager():
def getDbConn(self):
return self._dbConn
def getStateMachine(self):
def getStateMachine(self) -> StateMechine :
return self._stateMachine
# def getState(self):
......@@ -869,8 +1068,11 @@ class DbManager():
def getNextTick(self):
with self._lock: # prevent duplicate tick
self._lastTick += datetime.timedelta(0, 1) # add one second to it
return self._lastTick
if Dice.throw(10) == 0 : # 1 in 10 chance
return self._lastTick + datetime.timedelta(0, -100)
else: # regular
self._lastTick += datetime.timedelta(0, 1) # add one second to it
return self._lastTick
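# Editor's note: the occasional 100-second jump backwards appears intended to produce
# out-of-order timestamps; e.g. with _lastTick at 00:01:40, a 1-in-10 throw yields
# 00:00:00, exercising how the server handles late-arriving data.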
def getNextInt(self):
with self._lock:
......@@ -894,15 +1096,60 @@ class DbManager():
self._dbConn.close()
class TaskExecutor():
class BoundedList:
def __init__(self, size = 10):
self._size = size
self._list = []
def add(self, n: int) :
if not self._list: # empty
self._list.append(n)
return
# now we should insert
nItems = len(self._list)
insPos = 0
for i in range(nItems):
insPos = i
if n <= self._list[i] : # smaller than this item, time to insert
break # found the insertion point
insPos += 1 # insert to the right
if insPos == 0 : # except for the 1st item, # TODO: eliminate first item as gating item
return # do nothing
# print("Inserting at postion {}, value: {}".format(insPos, n))
self._list.insert(insPos, n) # insert
newLen = len(self._list)
if newLen <= self._size :
return # do nothing
elif newLen == (self._size + 1) :
del self._list[0] # remove the first item
else :
raise RuntimeError("Corrupt Bounded List")
def __str__(self):
return repr(self._list)
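# A rough trace of the insertion logic above (hypothetical values, size=3): adding
# 5, 1, 9, 7, 8 in sequence leaves the list as [7, 8, 9] -- the 1 is dropped because
# it is not larger than the current first ("gating") item, and once the list exceeds
# its size the smallest element at index 0 is evicted.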
_boundedList = BoundedList()
def __init__(self, curStep):
self._curStep = curStep
@classmethod
def getBoundedList(cls):
return cls._boundedList
def getCurStep(self):
return self._curStep
def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
task.execute(wt)
def recordDataMark(self, n: int):
# print("[{}]".format(n), end="", flush=True)
self._boundedList.add(n)
# def logInfo(self, msg):
# logger.info(" T[{}.x]: ".format(self._curStep) + msg)
......@@ -922,6 +1169,7 @@ class Task():
self._dbManager = dbManager
self._workerThread = None
self._err = None
self._aborted = False
self._curStep = None
self._numRows = None # Number of rows affected
......@@ -930,10 +1178,14 @@ class Task():
# logger.debug("Creating new task {}...".format(self._taskNum))
self._execStats = execStats
self._lastSql = "" # last SQL executed/attempted
def isSuccess(self):
return self._err == None
def isAborted(self):
return self._aborted
def clone(self): # TODO: why do we need this again?
newTask = self.__class__(self._dbManager, self._execStats)
return newTask
......@@ -960,10 +1212,31 @@ class Task():
try:
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err))
self._err = err
except:
self.logDebug("[=] Unexpected exception")
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
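# Example of the correction (our understanding): a native errno reported as a
# negative 32-bit value such as -2147483136 maps to 0x80000000 + (-2147483136) = 0x200,
# which then matches the "allowed errors" list below.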
if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600,
1000 # REST catch-all error
]) : # allowed errors
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
print("_", end="", flush=True)
self._err = err
else:
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)
self.logDebug(errMsg)
if gConfig.debug :
raise # so that we see full stack
else: # non-debug
print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
"----------------------------\n")
# sys.exit(-1)
self._err = err
self._aborted = True
except Exception as e :
self.logInfo("Non-TAOS exception encountered")
self._err = e
self._aborted = True
traceback.print_exc()
except :
self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql))
raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
......@@ -971,8 +1244,21 @@ class Task():
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
def execSql(self, sql):
self._lastSql = sql
return self._dbManager.execute(sql)
def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
self._lastSql = sql
return wt.execSql(sql)
def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
self._lastSql = sql
return wt.querySql(sql)
def getQueryResult(self, wt: WorkerThread): # fetch the result of the last query from the worker thread
return wt.getQueryResult()
class ExecutionStats:
def __init__(self):
......@@ -987,6 +1273,12 @@ class ExecutionStats:
self._failed = False
self._failureReason = None
def __str__(self):
return "[ExecStats: _failed={}, _failureReason={}".format(self._failed, self._failureReason)
def isFailed(self):
return self._failed == True
def startExec(self):
self._execStartTime = time.time()
......@@ -1018,7 +1310,7 @@ class ExecutionStats:
self._failed = True
self._failureReason = reason
def logStats(self):
def printStats(self):
logger.info("----------------------------------------------------------------------")
logger.info("| Crash_Gen test {}, with the following stats:".
format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
......@@ -1033,11 +1325,17 @@ class ExecutionStats:
logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
logger.info("----------------------------------------------------------------------")
class StateTransitionTask(Task):
LARGE_NUMBER_OF_TABLES = 35
SMALL_NUMBER_OF_TABLES = 3
LARGE_NUMBER_OF_RECORDS = 50
SMALL_NUMBER_OF_RECORDS = 3
@classmethod
def getInfo(cls): # each sub class should supply their own information
raise RuntimeError("Overriding method expected")
......@@ -1060,6 +1358,10 @@ class StateTransitionTask(Task):
# return state.getValue() in cls.getBeginStates()
raise RuntimeError("must be overriden")
@classmethod
def getRegTableName(cls, i):
return "db.reg_table_{}".format(i)
def execute(self, wt: WorkerThread):
super().execute(wt)
......@@ -1073,7 +1375,7 @@ class TaskCreateDb(StateTransitionTask):
return state.canCreateDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("create database db")
self.execWtSql(wt, "create database db")
class TaskDropDb(StateTransitionTask):
@classmethod
......@@ -1085,7 +1387,7 @@ class TaskDropDb(StateTransitionTask):
return state.canDropDb()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("drop database db")
self.execWtSql(wt, "drop database db")
logger.debug("[OPS] database dropped at {}".format(time.time()))
class TaskCreateSuperTable(StateTransitionTask):
......@@ -1098,8 +1400,13 @@ class TaskCreateSuperTable(StateTransitionTask):
return state.canCreateFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbManager.getFixedSuperTableName()
wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
if not wt.dbInUse(): # no DB yet, to the best of our knowledge
logger.debug("Skipping task, no DB yet")
return
tblName = self._dbManager.getFixedSuperTableName()
# wt.execSql("use db") # should always be in place
self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that automatically
......@@ -1114,16 +1421,16 @@ class TaskReadData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTbName = self._dbManager.getFixedSuperTableName()
dbc = wt.getDbConn()
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
self.queryWtSql(wt, "select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
dbc.close()
dbc.open()
wt.getDbConn().close()
wt.getDbConn().open()
else:
rTables = dbc.getQueryResult()
rTables = self.getQueryResult(wt) # wt.getDbConn().getQueryResult()
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
for rTbName in rTables : # regular tables
dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
self.execWtSql(wt, "select * from db.{}".format(rTbName[0]))
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
......@@ -1137,8 +1444,33 @@ class TaskDropSuperTable(StateTransitionTask):
return state.canDropFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbManager.getFixedSuperTableName()
wt.execSql("drop table db.{}".format(tblName))
# 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
if Dice.throw(2) == 0 :
tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
random.shuffle(tblSeq)
tickOutput = False # whether we have spit out a "d" character for "drop regular table"
isSuccess = True
for i in tblSeq:
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
try:
self.execWtSql(wt, "drop table {}".format(regTableName)) # nRows always 0, like MySQL
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correcting for strange error number scheme
if ( errno2 in [0x362]) : # mnode invalid table name
isSuccess = False
logger.debug("[DB] Acceptable error when dropping a table")
continue # try to delete next regular table
if (not tickOutput):
tickOutput = True # Print only one time
if isSuccess :
print("d", end="", flush=True)
else:
print("f", end="", flush=True)
# Drop the super table itself
tblName = self._dbManager.getFixedSuperTableName()
self.execWtSql(wt, "drop table db.{}".format(tblName))
class TaskAlterTags(StateTransitionTask):
@classmethod
......@@ -1153,20 +1485,18 @@ class TaskAlterTags(StateTransitionTask):
tblName = self._dbManager.getFixedSuperTableName()
dice = Dice.throw(4)
if dice == 0 :
wt.execSql("alter table db.{} add tag extraTag int".format(tblName))
sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1 :
wt.execSql("alter table db.{} drop tag extraTag".format(tblName))
sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2 :
wt.execSql("alter table db.{} drop tag newTag".format(tblName))
sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3
wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName))
sql = "alter table db.{} change tag extraTag newTag".format(tblName)
self.execWtSql(wt, sql)
class TaskAddData(StateTransitionTask):
activeTable : Set[int] = set() # Track which table is being actively worked on
LARGE_NUMBER_OF_TABLES = 35
SMALL_NUMBER_OF_TABLES = 3
LARGE_NUMBER_OF_RECORDS = 50
SMALL_NUMBER_OF_RECORDS = 3
# We use these two files to record operations to DB, useful for power-off tests
fAddLogReady = None
......@@ -1192,7 +1522,7 @@ class TaskAddData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbManager
wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
# wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
random.shuffle(tblSeq)
for i in tblSeq:
......@@ -1202,10 +1532,10 @@ class TaskAddData(StateTransitionTask):
print("x", end="", flush=True)
else:
self.activeTable.add(i) # marking it active
# No need to shuffle data sequence, unless later we decide to do non-increment insertion
# No need to shuffle data sequence, unless later we decide to do non-increment insertion
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table
nextInt = ds.getNextInt()
regTableName = "db.reg_table_{}".format(i)
nextInt = ds.getNextInt()
if gConfig.record_ops:
self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
......@@ -1216,7 +1546,9 @@ class TaskAddData(StateTransitionTask):
ds.getFixedSuperTableName(),
ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), nextInt)
wt.execSql(sql)
self.execWtSql(wt, sql)
# Successfully wrote the data into the DB, let's record it somehow
te.recordDataMark(nextInt)
if gConfig.record_ops:
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.flush()
......@@ -1285,17 +1617,213 @@ class LoggingFilter(logging.Filter):
if ( record.levelno >= logging.INFO ) :
return True # info or above always log
msg = record.msg
# print("type = {}, value={}".format(type(msg), msg))
# sys.exit()
# Commenting out below to adjust...
# if msg.startswith("[TRD]"):
# return False
return True
class MyLoggingAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager:
def __init__(self):
print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
self.ioThread = None
self.subProcess = None
self.shouldStop = False
self.status = MainExec.STATUS_RUNNING
def svcOutputReader(self, out: IO, queue):
# print("This is the svcOutput Reader...")
for line in out : # iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
queue.put(line.rstrip()) # get rid of new lines
print("No more output from incoming IO") # meaning sub process must have died
out.close()
def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...")
self.subProcess.send_signal(signal.SIGINT)
self.shouldStop = True
self.joinIoThread()
def joinIoThread(self):
if self.ioThread :
self.ioThread.join()
self.ioThread = None
def run(self):
ON_POSIX = 'posix' in sys.builtin_module_names
svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
# svcCmd = ['vmstat', '1']
self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True)
q = Queue()
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q))
self.ioThread.daemon = True # thread dies with the program
self.ioThread.start()
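# The daemon reader thread above pushes each line of taosd output into the Queue, so
# the polling loop below can use q.get_nowait() and keep running (sleeping briefly on
# Empty) instead of blocking on a readline() call.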
# proc = subprocess.Popen(['echo', '"to stdout"'],
# stdout=subprocess.PIPE,
# )
# stdout_value = proc.communicate()[0]
# print('\tstdout: {}'.format(repr(stdout_value)))
while True :
try:
line = q.get_nowait() # getting output at fast speed
except Empty:
# print('no output yet')
time.sleep(2.3) # wait only if there's no output
else: # got line
print(line)
# print("----end of iteration----")
if self.shouldStop:
print("Ending main Svc thread")
break
print("end of loop")
self.joinIoThread()
print("Finished")
class ClientManager:
def __init__(self):
print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
self.status = MainExec.STATUS_RUNNING
self.tc = None
def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...")
self.tc.requestToStop()
def _printLastNumbers(self): # to verify data durability
dbManager = DbManager(resetDb=False)
dbc = dbManager.getDbConn()
if dbc.query("show databases") == 0 : # no databae
return
if dbc.query("show tables") == 0 : # no tables
return
dbc.execute("use db")
sTbName = dbManager.getFixedSuperTableName()
# get all regular tables
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
rTables = dbc.getQueryResult()
bList = TaskExecutor.BoundedList()
for rTbName in rTables : # regular tables
dbc.query("select speed from db.{}".format(rTbName[0]))
numbers = dbc.getQueryResult()
for row in numbers :
# print("<{}>".format(n), end="", flush=True)
bList.add(row[0])
print("Top numbers in DB right now: {}".format(bList))
print("TDengine client execution is about to start in 2 seconds...")
time.sleep(2.0)
dbManager = None # release?
def prepare(self):
self._printLastNumbers()
def run(self):
self._printLastNumbers()
dbManager = DbManager() # Regular function
Dice.seed(0) # initial seeding of dice
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
self.tc.run()
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
self.conclude()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/
def conclude(self):
self.tc.printStats()
self.tc.getDbManager().cleanUp()
class MainExec:
STATUS_RUNNING = 1
STATUS_STOPPING = 2
# STATUS_STOPPED = 3 # Not used yet
@classmethod
def runClient(cls):
clientManager = ClientManager()
return clientManager.run()
@classmethod
def runService(cls):
svcManager = SvcManager()
svcManager.run()
@classmethod
def runTemp(cls): # for debugging purposes
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
# dbc = dbState.getDbConn()
# sTbName = dbState.getFixedSuperTableName()
# dbc.execute("create database if not exists db")
# if not dbState.getState().equals(StateEmpty()):
# dbc.execute("use db")
# rTables = None
# try: # the super table may not exist
# sql = "select TBNAME from db.{}".format(sTbName)
# logger.info("Finding out tables in super table: {}".format(sql))
# dbc.query(sql) # TODO: analyze result set later
# logger.info("Fetching result")
# rTables = dbc.getQueryResult()
# logger.info("Result: {}".format(rTables))
# except taos.error.ProgrammingError as err:
# logger.info("Initial Super table OPS error: {}".format(err))
# # sys.exit()
# if ( not rTables == None):
# # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
# try:
# for rTbName in rTables : # regular tables
# ds = dbState
# logger.info("Inserting into table: {}".format(rTbName[0]))
# sql = "insert into db.{} values ('{}', {});".format(
# rTbName[0],
# ds.getNextTick(), ds.getNextInt())
# dbc.execute(sql)
# for rTbName in rTables : # regular tables
# dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
# logger.info("Initial READING operation is successful")
# except taos.error.ProgrammingError as err:
# logger.info("Initial WRITE/READ error: {}".format(err))
# Sandbox testing code
# dbc = dbState.getDbConn()
# while True:
# rows = dbc.query("show databases")
# print("Rows: {}, time={}".format(rows, time.time()))
return
def main():
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html
......@@ -1308,93 +1836,55 @@ def main():
2. You run the server there before this script: ./build/bin/taosd -c test/cfg
'''))
parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
help='Connector type to use: native, rest, or mixed (default: native)')
parser.add_argument('-d', '--debug', action='store_true',
help='Turn on DEBUG mode for more logging (default: false)')
parser.add_argument('-e', '--run-tdengine', action='store_true',
help='Run TDengine service in foreground (default: false)')
parser.add_argument('-l', '--larger-data', action='store_true',
help='Write larger amount of data during write operations (default: false)')
parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
help='Use a separate DB connection per worker thread (default: false)')
parser.add_argument('-r', '--record-ops', action='store_true',
help='Use a pair of always-fsynced files to record operations attempted and completed, for power-off tests (default: false)')
parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
help='Maximum number of steps to run (default: 1000)')
parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
help='Number of threads to run (default: 5)')
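# Example invocation (illustrative values): "python3 crash_gen.py -p -t 5 -s 50 -d"
# runs 5 worker threads for 50 steps with per-thread DB connections and debug logging;
# "python3 crash_gen.py -e" runs the TDengine service in the foreground instead.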
global gConfig
gConfig = parser.parse_args()
if len(sys.argv) == 1:
parser.print_help()
sys.exit()
# if len(sys.argv) == 1:
# parser.print_help()
# sys.exit()
# Logging Stuff
global logger
logger = logging.getLogger('CrashGen')
logger.addFilter(LoggingFilter())
_logger = logging.getLogger('CrashGen') # real logger
_logger.addFilter(LoggingFilter())
ch = logging.StreamHandler()
_logger.addHandler(ch)
logger = MyLoggingAdapter(_logger, []) # Logging adapter, to be used as a logger
if ( gConfig.debug ):
logger.setLevel(logging.DEBUG) # default seems to be INFO
else:
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)
# resetDb = False # DEBUG only
# dbState = DbState(resetDb) # DBEUG only!
dbManager = DbManager() # Regular function
Dice.seed(0) # initial seeding of dice
tc = ThreadCoordinator(
ThreadPool(gConfig.num_threads, gConfig.max_steps),
# WorkDispatcher(dbState), # Obsolete?
dbManager
)
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
# dbc = dbState.getDbConn()
# sTbName = dbState.getFixedSuperTableName()
# dbc.execute("create database if not exists db")
# if not dbState.getState().equals(StateEmpty()):
# dbc.execute("use db")
# rTables = None
# try: # the super table may not exist
# sql = "select TBNAME from db.{}".format(sTbName)
# logger.info("Finding out tables in super table: {}".format(sql))
# dbc.query(sql) # TODO: analyze result set later
# logger.info("Fetching result")
# rTables = dbc.getQueryResult()
# logger.info("Result: {}".format(rTables))
# except taos.error.ProgrammingError as err:
# logger.info("Initial Super table OPS error: {}".format(err))
# # sys.exit()
# if ( not rTables == None):
# # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
# try:
# for rTbName in rTables : # regular tables
# ds = dbState
# logger.info("Inserting into table: {}".format(rTbName[0]))
# sql = "insert into db.{} values ('{}', {});".format(
# rTbName[0],
# ds.getNextTick(), ds.getNextInt())
# dbc.execute(sql)
# for rTbName in rTables : # regular tables
# dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
# logger.info("Initial READING operation is successful")
# except taos.error.ProgrammingError as err:
# logger.info("Initial WRITE/READ error: {}".format(err))
# Run server or client
if gConfig.run_tdengine : # run server
MainExec.runService()
else :
return MainExec.runClient()
# Sandbox testing code
# dbc = dbState.getDbConn()
# while True:
# rows = dbc.query("show databases")
# print("Rows: {}, time={}".format(rows, time.time()))
tc.run()
tc.logStats()
dbManager.cleanUp()
# logger.info("Crash_Gen execution finished")
if __name__ == "__main__":
main()
exitCode = main()
# print("Exiting with code: {}".format(exitCode))
sys.exit(exitCode)
......@@ -38,4 +38,4 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib
# Now we are all set, and let's see if we can find a crash. Note we pass all params
./crash_gen.py $@
python3 ./crash_gen.py $@