crash_gen.py 39.4 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17 18 19 20
# Require Python 3: abort at import time when run under an older major version.
# NOTE(review): the `from __future__ import annotations` line above already
# needs Python 3.7+, so this guard is presumably only a friendlier message
# for interpreters that get this far - confirm.
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
21
import getopt
22
import argparse
23
import copy
S
Steven Li 已提交
24 25 26

import threading
import random
27
import time
S
Steven Li 已提交
28
import logging
29
import datetime
30
import textwrap
S
Steven Li 已提交
31

32
from typing import List
33
from typing import Dict
34

S
Steven Li 已提交
35 36 37 38 39
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

40
import crash_gen
S
Steven Li 已提交
41 42
import taos

43 44 45
# Global variables; we try to keep their number small.
gConfig = None # Command-line/Environment Configurations, will set a bit later
logger = None # Module-wide logger object; starts as None and is read by the classes below
S
Steven Li 已提交
46

47 48
def runThread(wt: WorkerThread):
    """Thread entry point: delegate to the given worker's ``run`` method."""
    wt.run()
49

50 51 52 53 54 55 56 57
class CrashGenError(Exception):
    """Exception type raised by the crash generator's own consistency checks.

    Attributes:
        msg: optional human-readable message (``None`` when not given).
        errno: optional error number (``None`` when not given).
    """

    def __init__(self, msg=None, errno=None):
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str; since msg defaults to None, returning it
        # directly would make str(CrashGenError()) raise TypeError.
        if self.msg is None:
            return ""
        return str(self.msg)

S
Steven Li 已提交
58
class WorkerThread:
    """A worker that executes one task per step under a ThreadCoordinator.

    Each instance wraps a ``threading.Thread`` whose target is ``runThread``
    and owns an ``Event`` (the "step gate") which the main thread sets to
    release this worker into the next step.
    """

    def __init__(self, pool: ThreadPool, tid,
                 tc: ThreadCoordinator,
                 ):  # note: main thread context!
        self._pool = pool
        self._tid = tid
        self._tc = tc
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = DbConn()

    def logDebug(self, msg):
        """Log *msg* at DEBUG level, prefixed with this worker's thread id."""
        # Previously this went through logger.info, so "debug" output from
        # workers showed up at INFO level.
        logger.debug("    t[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        """Log *msg* at INFO level, prefixed with this worker's thread id."""
        logger.info("    t[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's current TaskExecutor."""
        return self._tc.getTaskExecutor()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Body of the worker thread: open DB connection, loop, clean up."""
        # initialization after thread starts, in the thread context
        logger.info("Starting to run thread: {}".format(self._tid))

        ownsConnection = gConfig.per_thread_db_connection  # type: ignore
        if ownsConnection:
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # Clean up even when the task loop raises, so the per-thread
            # connection is not left open.
            if ownsConnection:
                self._dbConn.close()

    def _doTaskLoop(self):
        """Run steps until the coordinator reports it is no longer running."""
        while True:
            tc = self._tc  # Thread Coordinator, the overall master
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            logger.debug("Thread task loop exited barrier...")
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("Thread task loop exited step gate...")
            if not self._tc.isRunning():
                break

            task = tc.fetchTask()
            task.execute(self)
            tc.saveExecutedTask(task)

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        """Raise RuntimeError unless called from this worker's own thread."""
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        """Raise RuntimeError unless called from the main thread."""
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        """Raise RuntimeError if the underlying thread is not alive."""
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at the step gate until tapped, then re-arm the gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Set the step gate so the worker waiting on it can proceed."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def execSql(self, sql):  # not "execute", since we are out side the DB context
        """Execute *sql* on the per-thread connection, or else the shared one."""
        if gConfig.per_thread_db_connection:
            return self._dbConn.execute(sql)
        else:
            return self._tc.getDbState().getDbConn().execute(sql)

    def querySql(self, sql):  # not "execute", since we are out side the DB context
        """Query with *sql* on the per-thread connection, or else the shared one."""
        if gConfig.per_thread_db_connection:
            return self._dbConn.query(sql)
        else:
            return self._tc.getDbState().getDbConn().query(sql)
161

162
class ThreadCoordinator:
    """Drives the worker threads through the steps, from the main thread.

    Workers and the main thread meet at a shared ``threading.Barrier`` sized
    ``numThreads + 1``; the main thread then taps each worker's gate to start
    the next step. A non-None task executor (``_te``) means "still running".
    """

    def __init__(self, pool, dbState):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbState = dbState
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # one barrier for all threads (workers plus the main thread)
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()

    def getTaskExecutor(self):
        """Return the TaskExecutor of the current step (None when not running)."""
        return self._te

    def getDbState(self) -> DbState:
        """Return the DbState object given at construction."""
        return self._dbState

    def crossStepBarrier(self):
        """Wait at the barrier shared by all workers and the main thread."""
        self._stepBarrier.wait()

    def run(self):
        """Start the pool's threads and coordinate them for gConfig.max_steps steps."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        maxSteps = gConfig.max_steps  # type: ignore
        startTime = time.time()
        while self._curStep < maxSteps - 1:  # maxStep==10, last curStep should be 9
            print(".", end="", flush=True)
            logger.debug("Main thread going to sleep")

            # Now ready to enter a step
            self.crossStepBarrier()  # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset()  # Other worker threads should now be at the "gate"

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
            self._dbState.transition(self._executedTasks)  # at end of step, transition the DB state
            self.resetExecutedTasks()  # clear the tasks after we are done

            # Get ready for next step
            logger.info("<-- Step {} finished".format(self._curStep))
            self._curStep += 1  # we are about to get into next step. TODO: race condition here!
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep))

            # A new TE for the new step
            self._te = TaskExecutor(self._curStep)

            logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep))
            self.tapAllThreads()

        logger.debug("Main thread ready to finish up...")
        self.crossStepBarrier()  # Cross it one last time, after all threads finish
        self._stepBarrier.reset()
        logger.debug("Main thread in exclusive zone...")
        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish

        logger.info("All threads finished")
        self._execStats.logStats()
        logger.info("Total Execution Time (task busy time, plus Python overhead): {:.2f} seconds".format(time.time() - startTime))
        print("\r\nFinished")

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's gate, in an order shuffled via Dice throws."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.info("Waking up threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self._pool.threadList[i].tapStepGate()  # TODO: maybe a bit too deep?!
            time.sleep(0)  # yield

    def isRunning(self):
        """Return True while a task executor is installed for the current step."""
        # Identity test: `!= None` would defer to the executor's own __ne__.
        return self._te is not None

    def fetchTask(self) -> Task:
        """Create a task of a type picked by the DbState for its current state.

        Raises:
            RuntimeError: when called while the coordinator is not running.
        """
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        taskType = self.getDbState().pickTaskType()  # pick a task type for current state
        return taskType(self.getDbState(), self._execStats)  # create a task from it

    def resetExecutedTasks(self):
        """Forget the tasks recorded for the step just finished."""
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record *task* as executed in the current step (lock-protected)."""
        with self._lock:
            self._executedTasks.append(task)
266 267

# A pool class that runs a number of worker threads in lock step.
268
class ThreadPool:
    """Holds the worker threads and the pool-wide settings they share."""

    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        # Note: dbState is accepted but not stored by this class.
        self.threadList = []
        self.curStep = 0
        self.funcSequencer = funcSequencer
        self.maxSteps = maxSteps
        self.numThreads = numThreads

    # starting to run all the threads, in locking steps
    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to *tc*, record and start each."""
        for workerId in range(self.numThreads):
            worker = WorkerThread(self, workerId, tc)
            self.threadList.append(worker)
            worker.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Join the underlying thread of every recorded worker."""
        for worker in self.threadList:
            logger.debug("Joining thread...")
            worker._thread.join()

S
Steven Li 已提交
292 293 294
# A queue of contiguous POSITIVE integers
class LinearQueue():
    """Queue of contiguous positive integers ``firstIndex..lastIndex``.

    Indexes can additionally be marked "in use" (``inUse`` set); the head can
    only be popped while it is not in use. Mutating operations are guarded by
    a re-entrant lock because the methods call each other.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a short description of the range and the in-use set."""
        return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        """Append the next index, mark it in use, and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head is still in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?

            index = self.firstIndex
            if index in self.inUse:
                return False

            self.firstIndex += 1
            return index

    def isEmpty(self):
        """Return True when there is no index left in the queue."""
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but return 0 (instead of False) for an empty queue."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index *i* as in use.

        Raises:
            RuntimeError: if *i* is already in use.
        """
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index *i* as no longer in use (KeyError if it was not)."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Return the number of indexes currently in the queue."""
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Pick a random index not in use, mark it in use, and return it.

        Returns None when the queue is empty, or when no free index was hit
        after ``size() * 10`` random draws.
        """
        with self._lock:
            # The emptiness check is done under the lock, so the range cannot
            # change between the check and the draws below.
            if self.isEmpty():
                return None
            for _ in range(self.size() * 10):
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret
            return None

class DbConn:
    """Thin wrapper around one taos connection, its cursor and a TDSql helper."""

    def __init__(self):
        self.isOpen = False
        self._cursor = None
        self._conn = None

    def _requireOpen(self, message):
        """Raise RuntimeError(message) unless the connection is open."""
        if not self.isOpen:
            raise RuntimeError(message)

    def open(self):  # Open connection
        """Connect, prepare the cursor and the TDSql helper, then mark as open.

        Raises:
            RuntimeError: if the connection is already open.
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg"
        connection = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._conn = connection
        self._cursor = connection.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')

        # Open connection
        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop database ``db`` if it exists (it is not re-created here)."""
        self._requireOpen("Cannot reset database until connection is open")
        self._cursor.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def close(self):
        """Close the TDSql helper and mark the connection as closed."""
        self._requireOpen("Cannot clean up database until connection is open")
        self._tdSql.close()
        self.isOpen = False

    def execute(self, sql):
        """Run *sql* through the TDSql helper and return its result."""
        self._requireOpen("Cannot execute database commands until connection is open")
        return self._tdSql.execute(sql)

    def query(self, sql) -> int:  # return number of rows retrieved
        """Run query *sql* through the TDSql helper and return its result."""
        self._requireOpen("Cannot query database until connection is open")
        return self._tdSql.query(sql)


S
Steven Li 已提交
419 420
# State of the database as we believe it to be
class DbState():
    """State of the database as we believe it to be.

    Tracks a coarse state (the ``STATE_*`` constants), hands out table
    numbers, ticks and integers, picks the next task type by weight, and
    after every step derives the new state from the tasks that were executed.
    """

    STATE_INVALID    = -1
    STATE_EMPTY      = 0  # nothing there, no even a DB
    STATE_DB_ONLY    = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA   = 3  # we have some data in the table

    def __init__(self):
        self.tableNumQueue = LinearQueue()
        self._lastTick = datetime.datetime(2019, 1, 1)  # initial date time tick
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()

        self._state = self.STATE_INVALID
        self._stateWeights = [1, 3, 5, 10]  # indexed by the STATE_* end state

        self._dbConn = DbConn()
        try:
            self._dbConn.open()  # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            if err.msg == 'disconnected':  # cannot open DB connection
                print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                sys.exit()
            else:
                raise
        except:  # anything else is reported and then re-raised unchanged
            print("[=]Unexpected exception")
            raise
        self._dbConn.resetDb()  # drops the database
        self._state = self.STATE_EMPTY  # initial state, the result of above

    def getDbConn(self):
        """Return the main DB connection owned by this object."""
        return self._dbConn

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        """Pick a table number from the queue and mark it in use (or None)."""
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        """Push a new table number onto the queue and return it."""
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    def getFixedTableName(self):
        """Return the name of the fixed table."""
        return "fixed_table"

    def releaseTable(self, i):  # return the table back, so others can use it
        """Release table number *i* in the queue."""
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Advance the tick by one second and return it."""
        with self._lock:  # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1)  # add one second to it
            return self._lastTick

    def getNextInt(self):
        """Increment and return the integer counter."""
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getTableNameToDelete(self):
        """Pop a table number and return ``table_<n>``, or False if none popped."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        if not tblNum:  # maybe false
            return False

        return "table_{}".format(tblNum)

    def execSql(self, sql):  # using the main DB connection
        """Execute *sql* on the main DB connection."""
        return self._dbConn.execute(sql)

    def cleanUp(self):
        """Close the main DB connection."""
        self._dbConn.close()

    def getTaskTypesAtState(self):
        """Return the StateTransitionTask subclasses that can begin from the current state.

        Raises:
            RuntimeError: if no task type can begin from the current state.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        taskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._state)]
        if not taskTypes:
            raise RuntimeError("No suitable task types found for state: {}".format(self._state))
        return taskTypes

    def pickTaskType(self):
        """Pick one applicable task type at random, weighted by its end state."""
        taskTypes = self.getTaskTypesAtState()  # all the task types we can choose from at current state
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:  # 0 (STATE_EMPTY) is a valid end state
                weights.append(self._stateWeights[endState])  # TODO: change to a method
            else:
                weights.append(10)  # read data task, default to 10: TODO: change to a constant
        i = self._weighted_choice_sub(weights)
        logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
        return taskTypes[i]

    def _weighted_choice_sub(self, weights):  # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
        """Return a random index into *weights*, chosen in proportion to the weights.

        Raises:
            ValueError: if *weights* is empty.
        """
        if not weights:
            raise ValueError("Cannot choose from an empty list of weights")
        rnd = random.random() * sum(weights)  # TODO: use our dice to ensure it being deterministic?
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Reached when all weights are zero, or when float rounding leaves the
        # remainder at >= 0; always hand back a valid index rather than None.
        return len(weights) - 1

    def transition(self, tasks):
        """Derive the new DB state from the *tasks* executed in the last step.

        Does nothing for an empty task list. Otherwise runs the consistency
        assertions for the current state and updates ``self._state``.

        Raises:
            RuntimeError / CrashGenError: when the task outcomes are not
                consistent with the current state, or the state is unknown.
        """
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            return  # do nothing

        self.execSql("show dnodes")  # this should show up in the server log, separating steps

        if self._state == self.STATE_EMPTY:
            if self.hasSuccess(tasks, CreateDbTask):
                self.assertAtMostOneSuccess(tasks, CreateDbTask)  # param is class
                self._state = self.STATE_DB_ONLY
                if self.hasSuccess(tasks, CreateFixedTableTask):
                    self._state = self.STATE_TABLE_ONLY
            else:  # did not create db
                self.assertNoTask(tasks, CreateDbTask)  # because we did not have such task
                self.assertNoSuccess(tasks, CreateFixedTableTask)

        elif self._state == self.STATE_DB_ONLY:
            self.assertAtMostOneSuccess(tasks, DropDbTask)
            self.assertIfExistThenSuccess(tasks, DropDbTask)
            self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)
            # Nothing to be said about adding data task
            if self.hasSuccess(tasks, DropDbTask):  # dropped the DB
                self.assertAtMostOneSuccess(tasks, DropDbTask)
                self._state = self.STATE_EMPTY
            elif self.hasSuccess(tasks, CreateFixedTableTask):  # did not drop db, create table success
                self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)  # at most 1 attempt is successful
                self.assertNoTask(tasks, DropDbTask)  # should not have tried
                if not self.hasSuccess(tasks, AddFixedDataTask):  # just created table, no data yet
                    # can't say there's add-data attempts, since they may all fail
                    self._state = self.STATE_TABLE_ONLY
                else:
                    self._state = self.STATE_HAS_DATA
            elif self.hasSuccess(tasks, AddFixedDataTask):
                self._state = self.STATE_HAS_DATA
            else:  # we might just have landed all-failure tasks
                self._state = self.STATE_DB_ONLY  # no change

        elif self._state == self.STATE_TABLE_ONLY:
            if self.hasSuccess(tasks, DropFixedTableTask):  # we are able to drop the table
                self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
                self._state = self.STATE_DB_ONLY
            elif self.hasSuccess(tasks, AddFixedDataTask):  # no success dropping the table, but added data
                self.assertNoTask(tasks, DropFixedTableTask)
                self._state = self.STATE_HAS_DATA
            elif self.hasSuccess(tasks, ReadFixedDataTask):  # no success in prev cases, but was able to read data
                self.assertNoTask(tasks, DropFixedTableTask)
                self.assertNoTask(tasks, AddFixedDataTask)
                self._state = self.STATE_TABLE_ONLY  # no change
            else:  # did not drop table, did not insert data, did not read successfully, that is impossible
                raise RuntimeError("Unexpected no-success scenarios")

        elif self._state == self.STATE_HAS_DATA:  # Same as above, TODO: adjust
            if self.hasSuccess(tasks, DropFixedTableTask):
                self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
                self._state = self.STATE_DB_ONLY
            else:  # no success dropping the table, table remains intact in this step
                self.assertNoTask(tasks, DropFixedTableTask)  # we should not have had such a task

                if self.hasSuccess(tasks, AddFixedDataTask):  # added data
                    self._state = self.STATE_HAS_DATA
                else:
                    self.assertNoTask(tasks, AddFixedDataTask)

                    if self.hasSuccess(tasks, ReadFixedDataTask):  # simply able to read some data
                        self._state = self.STATE_HAS_DATA  # no change
                    else:  # did not drop table, did not insert data, did not read
                        raise RuntimeError("Unexpected no-success scenarios")

        else:
            raise RuntimeError("Unexpected DbState state: {}".format(self._state))
        logger.debug("New DB state is: {}".format(self._state))

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError if two or more *cls* tasks in *tasks* succeeded."""
        sCnt = 0
        for task in tasks:
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                task.logDebug("Task success found")
                sCnt += 1
                if sCnt >= 2:
                    raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise RuntimeError if *cls* tasks exist in *tasks* but none succeeded."""
        sCnt = 0
        exists = False
        for task in tasks:
            if not isinstance(task, cls):
                continue
            exists = True  # we have a valid instance
            if task.isSuccess():
                sCnt += 1
        if exists and sCnt <= 0:
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task in *tasks* is an instance of *cls*."""
        for task in tasks:
            if isinstance(task, cls):
                raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError if any *cls* task in *tasks* succeeded."""
        for task in tasks:
            if isinstance(task, cls):
                if task.isSuccess():
                    raise RuntimeError("Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if at least one *cls* task in *tasks* succeeded."""
        for task in tasks:
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                return True
        return False

666 667


668 669 670 671
class TaskExecutor():
    """Per-step context object: remembers which step it was created for."""

    def __init__(self, curStep):
        self._curStep = curStep

    def getCurStep(self):
        """Return the step number given at construction."""
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        """Run *task* on worker thread *wt* by delegating to ``task.execute``."""
        task.execute(wt)
677

678 679
    # def logInfo(self, msg):
    #     logger.info("    T[{}.x]: ".format(self._curStep) + msg)
680

681 682
    # def logDebug(self, msg):
    #     logger.debug("    T[{}.x]: ".format(self._curStep) + msg)
683

S
Steven Li 已提交
684
class Task():
    """Base class for a unit of work that is executed on a worker thread.

    Subclasses implement ``_executeInternal``. ``execute`` wraps that call
    with logging, capture of ``taos.error.ProgrammingError`` and updates to
    the shared execution statistics.
    """

    taskSn = 100  # last serial number handed out; one counter for ALL task classes

    @classmethod
    def allocTaskNum(cls):
        """Return the next task serial number.

        The counter is always updated on ``Task`` itself. Assigning through
        ``cls`` would create a shadow ``taskSn`` on whichever subclass made
        the call, giving each subclass its own counter and therefore handing
        out duplicate serial numbers across task types.
        """
        Task.taskSn += 1
        return Task.taskSn

    def __init__(self, dbState: DbState, execStats: ExecutionStats):
        """Create a task bound to ``dbState`` that reports into ``execStats``."""
        self._dbState = dbState
        self._workerThread = None  # set by execute()
        self._err = None  # error captured by the most recent execute(), if any
        self._curStep = None  # set by execute() from the task executor
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

        self._execStats = execStats

    def isSuccess(self):
        """Return True when no error has been recorded for this task."""
        return self._err is None

    def clone(self):  # TODO: why do we need this again?
        """Return a new task of the same class sharing this task's state and stats objects."""
        newTask = self.__class__(self._dbState, self._execStats)
        return newTask

    def logDebug(self, msg):
        """Log ``msg`` at debug level through the worker thread, tagged with step and task number."""
        self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        """Log ``msg`` at info level through the worker thread, tagged with step and task number."""
        self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Do the actual work of the task; must be overridden by subclasses.

        Raises:
            RuntimeError: always, in this base implementation.
        """
        raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))

    def execute(self, wt: WorkerThread):
        """Run the task on worker thread ``wt`` and record the outcome.

        A ``taos.error.ProgrammingError`` raised by ``_executeInternal`` is
        stored in ``self._err`` (making ``isSuccess()`` false) and execution
        statistics are still updated. Any other exception is logged and
        re-raised; in that case neither ``endTaskType`` nor ``incExecCount``
        is called.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        taskName = self.__class__.__name__
        self.logDebug("[-] executing task {}...".format(taskName))

        self._err = None
        self._execStats.beginTaskType(taskName)  # mark beginning
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            self.logDebug("[=]Taos Execution exception: {0}".format(err))
            self._err = err
        except BaseException:  # same reach as a bare except: log, then propagate
            self.logDebug("[=]Unexpected exception")
            raise
        self._execStats.endTaskType(taskName, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(taskName, "Success" if self.isSuccess() else "Failure"))
        self._execStats.incExecCount(taskName, self.isSuccess())  # TODO: merge with above.

    def execSql(self, sql):
        """Execute ``sql`` through the task's ``dbState`` and return its result."""
        return self._dbState.execute(sql)
745

746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
                  
class ExecutionStats:
    """Per-task-type execution counters plus the accumulated "busy" time.

    Every method that reads or writes the counters does so while holding
    ``self._lock``.
    """

    def __init__(self):
        # task class name -> [total executions, successful executions]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None  # start time of the current busy period, if any
        self._accRunTime = 0.0  # accumulated run time

    def incExecCount(self, klassName, isSuccess):
        """Count one execution of ``klassName``; also count it as a success if ``isSuccess``."""
        # The read-modify-write on the counters is not atomic, so guard it.
        with self._lock:
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times

    def beginTaskType(self, klassName):
        """Note that a task has started; starts the busy clock if none was running."""
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        """Note that a task has ended; stops the busy clock when it was the last one running."""
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def logStats(self):
        """Log success/total counts per task type and the overall totals."""
        # Take a consistent snapshot first so logging happens outside the lock.
        with self._lock:
            perTask = [(name, counts[0], counts[1]) for name, counts in self._execTimes.items()]
            inProgress = self._tasksInProgress
            busyTime = self._accRunTime

        logger.info("Logging task execution stats (success/total times)...")
        execTimesAny = 0
        for name, total, success in perTask:
            execTimesAny += total  # "success or not", so add the total rather than the successes
            logger.info("    {0:<24}: {1}/{2}".format(name, success, total))

        logger.info("Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info("Total Tasks In Progress at End: {}".format(inProgress))
        logger.info("Total Task Busy Time (elapsed time when any task is in progress): {:.2f} seconds".format(busyTime))


class StateTransitionTask(Task):
    """Task described by the states it may begin from and the state it ends in.

    Subclasses supply that description through ``getInfo``, which returns a
    two-element list: ``[begin_states, end_state]``.
    """
    # @classmethod
    # def getAllTaskClasses(cls): # static
    #     return cls.__subclasses__()

    @classmethod
    def getInfo(cls):
        """Return ``[begin_states, end_state]``; each subclass must override this."""
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getBeginStates(cls):
        """Return the first element of ``getInfo()``: the states this task can begin from."""
        info = cls.getInfo()
        return info[0]

    @classmethod
    def getEndState(cls):
        """Return the second element of ``getInfo()``: the state after this task."""
        info = cls.getInfo()
        return info[1]

    @classmethod
    def canBeginFrom(cls, state):
        """Return whether ``state`` is one of this task's begin states."""
        beginStates = cls.getBeginStates()
        return state in beginStates

    def execute(self, wt: WorkerThread):
        """Run the task on ``wt``; currently just the base-class behaviour."""
        super().execute(wt)
        


class CreateDbTask(StateTransitionTask):
    """State transition that issues ``create database db``."""

    @classmethod
    def getInfo(cls):
        """Return ``[begin_states, end_state]`` for this transition."""
        beginStates = [DbState.STATE_EMPTY]  # can begin from
        endState = DbState.STATE_DB_ONLY
        return [beginStates, endState]

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Create the database through the worker thread."""
        createSql = "create database db"
        wt.execSql(createSql)

824 825 826 827 828 829 830 831
class DropDbTask(StateTransitionTask):
    """State transition that issues ``drop database db``."""

    @classmethod
    def getInfo(cls):
        """Return ``[begin_states, end_state]`` for this transition."""
        beginStates = [
            DbState.STATE_DB_ONLY,
            DbState.STATE_TABLE_ONLY,
            DbState.STATE_HAS_DATA,
        ]
        endState = DbState.STATE_EMPTY
        return [beginStates, endState]

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Drop the database through the worker thread."""
        dropSql = "drop database db"
        wt.execSql(dropSql)

835 836 837 838 839 840 841
class CreateFixedTableTask(StateTransitionTask):
    """State transition that creates the fixed table inside ``db``."""

    @classmethod
    def getInfo(cls):
        """Return ``[begin_states, end_state]`` for this transition."""
        beginStates = [DbState.STATE_DB_ONLY]
        endState = DbState.STATE_TABLE_ONLY
        return [beginStates, endState]

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Create the fixed table (columns: ts timestamp, speed int)."""
        fixedTable = self._dbState.getFixedTableName()
        createSql = "create table db.{} (ts timestamp, speed int)".format(fixedTable)
        wt.execSql(createSql)
S
Steven Li 已提交
846

847 848 849 850 851 852 853 854
class ReadFixedDataTask(StateTransitionTask):
    """Task that queries every row of the fixed table and keeps the query result."""

    @classmethod
    def getInfo(cls):
        """Return ``[begin_states, end_state]``; the end state is ``None``."""
        beginStates = [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA]
        endState = None  # meaning doesn't affect state
        return [beginStates, endState]

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Run ``select *`` on the fixed table and store the result in ``_numRows``."""
        fixedTable = self._dbState.getFixedTableName()
        selectSql = "select * from db.{}".format(fixedTable)
        self._numRows = wt.querySql(selectSql)  # save the result for later
        # tdSql.query(" cars where tbname in ('carzero', 'carone')")

860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895
class DropFixedTableTask(StateTransitionTask):
    """State transition that drops the fixed table from ``db``."""

    @classmethod
    def getInfo(cls):
        """Return ``[begin_states, end_state]`` for this transition."""
        beginStates = [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA]
        endState = DbState.STATE_DB_ONLY  # only the database is left afterwards
        return [beginStates, endState]

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Drop the fixed table through the worker thread."""
        fixedTable = self._dbState.getFixedTableName()
        dropSql = "drop table db.{}".format(fixedTable)
        wt.execSql(dropSql)

class AddFixedDataTask(StateTransitionTask):
    """State transition that inserts one row into the fixed table."""

    @classmethod
    def getInfo(cls):
        """Return ``[begin_states, end_state]`` for this transition."""
        beginStates = [DbState.STATE_TABLE_ONLY, DbState.STATE_HAS_DATA]
        endState = DbState.STATE_HAS_DATA
        return [beginStates, endState]

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Insert the next (tick, int) pair from the db state into the fixed table."""
        dbState = self._dbState
        # Fetch the values in the same order as they appear in the statement.
        fixedTable = dbState.getFixedTableName()
        tick = dbState.getNextTick()
        value = dbState.getNextInt()
        insertSql = "insert into db.{} values ('{}', {});".format(fixedTable, tick, value)
        wt.execSql(insertSql)


#---------- Non State-Transition Related Tasks ----------#

class CreateTableTask(Task):
    """Task that registers a new table index with the db state and creates that table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Create ``db.table_<index>`` and release the index once the statement has run."""
        dbState = self._dbState
        newIndex = dbState.addTable()
        self.logDebug("Creating a table {} ...".format(newIndex))
        createSql = "create table db.table_{} (ts timestamp, speed int)".format(newIndex)
        wt.execSql(createSql)
        self.logDebug("Table {} created.".format(newIndex))
        dbState.releaseTable(newIndex)

S
Steven Li 已提交
896
class DropTableTask(Task):
    """Task that drops a table chosen by the db state, if one is available."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Drop the table named by ``getTableNameToDelete()``; skip when it yields a falsy value."""
        victim = self._dbState.getTableNameToDelete()
        if victim:
            self.logInfo("Dropping a table db.{} ...".format(victim))
            wt.execSql("drop table db.{}".format(victim))
        else:  # May be "False"
            self.logInfo("Cannot generate a table to delete, skipping...")
904
        
905

S
Steven Li 已提交
906 907

class AddDataTask(Task):
    """Task that inserts one row into a table picked and allocated from the db state."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Insert the next (tick, int) pair into an allocated table.

        Does nothing beyond logging when ``pickAndAllocateTable()`` returns
        ``None``. The allocated table is released even when the insert
        raises, so a failed statement does not leave it allocated forever.
        """
        ds = self._dbState
        self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
        tIndex = ds.pickAndAllocateTable()
        if tIndex is None:
            self.logInfo("No table found to add data, skipping...")
            return
        try:
            sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
            self.logDebug("Executing SQL: {}".format(sql))
            wt.execSql(sql)
        finally:
            # Pair every successful allocation with a release, error or not.
            ds.releaseTable(tIndex)
        self.logDebug("Finished adding data")
S
Steven Li 已提交
920

921

S
Steven Li 已提交
922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943
# Deterministic random number generator
class Dice():
    """Dice backed by the ``random`` module's shared generator; must be seeded exactly once."""

    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG, then seed it with ``s``.

        Raises:
            RuntimeError: the dice was already seeded, or the RNG check failed.
        """
        if cls.seeded:
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):
        """Check that seed 0 yields the expected first three draws (re-seeds the RNG with 0).

        Raises:
            RuntimeError: the draws differ from the expected values.
        """
        random.seed(0)
        observed = [random.randrange(0, 1000) for _ in range(3)]
        if observed != [864, 394, 776]:
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):
        """Return a random integer from 0 to ``stop - 1``."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):
        """Return a random integer from ``start`` up to ``stop - 1``.

        Raises:
            RuntimeError: the dice has not been seeded yet.
        """
        if cls.seeded:
            return random.randrange(start, stop)
        raise RuntimeError("Cannot throw dice before seeding it")
S
Steven Li 已提交
952 953 954


# Anyone needing to carry out work should simply come here
955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976
# class WorkDispatcher():
#     def __init__(self, dbState):
#         # self.totalNumMethods = 2
#         self.tasks = [
#             # CreateTableTask(dbState), # Obsolete
#             # DropTableTask(dbState),
#             # AddDataTask(dbState),
#         ]

#     def throwDice(self):
#         max = len(self.tasks) - 1 
#         dRes = random.randint(0, max)
#         # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
#         return dRes

#     def pickTask(self):
#         dice = self.throwDice()
#         return self.tasks[dice]

#     def doWork(self, workerThread):
#         task = self.pickTask()
#         task.execute(workerThread)
S
Steven Li 已提交
977

978
def main():
    """Parse the command line, configure logging and run the crash generator.

    Prints the help text and exits when invoked without any argument.
    Stores the parsed options in the module-level ``gConfig`` and the
    configured logger in the module-level ``logger``.
    """
    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in offical docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))
    # NOTE(review): the previous help text ("Use a single shared db connection")
    # described the opposite of the flag name; confirm against the code that
    # reads gConfig.per_thread_db_connection.
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
                        help='Use a separate db connection for each thread (default: false)')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                        help='Maximum number of steps to run (default: 100)')
    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                        help='Number of threads to run (default: 10)')

    global gConfig
    gConfig = parser.parse_args()
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit()

    global logger
    logger = logging.getLogger('CrashGen')
    # A logger left at NOTSET inherits the root logger's level, which is
    # WARNING unless something else configured it; set INFO explicitly so
    # that the info-level progress and statistics output is not dropped.
    logger.setLevel(logging.DEBUG if gConfig.debug else logging.INFO)
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    dbState = DbState()
    Dice.seed(0)  # initial seeding of dice
    tc = ThreadCoordinator(
        ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
        # WorkDispatcher(dbState), # Obsolete?
        dbState
        )
    tc.run()
    dbState.cleanUp()
    logger.info("Finished running thread pool")

# Run the crash generator only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()