提交 f9c9684b 编写于 作者: S Shuduo Sang

add crash_gen to CI

[TD-862]
上级 cb836b55
......@@ -61,6 +61,11 @@ matrix:
cd ${TRAVIS_BUILD_DIR}/tests
./test-all.sh smoke || travis_terminate $?
sleep 1
cd ${TRAVIS_BUILD_DIR}/tests/pytest
./crash_gen.sh -p -t 5 -s 50|| travis_terminate $?
sleep 1
cd ${TRAVIS_BUILD_DIR}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
......
#-----!/usr/bin/python3.7
# -----!/usr/bin/python3.7
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
......@@ -11,7 +11,31 @@
###################################################################
# -*- coding: utf-8 -*-
from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
import taos
import crash_gen
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from queue import Queue, Empty
from typing import IO
from typing import Set
from typing import Dict
from typing import List
from requests.auth import HTTPBasicAuth
import textwrap
import datetime
import logging
import time
import random
import threading
import requests
import copy
import argparse
import getopt
import sys
import os
......@@ -22,33 +46,6 @@ import traceback
if sys.version_info[0] < 3:
raise Exception("Must be using Python 3")
import getopt
import argparse
import copy
import requests
import threading
import random
import time
import logging
import datetime
import textwrap
import requests
from requests.auth import HTTPBasicAuth
from typing import List
from typing import Dict
from typing import Set
from typing import IO
from queue import Queue, Empty
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *
import crash_gen
import taos
# Global variables, tried to keep a small number.
......@@ -57,9 +54,11 @@ import taos
gConfig = argparse.Namespace() # Dummy value, will be replaced later
logger = None
def runThread(wt: WorkerThread):
wt.run()
class CrashGenError(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
......@@ -68,6 +67,7 @@ class CrashGenError(Exception):
def __str__(self):
return self.msg
class WorkerThread:
def __init__(self, pool: ThreadPool, tid,
tc: ThreadCoordinator,
......@@ -82,9 +82,10 @@ class WorkerThread:
self._stepGate = threading.Event()
# Let us have a DB connection of our own
if ( gConfig.per_thread_db_connection ): # type: ignore
if (gConfig.per_thread_db_connection): # type: ignore
# print("connector_type = {}".format(gConfig.connector_type))
self._dbConn = DbConn.createNative() if (gConfig.connector_type == 'native') else DbConn.createRest()
self._dbConn = DbConn.createNative() if (
gConfig.connector_type == 'native') else DbConn.createRest()
self._dbInUse = False # if "use db" was executed already
......@@ -98,7 +99,7 @@ class WorkerThread:
return self._dbInUse
def useDb(self):
if ( not self._dbInUse ):
if (not self._dbInUse):
self.execSql("use db")
self._dbInUse = True
......@@ -113,51 +114,62 @@ class WorkerThread:
# self.isSleeping = False
logger.info("Starting to run thread: {}".format(self._tid))
if ( gConfig.per_thread_db_connection ): # type: ignore
if (gConfig.per_thread_db_connection): # type: ignore
logger.debug("Worker thread openning database connection")
self._dbConn.open()
self._doTaskLoop()
# clean up
if ( gConfig.per_thread_db_connection ): # type: ignore
if (gConfig.per_thread_db_connection): # type: ignore
self._dbConn.close()
def _doTaskLoop(self) :
def _doTaskLoop(self):
# while self._curStep < self._pool.maxSteps:
# tc = ThreadCoordinator(None)
while True:
tc = self._tc # Thread Coordinator, the overall master
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
logger.debug(
"[TRD] Worker thread [{}] exited barrier...".format(
self._tid))
self.crossStepGate() # then per-thread gate, after being tapped
logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
logger.debug(
"[TRD] Worker thread [{}] exited step gate...".format(
self._tid))
if not self._tc.isRunning():
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
logger.debug(
"[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break
# Fetch a task from the Thread Coordinator
logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
logger.debug(
"[TRD] Worker thread [{}] about to fetch task".format(
self._tid))
task = tc.fetchTask()
# Execute such a task
logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__))
logger.debug(
"[TRD] Worker thread [{}] about to execute task: {}".format(
self._tid, task.__class__.__name__))
task.execute(self)
tc.saveExecutedTask(task)
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
logger.debug(
"[TRD] Worker thread [{}] finished executing task".format(
self._tid))
self._dbInUse = False # there may be changes between steps
def verifyThreadSelf(self): # ensure we are called by this own thread
if ( threading.get_ident() != self._thread.ident ):
if (threading.get_ident() != self._thread.ident):
raise RuntimeError("Unexpectly called from other threads")
def verifyThreadMain(self): # ensure we are called by the main thread
if ( threading.get_ident() != threading.main_thread().ident ):
if (threading.get_ident() != threading.main_thread().ident):
raise RuntimeError("Unexpectly called from other threads")
def verifyThreadAlive(self):
if ( not self._thread.is_alive() ):
if (not self._thread.is_alive()):
raise RuntimeError("Unexpected dead thread")
# A gate is different from a barrier in that a thread needs to be "tapped"
......@@ -166,7 +178,9 @@ class WorkerThread:
self.verifyThreadSelf() # only allowed by ourselves
# Wait again at the "gate", waiting to be "tapped"
logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid))
logger.debug(
"[TRD] Worker thread {} about to cross the step gate".format(
self._tid))
self._stepGate.wait()
self._stepGate.clear()
......@@ -181,25 +195,25 @@ class WorkerThread:
time.sleep(0) # let the released thread run a bit
def execSql(self, sql): # TODO: expose DbConn directly
if ( gConfig.per_thread_db_connection ):
if (gConfig.per_thread_db_connection):
return self._dbConn.execute(sql)
else:
return self._tc.getDbManager().getDbConn().execute(sql)
def querySql(self, sql): # TODO: expose DbConn directly
if ( gConfig.per_thread_db_connection ):
if (gConfig.per_thread_db_connection):
return self._dbConn.query(sql)
else:
return self._tc.getDbManager().getDbConn().query(sql)
def getQueryResult(self):
if ( gConfig.per_thread_db_connection ):
if (gConfig.per_thread_db_connection):
return self._dbConn.getQueryResult()
else:
return self._tc.getDbManager().getDbConn().getQueryResult()
def getDbConn(self):
if ( gConfig.per_thread_db_connection ):
if (gConfig.per_thread_db_connection):
return self._dbConn
else:
return self._tc.getDbManager().getDbConn()
......@@ -211,6 +225,8 @@ class WorkerThread:
# return self._tc.getDbState().getDbConn().query(sql)
# The coordinator of all worker threads, mostly running in main thread
class ThreadCoordinator:
def __init__(self, pool: ThreadPool, dbManager):
self._curStep = -1 # first step is 0
......@@ -221,14 +237,15 @@ class ThreadCoordinator:
self._executedTasks: List[Task] = [] # in a given step
self._lock = threading.RLock() # sync access for a few things
self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
self._stepBarrier = threading.Barrier(
self._pool.numThreads + 1) # one barrier for all threads
self._execStats = ExecutionStats()
self._runStatus = MainExec.STATUS_RUNNING
def getTaskExecutor(self):
return self._te
def getDbManager(self) -> DbManager :
def getDbManager(self) -> DbManager:
return self._dbManager
def crossStepBarrier(self):
......@@ -247,50 +264,58 @@ class ThreadCoordinator:
self._execStats.startExec() # start the stop watch
transitionFailed = False
hasAbortedTask = False
while(self._curStep < maxSteps-1 and
while(self._curStep < maxSteps - 1 and
(not transitionFailed) and
(self._runStatus==MainExec.STATUS_RUNNING) and
(self._runStatus == MainExec.STATUS_RUNNING) and
(not hasAbortedTask)): # maxStep==10, last curStep should be 9
if not gConfig.debug:
print(".", end="", flush=True) # print this only if we are not in debug mode
# print this only if we are not in debug mode
print(".", end="", flush=True)
logger.debug("[TRD] Main thread going to sleep")
# Now main thread (that's us) is ready to enter a step
self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
# let other threads go past the pool barrier, but wait at the
# thread gate
self.crossStepBarrier()
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
# We use this period to do house keeping work, when all worker threads are QUIET.
# We use this period to do house keeping work, when all worker
# threads are QUIET.
hasAbortedTask = False
for task in self._executedTasks :
if task.isAborted() :
for task in self._executedTasks:
if task.isAborted():
print("Task aborted: {}".format(task))
hasAbortedTask = True
break
if hasAbortedTask : # do transition only if tasks are error free
if hasAbortedTask: # do transition only if tasks are error free
self._execStats.registerFailure("Aborted Task Encountered")
else:
try:
sm = self._dbManager.getStateMachine()
logger.debug("[STT] starting transitions")
sm.transition(self._executedTasks) # at end of step, transiton the DB state
# at end of step, transiton the DB state
sm.transition(self._executedTasks)
logger.debug("[STT] transition ended")
# Due to limitation (or maybe not) of the Python library, we cannot share connections across threads
if sm.hasDatabase() :
# Due to limitation (or maybe not) of the Python library,
# we cannot share connections across threads
if sm.hasDatabase():
for t in self._pool.threadList:
logger.debug("[DB] use db for all worker threads")
t.useDb()
# t.execSql("use db") # main thread executing "use db" on behalf of every worker thread
# t.execSql("use db") # main thread executing "use
# db" on behalf of every worker thread
except taos.error.ProgrammingError as err:
if ( err.msg == 'network unavailable' ): # broken DB connection
if (err.msg == 'network unavailable'): # broken DB connection
logger.info("DB connection broken, execution failed")
traceback.print_stack()
transitionFailed = True
self._te = None # Not running any more
self._execStats.registerFailure("Broken DB Connection")
# continue # don't do that, need to tap all threads at end, and maybe signal them to stop
# continue # don't do that, need to tap all threads at
# end, and maybe signal them to stop
else:
raise
# finally:
......@@ -301,14 +326,20 @@ class ThreadCoordinator:
# Get ready for next step
logger.debug("<-- Step {} finished".format(self._curStep))
self._curStep += 1 # we are about to get into next step. TODO: race condition here!
logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
# Now not all threads had time to go to sleep
logger.debug(
"\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
# A new TE for the new step
if not transitionFailed: # only if not failed
self._te = TaskExecutor(self._curStep)
logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
self.tapAllThreads() # Worker threads will wake up at this point, and each execute it's own task
logger.debug(
"[TRD] Main thread waking up at step {}, tapping worker threads".format(
self._curStep)) # Now not all threads had time to go to sleep
# Worker threads will wake up at this point, and each execute it's
# own task
self.tapAllThreads()
logger.debug("Main thread ready to finish up...")
if not transitionFailed: # only in regular situations
......@@ -336,21 +367,24 @@ class ThreadCoordinator:
def tapAllThreads(self): # in a deterministic manner
wakeSeq = []
for i in range(self._pool.numThreads): # generate a random sequence
if Dice.throw(2) == 1 :
if Dice.throw(2) == 1:
wakeSeq.append(i)
else:
wakeSeq.insert(0, i)
logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq)))
logger.debug(
"[TRD] Main thread waking up worker threads: {}".format(
str(wakeSeq)))
# TODO: set dice seed to a deterministic value
for i in wakeSeq:
self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
# TODO: maybe a bit too deep?!
self._pool.threadList[i].tapStepGate()
time.sleep(0) # yield
def isRunning(self):
return self._te != None
return self._te is not None
def fetchTask(self) -> Task :
if ( not self.isRunning() ): # no task
def fetchTask(self) -> Task:
if (not self.isRunning()): # no task
raise RuntimeError("Cannot fetch task when not running")
# return self._wd.pickTask()
# Alternatively, let's ask the DbState for the appropriate task
......@@ -361,8 +395,11 @@ class ThreadCoordinator:
# logger.debug(" (dice:{}/{}) ".format(i, nTasks))
# # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
# return tasks[i].clone() # TODO: still necessary?
taskType = self.getDbManager().getStateMachine().pickTaskType() # pick a task type for current state
return taskType(self.getDbManager(), self._execStats) # create a task from it
# pick a task type for current state
taskType = self.getDbManager().getStateMachine().pickTaskType()
return taskType(
self.getDbManager(),
self._execStats) # create a task from it
def resetExecutedTasks(self):
self._executedTasks = [] # should be under single thread
......@@ -372,6 +409,8 @@ class ThreadCoordinator:
self._executedTasks.append(task)
# We define a class to run a number of threads in locking steps.
class ThreadPool:
def __init__(self, numThreads, maxSteps):
self.numThreads = numThreads
......@@ -394,6 +433,8 @@ class ThreadPool:
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
class LinearQueue():
def __init__(self):
self.firstIndex = 1 # 1st ever element
......@@ -402,7 +443,8 @@ class LinearQueue():
self.inUse = set() # the indexes that are in use right now
def toText(self):
return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)
return "[{}..{}], in use: {}".format(
self.firstIndex, self.lastIndex, self.inUse)
# Push (add new element, largest) to the tail, and mark it in use
def push(self):
......@@ -418,12 +460,12 @@ class LinearQueue():
def pop(self):
with self._lock:
if ( self.isEmpty() ):
if (self.isEmpty()):
# raise RuntimeError("Cannot pop an empty queue")
return False # TODO: None?
index = self.firstIndex
if ( index in self.inUse ):
if (index in self.inUse):
return False
self.firstIndex += 1
......@@ -441,8 +483,9 @@ class LinearQueue():
def allocate(self, i):
with self._lock:
# logger.debug("LQ allocating item {}".format(i))
if ( i in self.inUse ):
raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
if (i in self.inUse):
raise RuntimeError(
"Cannot re-use same index in queue: {}".format(i))
self.inUse.add(i)
def release(self, i):
......@@ -454,20 +497,21 @@ class LinearQueue():
return self.lastIndex + 1 - self.firstIndex
def pickAndAllocate(self):
if ( self.isEmpty() ):
if (self.isEmpty()):
return None
with self._lock:
cnt = 0 # counting the interations
while True:
cnt += 1
if ( cnt > self.size()*10 ): # 10x iteration already
if (cnt > self.size() * 10): # 10x iteration already
# raise RuntimeError("Failed to allocate LinearQueue element")
return None
ret = Dice.throwRange(self.firstIndex, self.lastIndex+1)
if ( not ret in self.inUse ):
ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
if (ret not in self.inUse):
self.allocate(ret)
return ret
class DbConn:
TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api"
......@@ -480,7 +524,8 @@ class DbConn:
elif connType == cls.TYPE_REST:
return DbConnRest()
else:
raise RuntimeError("Unexpected connection type: {}".format(connType))
raise RuntimeError(
"Unexpected connection type: {}".format(connType))
@classmethod
def createNative(cls):
......@@ -495,18 +540,21 @@ class DbConn:
self._type = self.TYPE_INVALID
def open(self):
if ( self.isOpen ):
if (self.isOpen):
raise RuntimeError("Cannot re-open an existing DB connection")
# below implemented by child classes
self.openByType()
logger.debug("[DB] data connection opened, type = {}".format(self._type))
logger.debug(
"[DB] data connection opened, type = {}".format(
self._type))
self.isOpen = True
def resetDb(self): # reset the whole database, etc.
if ( not self.isOpen ):
raise RuntimeError("Cannot reset database until connection is open")
if (not self.isOpen):
raise RuntimeError(
"Cannot reset database until connection is open")
# self._tdSql.prepare() # Recreate database, etc.
self.execute('drop database if exists db')
......@@ -515,34 +563,44 @@ class DbConn:
# self._cursor.execute('use db')
# tdSql.execute('show databases')
def queryScalar(self, sql) -> int :
def queryScalar(self, sql) -> int:
return self._queryAny(sql)
def queryString(self, sql) -> str :
def queryString(self, sql) -> str:
return self._queryAny(sql)
def _queryAny(self, sql) : # actual query result as an int
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
def _queryAny(self, sql): # actual query result as an int
if (not self.isOpen):
raise RuntimeError(
"Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1 :
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
if nRows != 1:
raise RuntimeError(
"Unexpected result for query: {}, rows = {}".format(
sql, nRows))
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
raise RuntimeError(
"Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0]
def execute(self, sql):
raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getQueryResult(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultCols(self):
raise RuntimeError("Unexpected execution, should be overriden")
# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
class DbConnRest(DbConn):
def __init__(self):
super().__init__()
......@@ -554,44 +612,50 @@ class DbConnRest(DbConn):
pass # do nothing, always open
def close(self):
if ( not self.isOpen ):
raise RuntimeError("Cannot clean up database until connection is open")
if (not self.isOpen):
raise RuntimeError(
"Cannot clean up database until connection is open")
# Do nothing for REST
logger.debug("[DB] REST Database connection closed")
self.isOpen = False
def _doSql(self, sql):
r = requests.post(self._url,
data = sql,
auth = HTTPBasicAuth('root', 'taosdata'))
data=sql,
auth=HTTPBasicAuth('root', 'taosdata'))
rj = r.json()
# Sanity check for the "Json Result"
if (not 'status' in rj):
if ('status' not in rj):
raise RuntimeError("No status in REST response")
if rj['status'] == 'error': # clearly reported error
if (not 'code' in rj): # error without code
if ('code' not in rj): # error without code
raise RuntimeError("REST error return without code")
errno = rj['code'] # May need to massage this in the future
# print("Raising programming error with REST return: {}".format(rj))
raise taos.error.ProgrammingError(rj['desc'], errno) # todo: check existance of 'desc'
raise taos.error.ProgrammingError(
rj['desc'], errno) # todo: check existance of 'desc'
if rj['status'] != 'succ': # better be this
raise RuntimeError("Unexpected REST return status: {}".format(rj['status']))
raise RuntimeError(
"Unexpected REST return status: {}".format(
rj['status']))
nRows = rj['rows'] if ('rows' in rj) else 0
self._result = rj
return nRows
def execute(self, sql):
if ( not self.isOpen ):
raise RuntimeError("Cannot execute database commands until connection is open")
if (not self.isOpen):
raise RuntimeError(
"Cannot execute database commands until connection is open")
logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
nRows = self._doSql(sql)
logger.debug("[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
logger.debug(
"[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql) : # return rows affected
def query(self, sql): # return rows affected
return self.execute(sql)
def getQueryResult(self):
......@@ -606,6 +670,7 @@ class DbConnRest(DbConn):
print(self._result)
raise RuntimeError("TBD")
class DbConnNative(DbConn):
def __init__(self):
super().__init__()
......@@ -615,38 +680,48 @@ class DbConnNative(DbConn):
def openByType(self): # Open connection
cfgPath = "../../build/test/cfg"
self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
self._conn = taos.connect(
host="127.0.0.1",
config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
# Get the connection/cursor ready
self._cursor.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every step
# self._cursor.execute('use db') # do this at the beginning of every
# step
# Open connection
self._tdSql = TDSql()
self._tdSql.init(self._cursor)
def close(self):
if ( not self.isOpen ):
raise RuntimeError("Cannot clean up database until connection is open")
if (not self.isOpen):
raise RuntimeError(
"Cannot clean up database until connection is open")
self._tdSql.close()
logger.debug("[DB] Database connection closed")
self.isOpen = False
def execute(self, sql):
if ( not self.isOpen ):
raise RuntimeError("Cannot execute database commands until connection is open")
if (not self.isOpen):
raise RuntimeError(
"Cannot execute database commands until connection is open")
logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.execute(sql)
logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
logger.debug(
"[SQL] Execution Result, nRows = {}, SQL = {}".format(
nRows, sql))
return nRows
def query(self, sql) : # return rows affected
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
def query(self, sql): # return rows affected
if (not self.isOpen):
raise RuntimeError(
"Cannot query database until connection is open")
logger.debug("[SQL] Executing SQL: {}".format(sql))
nRows = self._tdSql.query(sql)
logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql))
logger.debug(
"[SQL] Query Result, nRows = {}, SQL = {}".format(
nRows, sql))
return nRows
# results are in: return self._tdSql.queryResult
......@@ -680,7 +755,8 @@ class AnyState:
self._info = self.getInfo()
def __str__(self):
return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] # -1 hack to accomodate the STATE_INVALID case
# -1 hack to accomodate the STATE_INVALID case
return self._stateNames[self._info[self.STATE_VAL_IDX] + 1]
def getInfo(self):
raise RuntimeError("Must be overriden by child classes")
......@@ -691,7 +767,9 @@ class AnyState:
elif isinstance(other, AnyState):
return self.getValIndex() == other.getValIndex()
else:
raise RuntimeError("Unexpected comparison, type = {}".format(type(other)))
raise RuntimeError(
"Unexpected comparison, type = {}".format(
type(other)))
def verifyTasksToState(self, tasks, newState):
raise RuntimeError("Must be overriden by child classes")
......@@ -701,55 +779,65 @@ class AnyState:
def getValue(self):
return self._info[self.STATE_VAL_IDX]
def canCreateDb(self):
return self._info[self.CAN_CREATE_DB]
def canDropDb(self):
return self._info[self.CAN_DROP_DB]
def canCreateFixedSuperTable(self):
return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
def canDropFixedSuperTable(self):
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
def canAddData(self):
return self._info[self.CAN_ADD_DATA]
def canReadData(self):
return self._info[self.CAN_READ_DATA]
def assertAtMostOneSuccess(self, tasks, cls):
sCnt = 0
for task in tasks :
for task in tasks:
if not isinstance(task, cls):
continue
if task.isSuccess():
# task.logDebug("Task success found")
sCnt += 1
if ( sCnt >= 2 ):
raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))
if (sCnt >= 2):
raise RuntimeError(
"Unexpected more than 1 success with task: {}".format(cls))
def assertIfExistThenSuccess(self, tasks, cls):
sCnt = 0
exists = False
for task in tasks :
for task in tasks:
if not isinstance(task, cls):
continue
exists = True # we have a valid instance
if task.isSuccess():
sCnt += 1
if ( exists and sCnt <= 0 ):
raise RuntimeError("Unexpected zero success for task: {}".format(cls))
if (exists and sCnt <= 0):
raise RuntimeError(
"Unexpected zero success for task: {}".format(cls))
def assertNoTask(self, tasks, cls):
for task in tasks :
for task in tasks:
if isinstance(task, cls):
raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))
raise CrashGenError(
"This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))
def assertNoSuccess(self, tasks, cls):
for task in tasks :
for task in tasks:
if isinstance(task, cls):
if task.isSuccess():
raise RuntimeError("Unexpected successful task: {}".format(cls))
raise RuntimeError(
"Unexpected successful task: {}".format(cls))
def hasSuccess(self, tasks, cls):
for task in tasks :
for task in tasks:
if not isinstance(task, cls):
continue
if task.isSuccess():
......@@ -757,11 +845,12 @@ class AnyState:
return False
def hasTask(self, tasks, cls):
for task in tasks :
for task in tasks:
if isinstance(task, cls):
return True
return False
class StateInvalid(AnyState):
def getInfo(self):
return [
......@@ -773,6 +862,7 @@ class StateInvalid(AnyState):
# def verifyTasksToState(self, tasks, newState):
class StateEmpty(AnyState):
def getInfo(self):
return [
......@@ -783,9 +873,12 @@ class StateEmpty(AnyState):
]
def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB
if ( not self.hasTask(tasks, TaskDropDb) ) : # and no drop_db tasks
self.assertAtMostOneSuccess(tasks, TaskCreateDb) # we must have at most one. TODO: compare numbers
if (self.hasSuccess(tasks, TaskCreateDb)
): # at EMPTY, if there's succes in creating DB
if (not self.hasTask(tasks, TaskDropDb)): # and no drop_db tasks
# we must have at most one. TODO: compare numbers
self.assertAtMostOneSuccess(tasks, TaskCreateDb)
class StateDbOnly(AnyState):
def getInfo(self):
......@@ -797,8 +890,9 @@ class StateDbOnly(AnyState):
]
def verifyTasksToState(self, tasks, newState):
if ( not self.hasTask(tasks, TaskCreateDb) ):
self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more
if (not self.hasTask(tasks, TaskCreateDb)):
# only if we don't create any more
self.assertAtMostOneSuccess(tasks, TaskDropDb)
self.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases
# Nothing to be said about adding data task
......@@ -823,6 +917,7 @@ class StateDbOnly(AnyState):
# # raise RuntimeError("Unexpected no-success scenario") # We might just landed all failure tasks,
# self._state = self.STATE_DB_ONLY # no change
class StateSuperTableOnly(AnyState):
def getInfo(self):
return [
......@@ -833,9 +928,11 @@ class StateSuperTableOnly(AnyState):
]
def verifyTasksToState(self, tasks, newState):
if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table
if (self.hasSuccess(tasks, TaskDropSuperTable)
): # we are able to drop the table
#self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
self.hasSuccess(tasks, TaskCreateSuperTable) # we must have had recreted it
# we must have had recreted it
self.hasSuccess(tasks, TaskCreateSuperTable)
# self._state = self.STATE_DB_ONLY
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
......@@ -849,6 +946,7 @@ class StateSuperTableOnly(AnyState):
# raise RuntimeError("Unexpected no-success scenarios")
# TODO: need to revamp!!
class StateHasData(AnyState):
def getInfo(self):
return [
......@@ -859,13 +957,15 @@ class StateHasData(AnyState):
]
def verifyTasksToState(self, tasks, newState):
if ( newState.equals(AnyState.STATE_EMPTY) ):
if (newState.equals(AnyState.STATE_EMPTY)):
self.hasSuccess(tasks, TaskDropDb)
if ( not self.hasTask(tasks, TaskCreateDb) ) :
if (not self.hasTask(tasks, TaskCreateDb)):
self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy
elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only
if ( not self.hasTask(tasks, TaskCreateDb)): # without a create_db task
self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task
elif (newState.equals(AnyState.STATE_DB_ONLY)): # in DB only
if (not self.hasTask(tasks, TaskCreateDb)
): # without a create_db task
# we must have drop_db task
self.assertNoTask(tasks, TaskDropDb)
self.hasSuccess(tasks, TaskDropSuperTable)
# self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
# elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
......@@ -874,17 +974,24 @@ class StateHasData(AnyState):
# self.assertNoTask(tasks, TaskAddData)
# self.hasSuccess(tasks, DeleteDataTasks)
else: # should be STATE_HAS_DATA
if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one
self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it
if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table
self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it
if (not self.hasTask(tasks, TaskCreateDb)
): # only if we didn't create one
# we shouldn't have dropped it
self.assertNoTask(tasks, TaskDropDb)
if (not self.hasTask(tasks, TaskCreateSuperTable)
): # if we didn't create the table
# we should not have a task that drops it
self.assertNoTask(tasks, TaskDropSuperTable)
# self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
class StateMechine:
def __init__(self, dbConn):
self._dbConn = dbConn
self._curState = self._findCurrentState() # starting state
self._stateWeights = [1,3,5,15] # transitition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
# transitition target probabilities, indexed with value of STATE_EMPTY,
# STATE_DB_ONLY, etc.
self._stateWeights = [1, 3, 5, 15]
def getCurrentState(self):
return self._curState
......@@ -906,129 +1013,165 @@ class StateMechine:
# t = tc(self) # create task object
if tc.canBeginFrom(self._curState):
firstTaskTypes.append(tc)
# now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones
# now we have all the tasks that can begin directly from the current
# state, let's figure out the INDIRECT ones
taskTypes = firstTaskTypes.copy() # have to have these
for task1 in firstTaskTypes: # each task type gathered so far
endState = task1.getEndState() # figure the end state
if endState == None: # does not change end state
if endState is None: # does not change end state
continue # no use, do nothing
for tc in allTaskClasses: # what task can further begin from there?
if tc.canBeginFrom(endState) and (tc not in firstTaskTypes):
taskTypes.append(tc) # gather it
if len(taskTypes) <= 0:
raise RuntimeError("No suitable task types found for state: {}".format(self._curState))
logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, typesToStrings(taskTypes)))
raise RuntimeError(
"No suitable task types found for state: {}".format(
self._curState))
logger.debug(
"[OPS] Tasks found for state {}: {}".format(
self._curState,
typesToStrings(taskTypes)))
return taskTypes
def _findCurrentState(self):
dbc = self._dbConn
ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state
if dbc.query("show databases") == 0 : # no database?!
if dbc.query("show databases") == 0: # no database?!
# logger.debug("Found EMPTY state")
logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
logger.debug(
"[STT] empty database found, between {} and {}".format(
ts, time.time()))
return StateEmpty()
dbc.execute("use db") # did not do this when openning connection, and this is NOT the worker thread, which does this on their own
if dbc.query("show tables") == 0 : # no tables
# did not do this when openning connection, and this is NOT the worker
# thread, which does this on their own
dbc.execute("use db")
if dbc.query("show tables") == 0: # no tables
# logger.debug("Found DB ONLY state")
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
logger.debug(
"[STT] DB_ONLY found, between {} and {}".format(
ts, time.time()))
return StateDbOnly()
if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName()) ) == 0 : # no regular tables
if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName())
) == 0: # no regular tables
# logger.debug("Found TABLE_ONLY state")
logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
logger.debug(
"[STT] SUPER_TABLE_ONLY found, between {} and {}".format(
ts, time.time()))
return StateSuperTableOnly()
else: # has actual tables
# logger.debug("Found HAS_DATA state")
logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
logger.debug(
"[STT] HAS_DATA found, between {} and {}".format(
ts, time.time()))
return StateHasData()
def transition(self, tasks):
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
if (len(tasks) == 0): # before 1st step, or otherwise empty
logger.debug("[STT] Starting State: {}".format(self._curState))
return # do nothing
self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
# this should show up in the server log, separating steps
self._dbConn.execute("show dnodes")
# Generic Checks, first based on the start state
if self._curState.canCreateDb():
self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
# self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
# self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in
# case of multiple creation and drops
if self._curState.canDropDb():
self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in
# case of drop-create-drop
# if self._state.canCreateFixedTable():
# self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not
# really, in case of create-drop-create
# if self._state.canDropFixedTable():
# self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
# self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop
# self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not
# really in case of drop-create-drop
# if self._state.canAddData():
# self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually
# self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true
# actually
# if self._state.canReadData():
# Nothing for sure
newState = self._findCurrentState()
logger.debug("[STT] New DB state determined: {}".format(newState))
self._curState.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
# can old state move to new state through the tasks?
self._curState.verifyTasksToState(tasks, newState)
self._curState = newState
def pickTaskType(self):
taskTypes = self.getTaskTypes() # all the task types we can choose from at curent state
# all the task types we can choose from at curent state
taskTypes = self.getTaskTypes()
weights = []
for tt in taskTypes:
endState = tt.getEndState()
if endState != None :
weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method
if endState is not None:
# TODO: change to a method
weights.append(self._stateWeights[endState.getValIndex()])
else:
weights.append(10) # read data task, default to 10: TODO: change to a constant
# read data task, default to 10: TODO: change to a constant
weights.append(10)
i = self._weighted_choice_sub(weights)
# logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
return taskTypes[i]
def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
# ref:
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
def _weighted_choice_sub(self, weights):
# TODO: use our dice to ensure it being determinstic?
rnd = random.random() * sum(weights)
for i, w in enumerate(weights):
rnd -= w
if rnd < 0:
return i
# Manager of the Database Data/Connection
class DbManager():
def __init__(self, resetDb = True):
def __init__(self, resetDb=True):
self.tableNumQueue = LinearQueue()
self._lastTick = self.setupLastTick() # datetime.datetime(2019, 1, 1) # initial date time tick
# datetime.datetime(2019, 1, 1) # initial date time tick
self._lastTick = self.setupLastTick()
self._lastInt = 0 # next one is initial integer
self._lock = threading.RLock()
# self.openDbServerConnection()
self._dbConn = DbConn.createNative() if (gConfig.connector_type=='native') else DbConn.createRest()
self._dbConn = DbConn.createNative() if (
gConfig.connector_type == 'native') else DbConn.createRest()
try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if ( err.msg == 'client disconnected' ): # cannot open DB connection
print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
if (err.msg == 'client disconnected'): # cannot open DB connection
print(
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit(2)
else:
raise
except:
except BaseException:
print("[=] Unexpected exception")
raise
if resetDb :
if resetDb:
self._dbConn.resetDb() # drop and recreate DB
self._stateMachine = StateMechine(self._dbConn) # Do this after dbConn is in proper shape
# Do this after dbConn is in proper shape
self._stateMachine = StateMechine(self._dbConn)
def getDbConn(self):
return self._dbConn
def getStateMachine(self) -> StateMechine :
def getStateMachine(self) -> StateMechine:
return self._stateMachine
# def getState(self):
......@@ -1043,11 +1186,14 @@ class DbManager():
def setupLastTick(self):
t1 = datetime.datetime(2020, 6, 1)
t2 = datetime.datetime.now()
elSec = int(t2.timestamp() - t1.timestamp()) # maybe a very large number, takes 69 years to exceed Python int range
elSec2 = ( elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500 ) ) * 500 # a number representing seconds within 10 years
# maybe a very large number, takes 69 years to exceed Python int range
elSec = int(t2.timestamp() - t1.timestamp())
elSec2 = (elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500)) * \
500 # a number representing seconds within 10 years
# print("elSec = {}".format(elSec))
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above
t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above
logger.info("Setting up TICKS to start from: {}".format(t4))
return t4
......@@ -1068,10 +1214,11 @@ class DbManager():
def getNextTick(self):
with self._lock: # prevent duplicate tick
if Dice.throw(10) == 0 : # 1 in 10 chance
if Dice.throw(10) == 0: # 1 in 10 chance
return self._lastTick + datetime.timedelta(0, -100)
else: # regular
self._lastTick += datetime.timedelta(0, 1) # add one second to it
# add one second to it
self._lastTick += datetime.timedelta(0, 1)
return self._lastTick
def getNextInt(self):
......@@ -1080,14 +1227,15 @@ class DbManager():
return self._lastInt
def getNextBinary(self):
return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(self.getNextInt())
return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
self.getNextInt())
def getNextFloat(self):
return 0.9 + self.getNextInt()
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
if ( not tblNum ): # maybe false
if (not tblNum): # maybe false
return False
return "table_{}".format(tblNum)
......@@ -1095,13 +1243,14 @@ class DbManager():
def cleanUp(self):
self._dbConn.close()
class TaskExecutor():
class BoundedList:
def __init__(self, size = 10):
def __init__(self, size=10):
self._size = size
self._list = []
def add(self, n: int) :
def add(self, n: int):
if not self._list: # empty
self._list.append(n)
return
......@@ -1110,22 +1259,22 @@ class TaskExecutor():
insPos = 0
for i in range(nItems):
insPos = i
if n <= self._list[i] : # smaller than this item, time to insert
if n <= self._list[i]: # smaller than this item, time to insert
break # found the insertion point
insPos += 1 # insert to the right
if insPos == 0 : # except for the 1st item, # TODO: elimiate first item as gating item
if insPos == 0: # except for the 1st item, # TODO: elimiate first item as gating item
return # do nothing
# print("Inserting at postion {}, value: {}".format(insPos, n))
self._list.insert(insPos, n) # insert
newLen = len(self._list)
if newLen <= self._size :
if newLen <= self._size:
return # do nothing
elif newLen == (self._size + 1) :
elif newLen == (self._size + 1):
del self._list[0] # remove the first item
else :
else:
raise RuntimeError("Corrupt Bounded List")
def __str__(self):
......@@ -1156,6 +1305,7 @@ class TaskExecutor():
# def logDebug(self, msg):
# logger.debug(" T[{}.x]: ".format(self._curStep) + msg)
class Task():
taskSn = 100
......@@ -1181,7 +1331,7 @@ class Task():
self._lastSql = "" # last SQL executed/attempted
def isSuccess(self):
return self._err == None
return self._err is None
def isAborted(self):
return self._aborted
......@@ -1191,13 +1341,19 @@ class Task():
return newTask
def logDebug(self, msg):
self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
self._workerThread.logDebug(
"Step[{}.{}] {}".format(
self._curStep, self._taskNum, msg))
def logInfo(self, msg):
self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
self._workerThread.logInfo(
"Step[{}.{}] {}".format(
self._curStep, self._taskNum, msg))
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))
raise RuntimeError(
"To be implemeted by child classes, class name: {}".format(
self.__class__.__name__))
def execute(self, wt: WorkerThread):
wt.verifyThreadSelf()
......@@ -1205,43 +1361,54 @@ class Task():
te = wt.getTaskExecutor()
self._curStep = te.getCurStep()
self.logDebug("[-] executing task {}...".format(self.__class__.__name__))
self.logDebug(
"[-] executing task {}...".format(self.__class__.__name__))
self._err = None
self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
self._execStats.beginTaskType(
self.__class__.__name__) # mark beginning
try:
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600,
errno2 = err.errno if (
err.errno > 0) else 0x80000000 + err.errno # correct error scheme
if (errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600,
1000 # REST catch-all error
]) : # allowed errors
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
]): # allowed errors
self.logDebug(
"[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, self._lastSql))
print("_", end="", flush=True)
self._err = err
else:
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, self._lastSql)
self.logDebug(errMsg)
if gConfig.debug :
if gConfig.debug:
raise # so that we see full stack
else: # non-debug
print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
print(
"\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
"----------------------------\n")
# sys.exit(-1)
self._err = err
self._aborted = True
except Exception as e :
except Exception as e:
self.logInfo("Non-TAOS exception encountered")
self._err = e
self._aborted = True
traceback.print_exc()
except :
self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql))
except BaseException:
self.logDebug(
"[=] Unexpected exception, SQL: {}".format(
self._lastSql))
raise
self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
self.logDebug("[X] task execution completed, {}, status: {}".format(
self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
# TODO: merge with above.
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess())
def execSql(self, sql):
self._lastSql = sql
......@@ -1259,10 +1426,10 @@ class Task():
return wt.getQueryResult()
class ExecutionStats:
def __init__(self):
self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task
# total/success times for a task
self._execTimes: Dict[str, [int, int]] = {}
self._tasksInProgress = 0
self._lock = threading.Lock()
self._firstTaskStartTime = None
......@@ -1274,10 +1441,11 @@ class ExecutionStats:
self._failureReason = None
def __str__(self):
return "[ExecStats: _failed={}, _failureReason={}".format(self._failed, self._failureReason)
return "[ExecStats: _failed={}, _failureReason={}".format(
self._failed, self._failureReason)
def isFailed(self):
return self._failed == True
return self._failed
def startExec(self):
self._execStartTime = time.time()
......@@ -1295,14 +1463,14 @@ class ExecutionStats:
def beginTaskType(self, klassName):
with self._lock:
if self._tasksInProgress == 0 : # starting a new round
if self._tasksInProgress == 0: # starting a new round
self._firstTaskStartTime = time.time() # I am now the first task
self._tasksInProgress += 1
def endTaskType(self, klassName, isSuccess):
with self._lock:
self._tasksInProgress -= 1
if self._tasksInProgress == 0 : # all tasks have stopped
if self._tasksInProgress == 0: # all tasks have stopped
self._accRunTime += (time.time() - self._firstTaskStartTime)
self._firstTaskStartTime = None
......@@ -1311,23 +1479,36 @@ class ExecutionStats:
self._failureReason = reason
def printStats(self):
logger.info("----------------------------------------------------------------------")
logger.info("| Crash_Gen test {}, with the following stats:".
format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
logger.info(
"----------------------------------------------------------------------")
logger.info(
"| Crash_Gen test {}, with the following stats:". format(
"FAILED (reason: {})".format(
self._failureReason) if self._failed else "SUCCEEDED"))
logger.info("| Task Execution Times (success/total):")
execTimesAny = 0
for k, n in self._execTimes.items():
execTimesAny += n[0]
logger.info("| {0:<24}: {1}/{2}".format(k,n[1],n[0]))
logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress))
logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
logger.info("----------------------------------------------------------------------")
logger.info("| {0:<24}: {1}/{2}".format(k, n[1], n[0]))
logger.info(
"| Total Tasks Executed (success or not): {} ".format(execTimesAny))
logger.info(
"| Total Tasks In Progress at End: {}".format(
self._tasksInProgress))
logger.info(
"| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
self._accRunTime))
logger.info(
"| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime / execTimesAny))
logger.info(
"| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
self._elapsedTime))
logger.info(
"| Top numbers written: {}".format(
TaskExecutor.getBoundedList()))
logger.info(
"----------------------------------------------------------------------")
class StateTransitionTask(Task):
......@@ -1365,6 +1546,7 @@ class StateTransitionTask(Task):
def execute(self, wt: WorkerThread):
super().execute(wt)
class TaskCreateDb(StateTransitionTask):
@classmethod
def getEndState(cls):
......@@ -1377,6 +1559,7 @@ class TaskCreateDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
self.execWtSql(wt, "create database db")
class TaskDropDb(StateTransitionTask):
@classmethod
def getEndState(cls):
......@@ -1390,6 +1573,7 @@ class TaskDropDb(StateTransitionTask):
self.execWtSql(wt, "drop database db")
logger.debug("[OPS] database dropped at {}".format(time.time()))
class TaskCreateSuperTable(StateTransitionTask):
@classmethod
def getEndState(cls):
......@@ -1406,8 +1590,11 @@ class TaskCreateSuperTable(StateTransitionTask):
tblName = self._dbManager.getFixedSuperTableName()
# wt.execSql("use db") # should always be in place
self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that automatically
self.execWtSql(
wt,
"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that
# automatically
class TaskReadData(StateTransitionTask):
......@@ -1421,19 +1608,23 @@ class TaskReadData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTbName = self._dbManager.getFixedSuperTableName()
self.queryWtSql(wt, "select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
self.queryWtSql(wt, "select TBNAME from db.{}".format(
sTbName)) # TODO: analyze result set later
if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
if random.randrange(
5) == 0: # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
wt.getDbConn().close()
wt.getDbConn().open()
else:
rTables = self.getQueryResult(wt) # wt.getDbConn().getQueryResult()
# wt.getDbConn().getQueryResult()
rTables = self.getQueryResult(wt)
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
for rTbName in rTables : # regular tables
for rTbName in rTables: # regular tables
self.execWtSql(wt, "select * from db.{}".format(rTbName[0]))
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
class TaskDropSuperTable(StateTransitionTask):
@classmethod
def getEndState(cls):
......@@ -1444,26 +1635,33 @@ class TaskDropSuperTable(StateTransitionTask):
return state.canDropFixedSuperTable()
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
if Dice.throw(2) == 0 :
tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
# 1/2 chance, we'll drop the regular tables one by one, in a randomized
# sequence
if Dice.throw(2) == 0:
tblSeq = list(range(
2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
random.shuffle(tblSeq)
tickOutput = False # if we have spitted out a "d" character for "drop regular table"
isSuccess = True
for i in tblSeq:
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
regTableName = self.getRegTableName(
i) # "db.reg_table_{}".format(i)
try:
self.execWtSql(wt, "drop table {}".format(regTableName)) # nRows always 0, like MySQL
self.execWtSql(wt, "drop table {}".format(
regTableName)) # nRows always 0, like MySQL
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correcting for strange error number scheme
if ( errno2 in [0x362]) : # mnode invalid table name
# correcting for strange error number scheme
errno2 = err.errno if (
err.errno > 0) else 0x80000000 + err.errno
if (errno2 in [0x362]): # mnode invalid table name
isSuccess = False
logger.debug("[DB] Acceptable error when dropping a table")
logger.debug(
"[DB] Acceptable error when dropping a table")
continue # try to delete next regular table
if (not tickOutput):
tickOutput = True # Print only one time
if isSuccess :
if isSuccess:
print("d", end="", flush=True)
else:
print("f", end="", flush=True)
......@@ -1472,6 +1670,7 @@ class TaskDropSuperTable(StateTransitionTask):
tblName = self._dbManager.getFixedSuperTableName()
self.execWtSql(wt, "drop table db.{}".format(tblName))
class TaskAlterTags(StateTransitionTask):
@classmethod
def getEndState(cls):
......@@ -1484,31 +1683,36 @@ class TaskAlterTags(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbManager.getFixedSuperTableName()
dice = Dice.throw(4)
if dice == 0 :
if dice == 0:
sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1 :
elif dice == 1:
sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2 :
elif dice == 2:
sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3
sql = "alter table db.{} change tag extraTag newTag".format(tblName)
sql = "alter table db.{} change tag extraTag newTag".format(
tblName)
self.execWtSql(wt, sql)
class TaskAddData(StateTransitionTask):
activeTable : Set[int] = set() # Track which table is being actively worked on
# Track which table is being actively worked on
activeTable: Set[int] = set()
# We use these two files to record operations to DB, useful for power-off tests
# We use these two files to record operations to DB, useful for power-off
# tests
fAddLogReady = None
fAddLogDone = None
@classmethod
def prepToRecordOps(cls):
if gConfig.record_ops :
if ( cls.fAddLogReady == None ):
logger.info("Recording in a file operations to be performed...")
if gConfig.record_ops:
if (cls.fAddLogReady is None):
logger.info(
"Recording in a file operations to be performed...")
cls.fAddLogReady = open("add_log_ready.txt", "w")
if ( cls.fAddLogDone == None ):
if (cls.fAddLogDone is None):
logger.info("Recording in a file operations completed...")
cls.fAddLogDone = open("add_log_done.txt", "w")
......@@ -1522,23 +1726,32 @@ class TaskAddData(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbManager
# wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
# wt.execSql("use db") # TODO: seems to be an INSERT bug to require
# this
tblSeq = list(
range(
self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
random.shuffle(tblSeq)
for i in tblSeq:
if ( i in self.activeTable ): # wow already active
if (i in self.activeTable): # wow already active
# logger.info("Concurrent data insertion into table: {}".format(i))
# print("ct({})".format(i), end="", flush=True) # Concurrent insertion into table
# print("ct({})".format(i), end="", flush=True) # Concurrent
# insertion into table
print("x", end="", flush=True)
else:
self.activeTable.add(i) # marking it active
# No need to shuffle data sequence, unless later we decide to do non-increment insertion
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table
# No need to shuffle data sequence, unless later we decide to do
# non-increment insertion
regTableName = self.getRegTableName(
i) # "db.reg_table_{}".format(i)
for j in range(
self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
nextInt = ds.getNextInt()
if gConfig.record_ops:
self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
self.fAddLogReady.write(
"Ready to write {} to {}\n".format(
nextInt, regTableName))
self.fAddLogReady.flush()
os.fsync(self.fAddLogReady)
sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format(
......@@ -1547,10 +1760,13 @@ class TaskAddData(StateTransitionTask):
ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), nextInt)
self.execWtSql(wt, sql)
# Successfully wrote the data into the DB, let's record it somehow
# Successfully wrote the data into the DB, let's record it
# somehow
te.recordDataMark(nextInt)
if gConfig.record_ops:
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.write(
"Wrote {} to {}\n".format(
nextInt, regTableName))
self.fAddLogDone.flush()
os.fsync(self.fAddLogDone)
self.activeTable.discard(i) # not raising an error, unlike remove
......@@ -1563,7 +1779,8 @@ class Dice():
@classmethod
def seed(cls, s): # static
if (cls.seeded):
raise RuntimeError("Cannot seed the random generator more than once")
raise RuntimeError(
"Cannot seed the random generator more than once")
cls.verifyRNG()
random.seed(s)
cls.seeded = True # TODO: protect against multi-threading
......@@ -1574,7 +1791,7 @@ class Dice():
x1 = random.randrange(0, 1000)
x2 = random.randrange(0, 1000)
x3 = random.randrange(0, 1000)
if ( x1 != 864 or x2!=394 or x3!=776 ):
if (x1 != 864 or x2 != 394 or x3 != 776):
raise RuntimeError("System RNG is not deterministic")
@classmethod
......@@ -1583,7 +1800,7 @@ class Dice():
@classmethod
def throwRange(cls, start, stop): # up to stop-1
if ( not cls.seeded ):
if (not cls.seeded):
raise RuntimeError("Cannot throw dice before seeding it")
return random.randrange(start, stop)
......@@ -1614,7 +1831,7 @@ class Dice():
class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord):
if ( record.levelno >= logging.INFO ) :
if (record.levelno >= logging.INFO):
return True # info or above always log
# Commenting out below to adjust...
......@@ -1623,11 +1840,13 @@ class LoggingFilter(logging.Filter):
# return False
return True
class MyLoggingAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager:
def __init__(self):
......@@ -1641,14 +1860,15 @@ class SvcManager:
def svcOutputReader(self, out: IO, queue):
# print("This is the svcOutput Reader...")
for line in out : # iter(out.readline, b''):
for line in out: # iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
queue.put(line.rstrip()) # get rid of new lines
print("No more output from incoming IO") # meaning sub process must have died
# meaning sub process must have died
print("No more output from incoming IO")
out.close()
def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING :
if self.status != MainExec.STATUS_RUNNING:
print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status
......@@ -1659,7 +1879,7 @@ class SvcManager:
self.joinIoThread()
def joinIoThread(self):
if self.ioThread :
if self.ioThread:
self.ioThread.join()
self.ioThread = None
......@@ -1667,9 +1887,16 @@ class SvcManager:
ON_POSIX = 'posix' in sys.builtin_module_names
svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
# svcCmd = ['vmstat', '1']
self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True)
self.subProcess = subprocess.Popen(
svcCmd,
stdout=subprocess.PIPE,
bufsize=1,
close_fds=ON_POSIX,
text=True)
q = Queue()
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q))
self.ioThread = threading.Thread(
target=self.svcOutputReader, args=(
self.subProcess.stdout, q))
self.ioThread.daemon = True # thread dies with the program
self.ioThread.start()
......@@ -1679,7 +1906,7 @@ class SvcManager:
# stdout_value = proc.communicate()[0]
# print('\tstdout: {}'.format(repr(stdout_value)))
while True :
while True:
try:
line = q.get_nowait() # getting output at fast speed
except Empty:
......@@ -1697,6 +1924,7 @@ class SvcManager:
self.joinIoThread()
print("Finished")
class ClientManager:
def __init__(self):
print("Starting service manager")
......@@ -1707,7 +1935,7 @@ class ClientManager:
self.tc = None
def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING :
if self.status != MainExec.STATUS_RUNNING:
print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status
......@@ -1718,23 +1946,24 @@ class ClientManager:
def _printLastNumbers(self): # to verify data durability
dbManager = DbManager(resetDb=False)
dbc = dbManager.getDbConn()
if dbc.query("show databases") == 0 : # no databae
if dbc.query("show databases") == 0: # no databae
return
if dbc.query("show tables") == 0 : # no tables
if dbc.query("show tables") == 0: # no tables
return
dbc.execute("use db")
sTbName = dbManager.getFixedSuperTableName()
# get all regular tables
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
# TODO: analyze result set later
dbc.query("select TBNAME from db.{}".format(sTbName))
rTables = dbc.getQueryResult()
bList = TaskExecutor.BoundedList()
for rTbName in rTables : # regular tables
for rTbName in rTables: # regular tables
dbc.query("select speed from db.{}".format(rTbName[0]))
numbers = dbc.getQueryResult()
for row in numbers :
for row in numbers:
# print("<{}>".format(n), end="", flush=True)
bList.add(row[0])
......@@ -1759,7 +1988,8 @@ class ClientManager:
# print("TC failed = {}".format(self.tc.isFailed()))
self.conclude()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/
# Linux return code: ref https://shapeshed.com/unix-exit-codes/
return 1 if self.tc.isFailed() else 0
def conclude(self):
self.tc.printStats()
......@@ -1825,8 +2055,10 @@ class MainExec:
# print("Rows: {}, time={}".format(rows, time.time()))
return
def main():
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html
# Super cool Python argument library:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent('''\
......@@ -1837,21 +2069,51 @@ def main():
'''))
parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
parser.add_argument(
'-c',
'--connector-type',
action='store',
default='native',
type=str,
help='Connector type to use: native, rest, or mixed (default: 10)')
parser.add_argument('-d', '--debug', action='store_true',
parser.add_argument(
'-d',
'--debug',
action='store_true',
help='Turn on DEBUG mode for more logging (default: false)')
parser.add_argument('-e', '--run-tdengine', action='store_true',
parser.add_argument(
'-e',
'--run-tdengine',
action='store_true',
help='Run TDengine service in foreground (default: false)')
parser.add_argument('-l', '--larger-data', action='store_true',
parser.add_argument(
'-l',
'--larger-data',
action='store_true',
help='Write larger amount of data during write operations (default: false)')
parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
parser.add_argument(
'-p',
'--per-thread-db-connection',
action='store_true',
help='Use a single shared db connection (default: false)')
parser.add_argument('-r', '--record-ops', action='store_true',
parser.add_argument(
'-r',
'--record-ops',
action='store_true',
help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
parser.add_argument(
'-s',
'--max-steps',
action='store',
default=1000,
type=int,
help='Maximum number of steps to run (default: 100)')
parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
parser.add_argument(
'-t',
'--num-threads',
action='store',
default=5,
type=int,
help='Number of threads to run (default: 10)')
global gConfig
......@@ -1868,23 +2130,32 @@ def main():
ch = logging.StreamHandler()
_logger.addHandler(ch)
logger = MyLoggingAdapter(_logger, []) # Logging adapter, to be used as a logger
# Logging adapter, to be used as a logger
logger = MyLoggingAdapter(_logger, [])
if ( gConfig.debug ):
if (gConfig.debug):
logger.setLevel(logging.DEBUG) # default seems to be INFO
else:
logger.setLevel(logging.INFO)
# Run server or client
if gConfig.run_tdengine : # run server
if gConfig.run_tdengine: # run server
MainExec.runService()
else :
else:
return MainExec.runClient()
# logger.info("Crash_Gen execution finished")
if __name__ == "__main__":
tdDnodes.init("")
tdDnodes.setTestCluster(False)
tdDnodes.setValgrind(False)
tdDnodes.stopAll()
tdDnodes.deploy(1)
tdDnodes.start(1)
tdLog.sleep(5)
exitCode = main()
# print("Exiting with code: {}".format(exitCode))
sys.exit(exitCode)
......@@ -31,11 +31,23 @@ then
exit -1
fi
CURR_DIR=`pwd`
IN_TDINTERNAL="community"
if [[ "$CURR_DIR" == *"$IN_TDINTERNAL"* ]]; then
TAOS_DIR=$CURR_DIR/../../..
else
TAOS_DIR=$CURR_DIR/../..
fi
TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`
LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6|rev`/lib
echo $LIB_DIR
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
# Then let us set up the library path so that our compiled SO file can be loaded by Python
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
# Now we are all let, and let's see if we can find a crash. Note we pass all params
python3 ./crash_gen.py $@
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册