diff --git a/src/connector/python/linux/python3/taos/cursor.py b/src/connector/python/linux/python3/taos/cursor.py index 06d6a1946213cdf4b348d5430433749f05a120d3..3f0f315d3388fcac1c752c847b7fa1c412b06749 100644 --- a/src/connector/python/linux/python3/taos/cursor.py +++ b/src/connector/python/linux/python3/taos/cursor.py @@ -1,6 +1,7 @@ from .cinterface import CTaosInterface from .error import * from .constants import FieldType +import threading # querySeqNum = 0 @@ -37,6 +38,7 @@ class TDengineCursor(object): self._block_iter = 0 self._affected_rows = 0 self._logfile = "" + self._threadId = threading.get_ident() if connection is not None: self._connection = connection @@ -103,6 +105,12 @@ class TDengineCursor(object): def execute(self, operation, params=None): """Prepare and execute a database operation (query or command). """ + # if threading.get_ident() != self._threadId: + # info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + # raise OperationalError(info) + # print(info) + # return None + if not operation: return None @@ -188,6 +196,11 @@ class TDengineCursor(object): def fetchall(self): """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation. """ + # if threading.get_ident() != self._threadId: + # info ="[WARNING] Cursor fetchall:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + # raise OperationalError(info) + # print(info) + # return None if self._result is None or self._fields is None: raise OperationalError("Invalid use of fetchall") @@ -232,6 +245,12 @@ class TDengineCursor(object): def _handle_result(self): """Handle the return result from query. """ + # if threading.get_ident() != self._threadId: + # info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + # raise OperationalError(info) + # print(info) + # return None + self._description = [] for ele in self._fields: self._description.append( diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index cc41fd5e7d2e2c0de92fdd1b4d097d34b1102106..49c428b7f108b785f564564915aa2f8dbf52127e 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3.7 +#-----!/usr/bin/python3.7 ################################################################### # Copyright (c) 2016 by TAOS Technologies, Inc. # All rights reserved. @@ -15,6 +15,8 @@ from __future__ import annotations # For type hinting before definition, ref: h import sys import os +import io +import signal import traceback # Require Python 3 if sys.version_info[0] < 3: @@ -23,6 +25,7 @@ if sys.version_info[0] < 3: import getopt import argparse import copy +import requests import threading import random @@ -30,10 +33,14 @@ import time import logging import datetime import textwrap +import requests +from requests.auth import HTTPBasicAuth from typing import List from typing import Dict from typing import Set +from typing import IO +from queue import Queue, Empty from util.log import * from util.dnodes import * @@ -76,7 +83,10 @@ class WorkerThread: # Let us have a DB connection of our own if ( gConfig.per_thread_db_connection ): # type: ignore - self._dbConn = DbConn() + # print("connector_type = {}".format(gConfig.connector_type)) + self._dbConn = DbConn.createNative() if (gConfig.connector_type == 'native') else DbConn.createRest() + + self._dbInUse = False # if "use db" was executed already def logDebug(self, msg): logger.debug(" TRD[{}] {}".format(self._tid, msg)) @@ -84,7 +94,14 @@ class WorkerThread: def logInfo(self, msg): logger.info(" TRD[{}] {}".format(self._tid, msg)) - + def dbInUse(self): + return self._dbInUse + + def useDb(self): + if ( not self._dbInUse ): + self.execSql("use db") + self._dbInUse = True + def getTaskExecutor(self): return self._tc.getTaskExecutor() @@ -97,6 +114,7 @@ class WorkerThread: logger.info("Starting to run thread: {}".format(self._tid)) if ( gConfig.per_thread_db_connection ): # type: ignore + logger.debug("Worker thread openning database connection") self._dbConn.open() self._doTaskLoop() @@ -118,12 +136,17 @@ class WorkerThread: logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + # Fetch a task from the Thread Coordinator logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() + + # Execute such a task logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) + + self._dbInUse = False # there may be changes between steps def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): @@ -163,6 +186,18 @@ class WorkerThread: else: return self._tc.getDbManager().getDbConn().execute(sql) + def querySql(self, sql): # TODO: expose DbConn directly + if ( gConfig.per_thread_db_connection ): + return self._dbConn.query(sql) + else: + return self._tc.getDbManager().getDbConn().query(sql) + + def getQueryResult(self): + if ( gConfig.per_thread_db_connection ): + return self._dbConn.getQueryResult() + else: + return self._tc.getDbManager().getDbConn().getQueryResult() + def getDbConn(self): if ( gConfig.per_thread_db_connection ): return self._dbConn @@ -175,8 +210,9 @@ class WorkerThread: # else: # return self._tc.getDbState().getDbConn().query(sql) +# The coordinator of all worker threads, mostly running in main thread class ThreadCoordinator: - def __init__(self, pool, dbManager): + def __init__(self, pool: ThreadPool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd @@ -187,6 +223,7 @@ class ThreadCoordinator: self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads self._execStats = ExecutionStats() + self._runStatus = MainExec.STATUS_RUNNING def getTaskExecutor(self): return self._te @@ -197,6 +234,10 @@ class ThreadCoordinator: def crossStepBarrier(self): self._stepBarrier.wait() + def requestToStop(self): + self._runStatus = MainExec.STATUS_STOPPING + self._execStats.registerFailure("User Interruption") + def run(self): self._pool.createAndStartThreads(self) @@ -204,48 +245,73 @@ class ThreadCoordinator: self._curStep = -1 # not started yet maxSteps = gConfig.max_steps # type: ignore self._execStats.startExec() # start the stop watch - failed = False - while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 + transitionFailed = False + hasAbortedTask = False + while(self._curStep < maxSteps-1 and + (not transitionFailed) and + (self._runStatus==MainExec.STATUS_RUNNING) and + (not hasAbortedTask)): # maxStep==10, last curStep should be 9 + if not gConfig.debug: print(".", end="", flush=True) # print this only if we are not in debug mode logger.debug("[TRD] Main thread going to sleep") - # Now ready to enter a step + # Now main thread (that's us) is ready to enter a step self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate self._stepBarrier.reset() # Other worker threads should now be at the "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" - try: - self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state - except taos.error.ProgrammingError as err: - if ( err.msg == 'network unavailable' ): # broken DB connection - logger.info("DB connection broken, execution failed") - traceback.print_stack() - failed = True - self._te = None # Not running any more - self._execStats.registerFailure("Broken DB Connection") - # continue # don't do that, need to tap all threads at end, and maybe signal them to stop - else: - raise - finally: - pass + # We use this period to do house keeping work, when all worker threads are QUIET. + hasAbortedTask = False + for task in self._executedTasks : + if task.isAborted() : + print("Task aborted: {}".format(task)) + hasAbortedTask = True + break + + if hasAbortedTask : # do transition only if tasks are error free + self._execStats.registerFailure("Aborted Task Encountered") + else: + try: + sm = self._dbManager.getStateMachine() + logger.debug("[STT] starting transitions") + sm.transition(self._executedTasks) # at end of step, transiton the DB state + logger.debug("[STT] transition ended") + # Due to limitation (or maybe not) of the Python library, we cannot share connections across threads + if sm.hasDatabase() : + for t in self._pool.threadList: + logger.debug("[DB] use db for all worker threads") + t.useDb() + # t.execSql("use db") # main thread executing "use db" on behalf of every worker thread + except taos.error.ProgrammingError as err: + if ( err.msg == 'network unavailable' ): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + transitionFailed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at end, and maybe signal them to stop + else: + raise + # finally: + # pass self.resetExecutedTasks() # clear the tasks after we are done # Get ready for next step logger.debug("<-- Step {} finished".format(self._curStep)) self._curStep += 1 # we are about to get into next step. TODO: race condition here! - logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep + logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep # A new TE for the new step - if not failed: # only if not failed + if not transitionFailed: # only if not failed self._te = TaskExecutor(self._curStep) logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep - self.tapAllThreads() + self.tapAllThreads() # Worker threads will wake up at this point, and each execute it's own task logger.debug("Main thread ready to finish up...") - if not failed: # only in regular situations + if not transitionFailed: # only in regular situations self.crossStepBarrier() # Cross it one last time, after all threads finish self._stepBarrier.reset() logger.debug("Main thread in exclusive zone...") @@ -255,11 +321,17 @@ class ThreadCoordinator: logger.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish - logger.info("All worker thread finished") + logger.info("\nAll worker threads finished") self._execStats.endExec() - def logStats(self): - self._execStats.logStats() + def printStats(self): + self._execStats.printStats() + + def isFailed(self): + return self._execStats.isFailed() + + def getExecStats(self): + return self._execStats def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -268,7 +340,7 @@ class ThreadCoordinator: wakeSeq.append(i) else: wakeSeq.insert(0, i) - logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq))) + logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq))) # TODO: set dice seed to a deterministic value for i in wakeSeq: self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! @@ -306,7 +378,7 @@ class ThreadPool: self.maxSteps = maxSteps # Internal class variables self.curStep = 0 - self.threadList = [] + self.threadList = [] # type: List[WorkerThread] # starting to run all the threads, in locking steps def createAndStartThreads(self, tc: ThreadCoordinator): @@ -397,26 +469,39 @@ class LinearQueue(): return ret class DbConn: + TYPE_NATIVE = "native-c" + TYPE_REST = "rest-api" + TYPE_INVALID = "invalid" + + @classmethod + def create(cls, connType): + if connType == cls.TYPE_NATIVE: + return DbConnNative() + elif connType == cls.TYPE_REST: + return DbConnRest() + else: + raise RuntimeError("Unexpected connection type: {}".format(connType)) + + @classmethod + def createNative(cls): + return cls.create(cls.TYPE_NATIVE) + + @classmethod + def createRest(cls): + return cls.create(cls.TYPE_REST) + def __init__(self): - self._conn = None - self._cursor = None self.isOpen = False - - def open(self): # Open connection + self._type = self.TYPE_INVALID + + def open(self): if ( self.isOpen ): raise RuntimeError("Cannot re-open an existing DB connection") - cfgPath = "../../build/test/cfg" - self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable - self._cursor = self._conn.cursor() + # below implemented by child classes + self.openByType() - # Get the connection/cursor ready - self._cursor.execute('reset query cache') - # self._cursor.execute('use db') # note we do this in _findCurrenState - - # Open connection - self._tdSql = TDSql() - self._tdSql.init(self._cursor) + logger.debug("[DB] data connection opened, type = {}".format(self._type)) self.isOpen = True def resetDb(self): # reset the whole database, etc. @@ -424,17 +509,128 @@ class DbConn: raise RuntimeError("Cannot reset database until connection is open") # self._tdSql.prepare() # Recreate database, etc. - self._cursor.execute('drop database if exists db') + self.execute('drop database if exists db') logger.debug("Resetting DB, dropped database") # self._cursor.execute('create database db') # self._cursor.execute('use db') - # tdSql.execute('show databases') + def queryScalar(self, sql) -> int : + return self._queryAny(sql) + + def queryString(self, sql) -> str : + return self._queryAny(sql) + + def _queryAny(self, sql) : # actual query result as an int + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") + nRows = self.query(sql) + if nRows != 1 : + raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) + if self.getResultRows() != 1 or self.getResultCols() != 1: + raise RuntimeError("Unexpected result set for query: {}".format(sql)) + return self.getQueryResult()[0][0] + + def execute(self, sql): + raise RuntimeError("Unexpected execution, should be overriden") + def openByType(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getQueryResult(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getResultRows(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getResultCols(self): + raise RuntimeError("Unexpected execution, should be overriden") + +# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql +class DbConnRest(DbConn): + def __init__(self): + super().__init__() + self._type = self.TYPE_REST + self._url = "http://localhost:6020/rest/sql" # fixed for now + self._result = None + + def openByType(self): # Open connection + pass # do nothing, always open + + def close(self): + if ( not self.isOpen ): + raise RuntimeError("Cannot clean up database until connection is open") + # Do nothing for REST + logger.debug("[DB] REST Database connection closed") + self.isOpen = False + + def _doSql(self, sql): + r = requests.post(self._url, + data = sql, + auth = HTTPBasicAuth('root', 'taosdata')) + rj = r.json() + # Sanity check for the "Json Result" + if (not 'status' in rj): + raise RuntimeError("No status in REST response") + + if rj['status'] == 'error': # clearly reported error + if (not 'code' in rj): # error without code + raise RuntimeError("REST error return without code") + errno = rj['code'] # May need to massage this in the future + # print("Raising programming error with REST return: {}".format(rj)) + raise taos.error.ProgrammingError(rj['desc'], errno) # todo: check existance of 'desc' + + if rj['status'] != 'succ': # better be this + raise RuntimeError("Unexpected REST return status: {}".format(rj['status'])) + + nRows = rj['rows'] if ('rows' in rj) else 0 + self._result = rj + return nRows + + def execute(self, sql): + if ( not self.isOpen ): + raise RuntimeError("Cannot execute database commands until connection is open") + logger.debug("[SQL-REST] Executing SQL: {}".format(sql)) + nRows = self._doSql(sql) + logger.debug("[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + return nRows + + def query(self, sql) : # return rows affected + return self.execute(sql) + + def getQueryResult(self): + return self._result['data'] + + def getResultRows(self): + print(self._result) + raise RuntimeError("TBD") + # return self._tdSql.queryRows + + def getResultCols(self): + print(self._result) + raise RuntimeError("TBD") + +class DbConnNative(DbConn): + def __init__(self): + super().__init__() + self._type = self.TYPE_REST + self._conn = None + self._cursor = None + + def openByType(self): # Open connection + cfgPath = "../../build/test/cfg" + self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + self._cursor = self._conn.cursor() + + # Get the connection/cursor ready + self._cursor.execute('reset query cache') + # self._cursor.execute('use db') # do this at the beginning of every step + + # Open connection + self._tdSql = TDSql() + self._tdSql.init(self._cursor) + def close(self): if ( not self.isOpen ): raise RuntimeError("Cannot clean up database until connection is open") self._tdSql.close() + logger.debug("[DB] Database connection closed") self.isOpen = False def execute(self, sql): @@ -450,29 +646,19 @@ class DbConn: raise RuntimeError("Cannot query database until connection is open") logger.debug("[SQL] Executing SQL: {}".format(sql)) nRows = self._tdSql.query(sql) - logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql)) return nRows # results are in: return self._tdSql.queryResult def getQueryResult(self): return self._tdSql.queryResult - def _queryAny(self, sql) : # actual query result as an int - if ( not self.isOpen ): - raise RuntimeError("Cannot query database until connection is open") - tSql = self._tdSql - nRows = tSql.query(sql) - if nRows != 1 : - raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) - if tSql.queryRows != 1 or tSql.queryCols != 1: - raise RuntimeError("Unexpected result set for query: {}".format(sql)) - return tSql.queryResult[0][0] + def getResultRows(self): + return self._tdSql.queryRows - def queryScalar(self, sql) -> int : - return self._queryAny(sql) + def getResultCols(self): + return self._tdSql.queryCols - def queryString(self, sql) -> str : - return self._queryAny(sql) class AnyState: STATE_INVALID = -1 @@ -620,10 +806,10 @@ class StateDbOnly(AnyState): # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success - # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - if ( not self.hasTask(tasks, TaskDropSuperTable) ): - self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything + # if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success + # # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table + # if ( not self.hasTask(tasks, TaskDropSuperTable) ): + # self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail @@ -648,7 +834,9 @@ class StateSuperTableOnly(AnyState): def verifyTasksToState(self, tasks, newState): if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) + #self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) + self.hasSuccess(tasks, TaskCreateSuperTable) # we must have had recreted it + # self._state = self.STATE_DB_ONLY # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases @@ -680,18 +868,19 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task self.hasSuccess(tasks, TaskDropSuperTable) # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy - elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted - self.assertNoTask(tasks, TaskDropDb) - self.assertNoTask(tasks, TaskDropSuperTable) - self.assertNoTask(tasks, TaskAddData) + # elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted + # self.assertNoTask(tasks, TaskDropDb) + # self.assertNoTask(tasks, TaskDropSuperTable) + # self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA - self.assertNoTask(tasks, TaskDropDb) + if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one + self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) -class StateMechine : +class StateMechine: def __init__(self, dbConn): self._dbConn = dbConn self._curState = self._findCurrentState() # starting state @@ -700,8 +889,17 @@ class StateMechine : def getCurrentState(self): return self._curState + def hasDatabase(self): + return self._curState.canDropDb() # ha, can drop DB means it has one + # May be slow, use cautionsly... def getTaskTypes(self): # those that can run (directly/indirectly) from the current state + def typesToStrings(types): + ss = [] + for t in types: + ss.append(t.__name__) + return ss + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks firstTaskTypes = [] for tc in allTaskClasses: @@ -720,7 +918,7 @@ class StateMechine : if len(taskTypes) <= 0: raise RuntimeError("No suitable task types found for state: {}".format(self._curState)) - logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes)) + logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, typesToStrings(taskTypes))) return taskTypes def _findCurrentState(self): @@ -730,7 +928,7 @@ class StateMechine : # logger.debug("Found EMPTY state") logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) return StateEmpty() - dbc.execute("use db") # did not do this when openning connection + dbc.execute("use db") # did not do this when openning connection, and this is NOT the worker thread, which does this on their own if dbc.query("show tables") == 0 : # no tables # logger.debug("Found DB ONLY state") logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) @@ -746,6 +944,7 @@ class StateMechine : def transition(self, tasks): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty + logger.debug("[STT] Starting State: {}".format(self._curState)) return # do nothing self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps @@ -807,14 +1006,14 @@ class DbManager(): self._lock = threading.RLock() # self.openDbServerConnection() - self._dbConn = DbConn() + self._dbConn = DbConn.createNative() if (gConfig.connector_type=='native') else DbConn.createRest() try: self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected except taos.error.ProgrammingError as err: # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) if ( err.msg == 'client disconnected' ): # cannot open DB connection print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") - sys.exit() + sys.exit(2) else: raise except: @@ -829,7 +1028,7 @@ class DbManager(): def getDbConn(self): return self._dbConn - def getStateMachine(self): + def getStateMachine(self) -> StateMechine : return self._stateMachine # def getState(self): @@ -869,8 +1068,11 @@ class DbManager(): def getNextTick(self): with self._lock: # prevent duplicate tick - self._lastTick += datetime.timedelta(0, 1) # add one second to it - return self._lastTick + if Dice.throw(10) == 0 : # 1 in 10 chance + return self._lastTick + datetime.timedelta(0, -100) + else: # regular + self._lastTick += datetime.timedelta(0, 1) # add one second to it + return self._lastTick def getNextInt(self): with self._lock: @@ -894,15 +1096,60 @@ class DbManager(): self._dbConn.close() class TaskExecutor(): + class BoundedList: + def __init__(self, size = 10): + self._size = size + self._list = [] + + def add(self, n: int) : + if not self._list: # empty + self._list.append(n) + return + # now we should insert + nItems = len(self._list) + insPos = 0 + for i in range(nItems): + insPos = i + if n <= self._list[i] : # smaller than this item, time to insert + break # found the insertion point + insPos += 1 # insert to the right + + if insPos == 0 : # except for the 1st item, # TODO: elimiate first item as gating item + return # do nothing + + # print("Inserting at postion {}, value: {}".format(insPos, n)) + self._list.insert(insPos, n) # insert + + newLen = len(self._list) + if newLen <= self._size : + return # do nothing + elif newLen == (self._size + 1) : + del self._list[0] # remove the first item + else : + raise RuntimeError("Corrupt Bounded List") + + def __str__(self): + return repr(self._list) + + _boundedList = BoundedList() + def __init__(self, curStep): self._curStep = curStep + @classmethod + def getBoundedList(cls): + return cls._boundedList + def getCurStep(self): return self._curStep def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread task.execute(wt) + def recordDataMark(self, n: int): + # print("[{}]".format(n), end="", flush=True) + self._boundedList.add(n) + # def logInfo(self, msg): # logger.info(" T[{}.x]: ".format(self._curStep) + msg) @@ -922,6 +1169,7 @@ class Task(): self._dbManager = dbManager self._workerThread = None self._err = None + self._aborted = False self._curStep = None self._numRows = None # Number of rows affected @@ -930,10 +1178,14 @@ class Task(): # logger.debug("Creating new task {}...".format(self._taskNum)) self._execStats = execStats + self._lastSql = "" # last SQL executed/attempted def isSuccess(self): return self._err == None + def isAborted(self): + return self._aborted + def clone(self): # TODO: why do we need this again? newTask = self.__class__(self._dbManager, self._execStats) return newTask @@ -960,10 +1212,31 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err)) - self._err = err - except: - self.logDebug("[=] Unexpected exception") + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme + if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600, + 1000 # REST catch-all error + ]) : # allowed errors + self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) + print("_", end="", flush=True) + self._err = err + else: + errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql) + self.logDebug(errMsg) + if gConfig.debug : + raise # so that we see full stack + else: # non-debug + print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) + + "----------------------------\n") + # sys.exit(-1) + self._err = err + self._aborted = True + except Exception as e : + self.logInfo("Non-TAOS exception encountered") + self._err = e + self._aborted = True + traceback.print_exc() + except : + self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql)) raise self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) @@ -971,8 +1244,21 @@ class Task(): self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. def execSql(self, sql): + self._lastSql = sql return self._dbManager.execute(sql) + def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread + self._lastSql = sql + return wt.execSql(sql) + + def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread + self._lastSql = sql + return wt.querySql(sql) + + def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread + return wt.getQueryResult() + + class ExecutionStats: def __init__(self): @@ -987,6 +1273,12 @@ class ExecutionStats: self._failed = False self._failureReason = None + def __str__(self): + return "[ExecStats: _failed={}, _failureReason={}".format(self._failed, self._failureReason) + + def isFailed(self): + return self._failed == True + def startExec(self): self._execStartTime = time.time() @@ -1018,7 +1310,7 @@ class ExecutionStats: self._failed = True self._failureReason = reason - def logStats(self): + def printStats(self): logger.info("----------------------------------------------------------------------") logger.info("| Crash_Gen test {}, with the following stats:". format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED")) @@ -1033,11 +1325,17 @@ class ExecutionStats: logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime)) logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny)) logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime)) + logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList())) logger.info("----------------------------------------------------------------------") class StateTransitionTask(Task): + LARGE_NUMBER_OF_TABLES = 35 + SMALL_NUMBER_OF_TABLES = 3 + LARGE_NUMBER_OF_RECORDS = 50 + SMALL_NUMBER_OF_RECORDS = 3 + @classmethod def getInfo(cls): # each sub class should supply their own information raise RuntimeError("Overriding method expected") @@ -1060,6 +1358,10 @@ class StateTransitionTask(Task): # return state.getValue() in cls.getBeginStates() raise RuntimeError("must be overriden") + @classmethod + def getRegTableName(cls, i): + return "db.reg_table_{}".format(i) + def execute(self, wt: WorkerThread): super().execute(wt) @@ -1073,7 +1375,7 @@ class TaskCreateDb(StateTransitionTask): return state.canCreateDb() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - wt.execSql("create database db") + self.execWtSql(wt, "create database db") class TaskDropDb(StateTransitionTask): @classmethod @@ -1085,7 +1387,7 @@ class TaskDropDb(StateTransitionTask): return state.canDropDb() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - wt.execSql("drop database db") + self.execWtSql(wt, "drop database db") logger.debug("[OPS] database dropped at {}".format(time.time())) class TaskCreateSuperTable(StateTransitionTask): @@ -1098,8 +1400,13 @@ class TaskCreateSuperTable(StateTransitionTask): return state.canCreateFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbManager.getFixedSuperTableName() - wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) + if not wt.dbInUse(): # no DB yet, to the best of our knowledge + logger.debug("Skipping task, no DB yet") + return + + tblName = self._dbManager.getFixedSuperTableName() + # wt.execSql("use db") # should always be in place + self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that automatically @@ -1114,16 +1421,16 @@ class TaskReadData(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): sTbName = self._dbManager.getFixedSuperTableName() - dbc = wt.getDbConn() - dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + self.queryWtSql(wt, "select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations - dbc.close() - dbc.open() + wt.getDbConn().close() + wt.getDbConn().open() else: - rTables = dbc.getQueryResult() + rTables = self.getQueryResult(wt) # wt.getDbConn().getQueryResult() # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) for rTbName in rTables : # regular tables - dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + self.execWtSql(wt, "select * from db.{}".format(rTbName[0])) # tdSql.query(" cars where tbname in ('carzero', 'carone')") @@ -1137,8 +1444,33 @@ class TaskDropSuperTable(StateTransitionTask): return state.canDropFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbManager.getFixedSuperTableName() - wt.execSql("drop table db.{}".format(tblName)) + # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence + if Dice.throw(2) == 0 : + tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) + random.shuffle(tblSeq) + tickOutput = False # if we have spitted out a "d" character for "drop regular table" + isSuccess = True + for i in tblSeq: + regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) + try: + self.execWtSql(wt, "drop table {}".format(regTableName)) # nRows always 0, like MySQL + except taos.error.ProgrammingError as err: + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correcting for strange error number scheme + if ( errno2 in [0x362]) : # mnode invalid table name + isSuccess = False + logger.debug("[DB] Acceptable error when dropping a table") + continue # try to delete next regular table + + if (not tickOutput): + tickOutput = True # Print only one time + if isSuccess : + print("d", end="", flush=True) + else: + print("f", end="", flush=True) + + # Drop the super table itself + tblName = self._dbManager.getFixedSuperTableName() + self.execWtSql(wt, "drop table db.{}".format(tblName)) class TaskAlterTags(StateTransitionTask): @classmethod @@ -1153,20 +1485,18 @@ class TaskAlterTags(StateTransitionTask): tblName = self._dbManager.getFixedSuperTableName() dice = Dice.throw(4) if dice == 0 : - wt.execSql("alter table db.{} add tag extraTag int".format(tblName)) + sql = "alter table db.{} add tag extraTag int".format(tblName) elif dice == 1 : - wt.execSql("alter table db.{} drop tag extraTag".format(tblName)) + sql = "alter table db.{} drop tag extraTag".format(tblName) elif dice == 2 : - wt.execSql("alter table db.{} drop tag newTag".format(tblName)) + sql = "alter table db.{} drop tag newTag".format(tblName) else: # dice == 3 - wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName)) + sql = "alter table db.{} change tag extraTag newTag".format(tblName) + + self.execWtSql(wt, sql) class TaskAddData(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on - LARGE_NUMBER_OF_TABLES = 35 - SMALL_NUMBER_OF_TABLES = 3 - LARGE_NUMBER_OF_RECORDS = 50 - SMALL_NUMBER_OF_RECORDS = 3 # We use these two files to record operations to DB, useful for power-off tests fAddLogReady = None @@ -1192,7 +1522,7 @@ class TaskAddData(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbManager - wt.execSql("use db") # TODO: seems to be an INSERT bug to require this + # wt.execSql("use db") # TODO: seems to be an INSERT bug to require this tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) for i in tblSeq: @@ -1202,10 +1532,10 @@ class TaskAddData(StateTransitionTask): print("x", end="", flush=True) else: self.activeTable.add(i) # marking it active - # No need to shuffle data sequence, unless later we decide to do non-increment insertion + # No need to shuffle data sequence, unless later we decide to do non-increment insertion + regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table - nextInt = ds.getNextInt() - regTableName = "db.reg_table_{}".format(i) + nextInt = ds.getNextInt() if gConfig.record_ops: self.prepToRecordOps() self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) @@ -1216,7 +1546,9 @@ class TaskAddData(StateTransitionTask): ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), nextInt) - wt.execSql(sql) + self.execWtSql(wt, sql) + # Successfully wrote the data into the DB, let's record it somehow + te.recordDataMark(nextInt) if gConfig.record_ops: self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.flush() @@ -1285,17 +1617,213 @@ class LoggingFilter(logging.Filter): if ( record.levelno >= logging.INFO ) : return True # info or above always log - msg = record.msg - # print("type = {}, value={}".format(type(msg), msg)) - # sys.exit() - # Commenting out below to adjust... # if msg.startswith("[TRD]"): # return False return True +class MyLoggingAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs + # return '[%s] %s' % (self.extra['connid'], msg), kwargs + +class SvcManager: + + def __init__(self): + print("Starting service manager") + signal.signal(signal.SIGTERM, self.sigIntHandler) + signal.signal(signal.SIGINT, self.sigIntHandler) + self.ioThread = None + self.subProcess = None + self.shouldStop = False + self.status = MainExec.STATUS_RUNNING + + def svcOutputReader(self, out: IO, queue): + # print("This is the svcOutput Reader...") + for line in out : # iter(out.readline, b''): + # print("Finished reading a line: {}".format(line)) + queue.put(line.rstrip()) # get rid of new lines + print("No more output from incoming IO") # meaning sub process must have died + out.close() + + def sigIntHandler(self, signalNumber, frame): + if self.status != MainExec.STATUS_RUNNING : + print("Ignoring repeated SIGINT...") + return # do nothing if it's already not running + self.status = MainExec.STATUS_STOPPING # immediately set our status + + print("Terminating program...") + self.subProcess.send_signal(signal.SIGINT) + self.shouldStop = True + self.joinIoThread() + + def joinIoThread(self): + if self.ioThread : + self.ioThread.join() + self.ioThread = None + + def run(self): + ON_POSIX = 'posix' in sys.builtin_module_names + svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg'] + # svcCmd = ['vmstat', '1'] + self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True) + q = Queue() + self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q)) + self.ioThread.daemon = True # thread dies with the program + self.ioThread.start() + + # proc = subprocess.Popen(['echo', '"to stdout"'], + # stdout=subprocess.PIPE, + # ) + # stdout_value = proc.communicate()[0] + # print('\tstdout: {}'.format(repr(stdout_value))) + + while True : + try: + line = q.get_nowait() # getting output at fast speed + except Empty: + # print('no output yet') + time.sleep(2.3) # wait only if there's no output + else: # got line + print(line) + # print("----end of iteration----") + if self.shouldStop: + print("Ending main Svc thread") + break + + print("end of loop") + + self.joinIoThread() + print("Finished") + +class ClientManager: + def __init__(self): + print("Starting service manager") + signal.signal(signal.SIGTERM, self.sigIntHandler) + signal.signal(signal.SIGINT, self.sigIntHandler) + + self.status = MainExec.STATUS_RUNNING + self.tc = None + + def sigIntHandler(self, signalNumber, frame): + if self.status != MainExec.STATUS_RUNNING : + print("Ignoring repeated SIGINT...") + return # do nothing if it's already not running + self.status = MainExec.STATUS_STOPPING # immediately set our status + + print("Terminating program...") + self.tc.requestToStop() + + def _printLastNumbers(self): # to verify data durability + dbManager = DbManager(resetDb=False) + dbc = dbManager.getDbConn() + if dbc.query("show databases") == 0 : # no databae + return + if dbc.query("show tables") == 0 : # no tables + return + + dbc.execute("use db") + sTbName = dbManager.getFixedSuperTableName() + + # get all regular tables + dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + rTables = dbc.getQueryResult() + + bList = TaskExecutor.BoundedList() + for rTbName in rTables : # regular tables + dbc.query("select speed from db.{}".format(rTbName[0])) + numbers = dbc.getQueryResult() + for row in numbers : + # print("<{}>".format(n), end="", flush=True) + bList.add(row[0]) + + print("Top numbers in DB right now: {}".format(bList)) + print("TDengine client execution is about to start in 2 seconds...") + time.sleep(2.0) + dbManager = None # release? + + def prepare(self): + self._printLastNumbers() + + def run(self): + self._printLastNumbers() + + dbManager = DbManager() # Regular function + Dice.seed(0) # initial seeding of dice + thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) + self.tc = ThreadCoordinator(thPool, dbManager) + self.tc.run() + # print("exec stats: {}".format(self.tc.getExecStats())) + # print("TC failed = {}".format(self.tc.isFailed())) + self.conclude() + # print("TC failed (2) = {}".format(self.tc.isFailed())) + return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/ + + def conclude(self): + self.tc.printStats() + self.tc.getDbManager().cleanUp() + + +class MainExec: + STATUS_RUNNING = 1 + STATUS_STOPPING = 2 + # STATUS_STOPPED = 3 # Not used yet + + @classmethod + def runClient(cls): + clientManager = ClientManager() + return clientManager.run() + + @classmethod + def runService(cls): + svcManager = SvcManager() + svcManager.run() + + @classmethod + def runTemp(cls): # for debugging purposes + # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix + # dbc = dbState.getDbConn() + # sTbName = dbState.getFixedSuperTableName() + # dbc.execute("create database if not exists db") + # if not dbState.getState().equals(StateEmpty()): + # dbc.execute("use db") + + # rTables = None + # try: # the super table may not exist + # sql = "select TBNAME from db.{}".format(sTbName) + # logger.info("Finding out tables in super table: {}".format(sql)) + # dbc.query(sql) # TODO: analyze result set later + # logger.info("Fetching result") + # rTables = dbc.getQueryResult() + # logger.info("Result: {}".format(rTables)) + # except taos.error.ProgrammingError as err: + # logger.info("Initial Super table OPS error: {}".format(err)) + + # # sys.exit() + # if ( not rTables == None): + # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + # try: + # for rTbName in rTables : # regular tables + # ds = dbState + # logger.info("Inserting into table: {}".format(rTbName[0])) + # sql = "insert into db.{} values ('{}', {});".format( + # rTbName[0], + # ds.getNextTick(), ds.getNextInt()) + # dbc.execute(sql) + # for rTbName in rTables : # regular tables + # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + # logger.info("Initial READING operation is successful") + # except taos.error.ProgrammingError as err: + # logger.info("Initial WRITE/READ error: {}".format(err)) + + # Sandbox testing code + # dbc = dbState.getDbConn() + # while True: + # rows = dbc.query("show databases") + # print("Rows: {}, time={}".format(rows, time.time())) + return def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html @@ -1308,93 +1836,55 @@ def main(): 2. You run the server there before this script: ./build/bin/taosd -c test/cfg ''')) + + parser.add_argument('-c', '--connector-type', action='store', default='native', type=str, + help='Connector type to use: native, rest, or mixed (default: 10)') parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-e', '--run-tdengine', action='store_true', + help='Run TDengine service in foreground (default: false)') parser.add_argument('-l', '--larger-data', action='store_true', help='Write larger amount of data during write operations (default: false)') parser.add_argument('-p', '--per-thread-db-connection', action='store_true', help='Use a single shared db connection (default: false)') parser.add_argument('-r', '--record-ops', action='store_true', help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') - parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, + parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, help='Maximum number of steps to run (default: 100)') - parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, + parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, help='Number of threads to run (default: 10)') global gConfig gConfig = parser.parse_args() - if len(sys.argv) == 1: - parser.print_help() - sys.exit() + # if len(sys.argv) == 1: + # parser.print_help() + # sys.exit() + + # Logging Stuff global logger - logger = logging.getLogger('CrashGen') - logger.addFilter(LoggingFilter()) + _logger = logging.getLogger('CrashGen') # real logger + _logger.addFilter(LoggingFilter()) + ch = logging.StreamHandler() + _logger.addHandler(ch) + + logger = MyLoggingAdapter(_logger, []) # Logging adapter, to be used as a logger + if ( gConfig.debug ): logger.setLevel(logging.DEBUG) # default seems to be INFO else: logger.setLevel(logging.INFO) - ch = logging.StreamHandler() - logger.addHandler(ch) - - # resetDb = False # DEBUG only - # dbState = DbState(resetDb) # DBEUG only! - dbManager = DbManager() # Regular function - Dice.seed(0) # initial seeding of dice - tc = ThreadCoordinator( - ThreadPool(gConfig.num_threads, gConfig.max_steps), - # WorkDispatcher(dbState), # Obsolete? - dbManager - ) - - # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix - # dbc = dbState.getDbConn() - # sTbName = dbState.getFixedSuperTableName() - # dbc.execute("create database if not exists db") - # if not dbState.getState().equals(StateEmpty()): - # dbc.execute("use db") - - # rTables = None - # try: # the super table may not exist - # sql = "select TBNAME from db.{}".format(sTbName) - # logger.info("Finding out tables in super table: {}".format(sql)) - # dbc.query(sql) # TODO: analyze result set later - # logger.info("Fetching result") - # rTables = dbc.getQueryResult() - # logger.info("Result: {}".format(rTables)) - # except taos.error.ProgrammingError as err: - # logger.info("Initial Super table OPS error: {}".format(err)) - - # # sys.exit() - # if ( not rTables == None): - # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - # try: - # for rTbName in rTables : # regular tables - # ds = dbState - # logger.info("Inserting into table: {}".format(rTbName[0])) - # sql = "insert into db.{} values ('{}', {});".format( - # rTbName[0], - # ds.getNextTick(), ds.getNextInt()) - # dbc.execute(sql) - # for rTbName in rTables : # regular tables - # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure - # logger.info("Initial READING operation is successful") - # except taos.error.ProgrammingError as err: - # logger.info("Initial WRITE/READ error: {}".format(err)) - + # Run server or client + if gConfig.run_tdengine : # run server + MainExec.runService() + else : + return MainExec.runClient() - # Sandbox testing code - # dbc = dbState.getDbConn() - # while True: - # rows = dbc.query("show databases") - # print("Rows: {}, time={}".format(rows, time.time())) - - tc.run() - tc.logStats() - dbManager.cleanUp() # logger.info("Crash_Gen execution finished") if __name__ == "__main__": - main() + exitCode = main() + # print("Exiting with code: {}".format(exitCode)) + sys.exit(exitCode) diff --git a/tests/pytest/crash_gen.sh b/tests/pytest/crash_gen.sh index c845b3976473bedc0fef5684c29b06f866df0840..de80361aa3ca655e81f5e36d511a6104e1c96aa6 100755 --- a/tests/pytest/crash_gen.sh +++ b/tests/pytest/crash_gen.sh @@ -38,4 +38,4 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib # Now we are all let, and let's see if we can find a crash. Note we pass all params -./crash_gen.py $@ +python3 ./crash_gen.py $@