crash_gen.py 45.9 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17
import traceback
18 19 20 21
# Require Python 3
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
22
import getopt
23
import argparse
24
import copy
S
Steven Li 已提交
25 26 27

import threading
import random
28
import time
S
Steven Li 已提交
29
import logging
30
import datetime
31
import textwrap
S
Steven Li 已提交
32

33
from typing import List
34
from typing import Dict
35

S
Steven Li 已提交
36 37 38 39 40
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

41
import crash_gen
S
Steven Li 已提交
42 43
import taos

44 45 46
# Global variables, tried to keep a small number. 
gConfig = None # Command-line/Environment Configurations, will set a bit later
logger = None
S
Steven Li 已提交
47

48 49
def runThread(wt: WorkerThread):    
    wt.run()
50

51 52 53 54 55 56 57 58
class CrashGenError(Exception):
    def __init__(self, msg=None, errno=None):
        self.msg = msg    
        self.errno = errno
    
    def __str__(self):
        return self.msg

S
Steven Li 已提交
59
class WorkerThread:
60
    def __init__(self, pool: ThreadPool, tid, 
61 62 63 64
            tc: ThreadCoordinator,
            # te: TaskExecutor,
            ): # note: main thread context!
        # self._curStep = -1 
65
        self._pool = pool
66
        self._tid = tid        
67
        self._tc = tc
S
Steven Li 已提交
68
        # self.threadIdent = threading.get_ident()
69 70
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()
S
Steven Li 已提交
71

72
        # Let us have a DB connection of our own
73 74 75
        if ( gConfig.per_thread_db_connection ): # type: ignore
            self._dbConn = DbConn()   

76 77 78 79 80 81 82
    def logDebug(self, msg):
        logger.info("    t[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        logger.info("    t[{}] {}".format(self._tid, msg))

   
83 84
    def getTaskExecutor(self):
        return self._tc.getTaskExecutor()     
85

S
Steven Li 已提交
86
    def start(self):
87
        self._thread.start()  # AFTER the thread is recorded
S
Steven Li 已提交
88

89
    def run(self): 
S
Steven Li 已提交
90
        # initialization after thread starts, in the thread context
91
        # self.isSleeping = False
92 93
        logger.info("Starting to run thread: {}".format(self._tid))

94
        if ( gConfig.per_thread_db_connection ): # type: ignore
95
            self._dbConn.open()
S
Steven Li 已提交
96

97 98
        self._doTaskLoop()       
        
99
        # clean up
100
        if ( gConfig.per_thread_db_connection ): # type: ignore 
101
            self._dbConn.close()
102

103 104 105
    def _doTaskLoop(self) :
        # while self._curStep < self._pool.maxSteps:
        # tc = ThreadCoordinator(None)
106 107 108
        while True:  
            tc = self._tc # Thread Coordinator, the overall master            
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
109
            # logger.debug("Thread task loop exited barrier...")
110
            self.crossStepGate()   # then per-thread gate, after being tapped
111
            # logger.debug("Thread task loop exited step gate...")
112
            if not self._tc.isRunning():
113
                logger.debug("Thread Coordinator not running any more, worker thread now stopping...")
114 115
                break

116
            task = tc.fetchTask()
117
            task.execute(self)
118
            tc.saveExecutedTask(task)
119
  
S
Steven Li 已提交
120
    def verifyThreadSelf(self): # ensure we are called by this own thread
121
        if ( threading.get_ident() != self._thread.ident ): 
S
Steven Li 已提交
122 123 124 125 126 127 128
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self): # ensure we are called by the main thread
        if ( threading.get_ident() != threading.main_thread().ident ): 
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
129
        if ( not self._thread.is_alive() ):
S
Steven Li 已提交
130 131
            raise RuntimeError("Unexpected dead thread")

132
    # A gate is different from a barrier in that a thread needs to be "tapped"
S
Steven Li 已提交
133 134 135 136
    def crossStepGate(self):
        self.verifyThreadAlive()
        self.verifyThreadSelf() # only allowed by ourselves
        
137
        # Wait again at the "gate", waiting to be "tapped"
138
        # logger.debug("Worker thread {} about to cross the step gate".format(self._tid))
139 140
        self._stepGate.wait() 
        self._stepGate.clear()
S
Steven Li 已提交
141
        
142
        # self._curStep += 1  # off to a new step...
S
Steven Li 已提交
143 144 145 146

    def tapStepGate(self): # give it a tap, release the thread waiting there
        self.verifyThreadAlive()
        self.verifyThreadMain() # only allowed for main thread
147
 
148
        # logger.debug("Tapping worker thread {}".format(self._tid))
149 150
        self._stepGate.set() # wake up!        
        time.sleep(0) # let the released thread run a bit
151

152 153 154 155
    def execSql(self, sql): # not "execute", since we are out side the DB context
        if ( gConfig.per_thread_db_connection ):
            return self._dbConn.execute(sql)            
        else:
156
            return self._tc.getDbState().getDbConn().execute(sql)
157

158 159 160 161 162
    # def querySql(self, sql): # not "execute", since we are out side the DB context
    #     if ( gConfig.per_thread_db_connection ):
    #         return self._dbConn.query(sql)            
    #     else:
    #         return self._tc.getDbState().getDbConn().query(sql)
163

164
class ThreadCoordinator:
165
    def __init__(self, pool, dbState):
166 167
        self._curStep = -1 # first step is 0
        self._pool = pool
168
        # self._wd = wd
169
        self._te = None # prepare for every new step
170 171 172
        self._dbState = dbState
        self._executedTasks: List[Task] = [] # in a given step
        self._lock = threading.RLock() # sync access for a few things
S
Steven Li 已提交
173

174
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
175
        self._execStats = ExecutionStats()
S
Steven Li 已提交
176

177 178 179
    def getTaskExecutor(self):
        return self._te

180 181 182
    def getDbState(self) -> DbState :
        return self._dbState

183 184 185
    def crossStepBarrier(self):
        self._stepBarrier.wait()

186 187
    def run(self):              
        self._pool.createAndStartThreads(self)
S
Steven Li 已提交
188 189

        # Coordinate all threads step by step
190 191
        self._curStep = -1 # not started yet
        maxSteps = gConfig.max_steps # type: ignore
192 193 194
        self._execStats.startExec() # start the stop watch
        failed = False
        while(self._curStep < maxSteps-1 and not failed):  # maxStep==10, last curStep should be 9
195
            print(".", end="", flush=True)
S
Steven Li 已提交
196
            logger.debug("Main thread going to sleep")
197

198
            # Now ready to enter a step
199 200 201 202
            self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset() # Other worker threads should now be at the "gate"            

            # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
            try:
                self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
            except taos.error.ProgrammingError as err:
                if ( err.msg == 'network unavailable' ): # broken DB connection
                    logger.info("DB connection broken, execution failed")
                    traceback.print_stack()
                    failed = True
                    self._te = None # Not running any more
                    self._execStats.registerFailure("Broken DB Connection")
                    # continue # don't do that, need to tap all threads at end, and maybe signal them to stop
                else:
                    raise 
            finally:
                pass
            
218
            self.resetExecutedTasks() # clear the tasks after we are done
219 220

            # Get ready for next step
221 222 223
            logger.info("<-- Step {} finished".format(self._curStep))
            self._curStep += 1 # we are about to get into next step. TODO: race condition here!                
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
224

225
            # A new TE for the new step
226 227
            if not failed: # only if not failed
                self._te = TaskExecutor(self._curStep)
228

229
            logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep            
S
Steven Li 已提交
230 231
            self.tapAllThreads()

232
        logger.debug("Main thread ready to finish up...")
233 234 235 236 237 238 239 240
        if not failed: # only in regular situations
            self.crossStepBarrier() # Cross it one last time, after all threads finish
            self._stepBarrier.reset()
            logger.debug("Main thread in exclusive zone...")
            self._te = None # No more executor, time to end
            logger.debug("Main thread tapping all threads one last time...")
            self.tapAllThreads() # Let the threads run one last time

241 242
        logger.debug("Main thread joining all threads")
        self._pool.joinAll() # Get all threads to finish
S
Steven Li 已提交
243
        logger.info("All threads finished")
244 245 246
        self._execStats.endExec()

    def logStats(self):
247
        self._execStats.logStats()
S
Steven Li 已提交
248 249 250

    def tapAllThreads(self): # in a deterministic manner
        wakeSeq = []
251
        for i in range(self._pool.numThreads): # generate a random sequence
S
Steven Li 已提交
252 253 254 255 256
            if Dice.throw(2) == 1 :
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.info("Waking up threads: {}".format(str(wakeSeq)))
257
        # TODO: set dice seed to a deterministic value
S
Steven Li 已提交
258
        for i in wakeSeq:
259
            self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
S
Steven Li 已提交
260 261
            time.sleep(0) # yield

262 263 264 265 266 267
    def isRunning(self):
        return self._te != None

    def fetchTask(self) -> Task :
        if ( not self.isRunning() ): # no task
            raise RuntimeError("Cannot fetch task when not running")
268 269
        # return self._wd.pickTask()
        # Alternatively, let's ask the DbState for the appropriate task
270 271 272 273 274 275 276 277 278
        # dbState = self.getDbState()
        # tasks = dbState.getTasksAtState() # TODO: create every time?
        # nTasks = len(tasks)
        # i = Dice.throw(nTasks)
        # logger.debug(" (dice:{}/{}) ".format(i, nTasks))
        # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
        # return tasks[i].clone() # TODO: still necessary?
        taskType = self.getDbState().pickTaskType() # pick a task type for current state
        return taskType(self.getDbState(), self._execStats) # create a task from it
279 280 281

    def resetExecutedTasks(self):
        self._executedTasks = [] # should be under single thread
282 283 284 285

    def saveExecutedTask(self, task):
        with self._lock:
            self._executedTasks.append(task)
286 287

# We define a class to run a number of threads in locking steps.
288
class ThreadPool:
289 290 291 292 293
    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        self.funcSequencer = funcSequencer
        # Internal class variables
294
        # self.dispatcher = WorkDispatcher(dbState) # Obsolete?
295 296 297
        self.curStep = 0
        self.threadList = []
        # self.stepGate = threading.Condition() # Gate to hold/sync all threads
298
        # self.numWaitingThreads = 0    
299 300
        
    # starting to run all the threads, in locking steps
301
    def createAndStartThreads(self, tc: ThreadCoordinator):
302
        for tid in range(0, self.numThreads): # Create the threads
303
            workerThread = WorkerThread(self, tid, tc)            
304 305 306 307 308 309 310 311
            self.threadList.append(workerThread)
            workerThread.start() # start, but should block immediately before step 0

    def joinAll(self):
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()

S
Steven Li 已提交
312 313 314
# A queue of continguous POSITIVE integers
class LinearQueue():
    def __init__(self):
315
        self.firstIndex = 1  # 1st ever element
S
Steven Li 已提交
316
        self.lastIndex = 0
317
        self._lock = threading.RLock() # our functions may call each other
318
        self.inUse = set() # the indexes that are in use right now
S
Steven Li 已提交
319

320 321 322 323 324 325 326 327 328
    def toText(self):
        return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self): 
        with self._lock:
            # if ( self.isEmpty() ): 
            #     self.lastIndex = self.firstIndex 
            #     return self.firstIndex
329 330
            # Otherwise we have something
            self.lastIndex += 1
331 332
            self.allocate(self.lastIndex)
            # self.inUse.add(self.lastIndex) # mark it in use immediately
333
            return self.lastIndex
S
Steven Li 已提交
334 335

    def pop(self):
336
        with self._lock:
337
            if ( self.isEmpty() ): 
338 339 340
                # raise RuntimeError("Cannot pop an empty queue") 
                return False # TODO: None?
            
341
            index = self.firstIndex
342
            if ( index in self.inUse ):
343 344
                return False

345 346 347 348 349 350 351
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
352
        with self._lock:
353 354 355 356
            if (self.isEmpty()):
                return 0
            return self.pop()

S
Steven Li 已提交
357
    def allocate(self, i):
358
        with self._lock:
359
            # logger.debug("LQ allocating item {}".format(i))
360 361 362 363
            if ( i in self.inUse ):
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

S
Steven Li 已提交
364
    def release(self, i):
365
        with self._lock:
366 367
            # logger.debug("LQ releasing item {}".format(i))
            self.inUse.remove(i) # KeyError possible, TODO: why?
368 369 370 371

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

S
Steven Li 已提交
372
    def pickAndAllocate(self):
373 374 375
        if ( self.isEmpty() ):
            return None
        with self._lock:
376 377 378 379
            cnt = 0 # counting the interations
            while True:
                cnt += 1
                if ( cnt > self.size()*10 ): # 10x iteration already
380 381
                    # raise RuntimeError("Failed to allocate LinearQueue element")
                    return None
382 383
                ret = Dice.throwRange(self.firstIndex, self.lastIndex+1)
                if ( not ret in self.inUse ):
384 385 386 387 388
                    self.allocate(ret)
                    return ret

class DbConn:
    def __init__(self):
389 390
        self._conn = None 
        self._cursor = None
391 392 393 394 395 396 397
        self.isOpen = False
        
    def open(self): # Open connection
        if ( self.isOpen ):
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg" 
398 399
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
        self._cursor = self._conn.cursor()
400

401 402
        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')
403
        # self._cursor.execute('use db') # note we do this in _findCurrenState
404 405

        # Open connection
406
        self._tdSql = TDSql()
407
        self._tdSql.init(self._cursor)
408 409 410 411 412
        self.isOpen = True

    def resetDb(self): # reset the whole database, etc.
        if ( not self.isOpen ):
            raise RuntimeError("Cannot reset database until connection is open")
413 414 415
        # self._tdSql.prepare() # Recreate database, etc.

        self._cursor.execute('drop database if exists db')
416 417
        logger.debug("Resetting DB, dropped database")
        # self._cursor.execute('create database db')
418 419
        # self._cursor.execute('use db')

420 421 422 423 424 425 426
        # tdSql.execute('show databases')

    def close(self):
        if ( not self.isOpen ):
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()
        self.isOpen = False
S
Steven Li 已提交
427

428
    def execute(self, sql): 
429
        if ( not self.isOpen ):
430
            raise RuntimeError("Cannot execute database commands until connection is open")
431
        return self._tdSql.execute(sql)
S
Steven Li 已提交
432

433
    def query(self, sql) :  # return rows affected
434 435 436
        if ( not self.isOpen ):
            raise RuntimeError("Cannot query database until connection is open")
        return self._tdSql.query(sql)
437
        # results are in: return self._tdSql.queryResult
438

439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
    def _queryAny(self, sql) : # actual query result as an int
        if ( not self.isOpen ):
            raise RuntimeError("Cannot query database until connection is open")
        tSql = self._tdSql
        nRows = tSql.query(sql)
        if nRows != 1 :
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
        if tSql.queryRows != 1 or tSql.queryCols != 1:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        return tSql.queryResult[0][0]

    def queryScalar(self, sql) -> int :
        return self._queryAny(sql)

    def queryString(self, sql) -> str :
        return self._queryAny(sql)
    
class AnyState:
457
    STATE_INVALID    = -1
458 459 460 461
    STATE_EMPTY      = 0  # nothing there, no even a DB
    STATE_DB_ONLY    = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA   = 3  # we have some data in the table
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_TABLE = 3
    CAN_DROP_FIXED_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        return self._stateNames[self._info[self.STATE_VAL_IDX] - 1] # -1 hack to accomodate the STATE_INVALID case

    def getInfo(self):
        raise RuntimeError("Must be overriden by child classes")

    def verifyTasksToState(self, tasks, newState):
        raise RuntimeError("Must be overriden by child classes")

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]
    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]
    def canDropDb(self):
        return self._info[self.CAN_DROP_DB]
    def canCreateFixedTable(self):
        return self._info[self.CAN_CREATE_FIXED_TABLE]
    def canDropFixedTable(self):
        return self._info[self.CAN_DROP_FIXED_TABLE]
    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]
    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        sCnt = 0
        for task in tasks :
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                task.logDebug("Task success found")
                sCnt += 1
                if ( sCnt >= 2 ):
                    raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        sCnt = 0
        exists = False
        for task in tasks :
            if not isinstance(task, cls):
                continue
            exists = True # we have a valid instance
            if task.isSuccess():
                sCnt += 1
        if ( exists and sCnt <= 0 ):
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        for task in tasks :
            if isinstance(task, cls):
                raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        for task in tasks :
            if isinstance(task, cls):
                if task.isSuccess():
                    raise RuntimeError("Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        for task in tasks :
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                return True
        return False

class StateInvalid(AnyState):
    def getInfo(self):
        return [
            self.STATE_INVALID,
            False, False, # can create/drop Db
            False, False, # can create/drop fixed table
            False, False, # can insert/read data with fixed table
        ]

    # def verifyTasksToState(self, tasks, newState):

class StateEmpty(AnyState):
    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False, # can create/drop Db
            False, False, # can create/drop fixed table
            False, False, # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        if ( self.hasSuccess(tasks, CreateDbTask) ):
            self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really valid for massively parrallel tasks

class StateDbOnly(AnyState):
    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,
            True, False,
            False, False,
        ]

    def verifyTasksToState(self, tasks, newState):
        self.assertAtMostOneSuccess(tasks, DropDbTask) # not true in massively parralel cases
        self.assertIfExistThenSuccess(tasks, DropDbTask)
        self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) 
        # Nothing to be said about adding data task
        if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
            # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
            self.assertAtMostOneSuccess(tasks, DropDbTask)
            # self._state = self.STATE_EMPTY
        elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success
            # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
            self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful
            self.assertNoTask(tasks, DropDbTask) # should have have tried
            # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
            #     # can't say there's add-data attempts, since they may all fail
            #     self._state = self.STATE_TABLE_ONLY
            # else:                    
            #     self._state = self.STATE_HAS_DATA
        # What about AddFixedData?
        # elif ( self.hasSuccess(tasks, AddFixedDataTask) ):
        #     self._state = self.STATE_HAS_DATA
        # else: # no success in dropping db tasks, no success in create fixed table? read data should also fail
        #     # raise RuntimeError("Unexpected no-success scenario")   # We might just landed all failure tasks, 
        #     self._state = self.STATE_DB_ONLY  # no change

class StateTableOnly(AnyState):
    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,
            False, True,
            True, True,
        ]

    def verifyTasksToState(self, tasks, newState):
        if ( self.hasSuccess(tasks, DropFixedTableTask) ): # we are able to drop the table
            self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
            # self._state = self.STATE_DB_ONLY
        elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
            self.assertNoTask(tasks, DropFixedTableTask)
            # self._state = self.STATE_HAS_DATA
        elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data
            self.assertNoTask(tasks, DropFixedTableTask)
            self.assertNoTask(tasks, AddFixedDataTask)
            # self._state = self.STATE_TABLE_ONLY # no change
        else: # did not drop table, did not insert data, did not read successfully, that is impossible
            raise RuntimeError("Unexpected no-success scenarios")

class StateHasData(AnyState):
    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,
            False, True,
            True, True,
        ]

    def verifyTasksToState(self, tasks, newState):
        if ( self.hasSuccess(tasks, DropFixedTableTask) ):
                self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
                # self._state = self.STATE_DB_ONLY
        else: # no success dropping the table, table remains intact in this step
            self.assertNoTask(tasks, DropFixedTableTask) # we should not have had such a task

            if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # added data
                # self._state = self.STATE_HAS_DATA
            # else:
                self.assertNoTask(tasks, AddFixedDataTask)
                if ( not self.hasSuccess(tasks, ReadFixedDataTask) ): # simple able to read some data
                    # which is ok, then no state change
                #     self._state = self.STATE_HAS_DATA # no change
                # else: # did not drop table, did not insert data, that is impossible? yeah, we might only had ReadData task
                    raise RuntimeError("Unexpected no-success scenarios")
647

648 649 650
# State of the database as we believe it to be
class DbState():
    
S
Steven Li 已提交
651 652
    def __init__(self):
        self.tableNumQueue = LinearQueue()
653 654 655
        self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
        self._lastInt  = 0 # next one is initial integer 
        self._lock = threading.RLock()
656

657
        self._state = StateInvalid() # starting state
658
        self._stateWeights = [1,3,5,10]
659
        
660 661
        # self.openDbServerConnection()
        self._dbConn = DbConn()
662 663 664 665 666 667 668 669 670 671 672 673
        try:
            self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
            if ( err.msg == 'disconnected' ): # cannot open DB connection
                print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                sys.exit()
            else:
                raise            
        except:
            print("[=]Unexpected exception")
            raise        
674
        self._dbConn.resetDb() # drop and recreate DB
675
        self._state = StateEmpty() # initial state, the result of above
676

677 678 679
    def getDbConn(self):
        return self._dbConn

S
Steven Li 已提交
680 681 682
    def pickAndAllocateTable(self): # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

683 684 685 686 687
    def addTable(self):
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

688 689 690
    def getFixedTableName(self):
        return "fixed_table"

S
Steven Li 已提交
691 692 693
    def releaseTable(self, i): # return the table back, so others can use it
        self.tableNumQueue.release(i)

694
    def getNextTick(self):
695 696 697
        with self._lock: # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1) # add one second to it
            return self._lastTick
698 699

    def getNextInt(self):
700 701 702 703
        with self._lock:
            self._lastInt += 1
            return self._lastInt
    
S
Steven Li 已提交
704
    def getTableNameToDelete(self):
705
        tblNum = self.tableNumQueue.pop() # TODO: race condition!
706 707 708
        if ( not tblNum ): # maybe false
            return False
        
S
Steven Li 已提交
709 710
        return "table_{}".format(tblNum)

711 712 713
    def cleanUp(self):
        self._dbConn.close()      

714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752
    def getTaskTypesAtState(self):
        allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
        taskTypes = []
        for tc in allTaskClasses:
            # t = tc(self) # create task object
            if tc.canBeginFrom(self._state):
                taskTypes.append(tc)
        if len(taskTypes) <= 0:
            raise RuntimeError("No suitable task types found for state: {}".format(self._state))
        return taskTypes

        # tasks.append(ReadFixedDataTask(self)) # always for everybody
        # if ( self._state == self.STATE_EMPTY ):
        #     tasks.append(CreateDbTask(self))
        #     tasks.append(CreateFixedTableTask(self))
        # elif ( self._state == self.STATE_DB_ONLY ):
        #     tasks.append(DropDbTask(self))
        #     tasks.append(CreateFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # elif ( self._state == self.STATE_TABLE_ONLY ):
        #     tasks.append(DropFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
        #     tasks.append(DropFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # else:
        #     raise RuntimeError("Unexpected DbState state: {}".format(self._state))
        # return tasks

    def pickTaskType(self):
        taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState != None :
                weights.append(self._stateWeights[endState]) # TODO: change to a method
            else:
                weights.append(10) # read data task, default to 10: TODO: change to a constant
        i = self._weighted_choice_sub(weights)
753
        # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))        
754 755 756 757 758 759 760 761 762
        return taskTypes[i]

    def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
        rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i

763 764 765 766 767 768 769 770 771 772 773
    def _findCurrentState(self):
        dbc = self._dbConn
        if dbc.query("show databases") == 0 : # no database?!
            return StateEmpty()
        dbc.execute("use db") # did not do this when openning connection
        if dbc.query("show tables") == 0 : # no tables
            return StateDbOnly()
        if dbc.query("SELECT * FROM {}".format(self.getFixedTableName()) ) == 0 : # no data
            return StateTableOnly()
        else:
            return StateHasData()
774
    
775 776 777
    def transition(self, tasks):
        if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
            return # do nothing
778

779
        self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
780

781 782 783 784
        # Generic Checks, first based on the start state
        if self._state.canCreateDb():
            self._state.assertIfExistThenSuccess(tasks, CreateDbTask)
            # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
785

786 787 788
        if self._state.canDropDb():
            self._state.assertIfExistThenSuccess(tasks, DropDbTask)
            # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
789

790 791 792
        # if self._state.canCreateFixedTable():
            # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
            # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create
793

794 795 796
        # if self._state.canDropFixedTable():
            # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
            # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop
797

798 799
        # if self._state.canAddData():
        #     self.assertIfExistThenSuccess(tasks, AddFixedDataTask)  # not true actually
800

801 802
        # if self._state.canReadData():
            # Nothing for sure
803

804 805
        newState = self._findCurrentState()
        self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
806

807 808
        self._state = newState
        logger.debug("New DB state is: {}".format(self._state))
809

810 811 812 813
class TaskExecutor():
    def __init__(self, curStep):
        self._curStep = curStep

814 815 816
    def getCurStep(self):
        return self._curStep

817 818
    def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
        task.execute(wt)
819

820 821
    # def logInfo(self, msg):
    #     logger.info("    T[{}.x]: ".format(self._curStep) + msg)
822

823 824
    # def logDebug(self, msg):
    #     logger.debug("    T[{}.x]: ".format(self._curStep) + msg)
825

S
Steven Li 已提交
826
class Task():
827 828 829 830 831 832 833
    taskSn = 100

    @classmethod
    def allocTaskNum(cls):
        cls.taskSn += 1
        return cls.taskSn

834
    def __init__(self, dbState: DbState, execStats: ExecutionStats):
835
        self._dbState = dbState
836
        self._workerThread = None 
837
        self._err = None
838
        self._curStep = None
839
        self._numRows = None # Number of rows affected
840 841 842

        # Assign an incremental task serial number        
        self._taskNum = self.allocTaskNum()
843

844 845
        self._execStats = execStats

846 847
    def isSuccess(self):
        return self._err == None
848

849 850
    def clone(self): # TODO: why do we need this again?
        newTask = self.__class__(self._dbState, self._execStats)
851 852 853 854 855 856 857 858
        return newTask

    def logDebug(self, msg):
        self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))

859
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
860
        raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))
861

862 863
    def execute(self, wt: WorkerThread):
        wt.verifyThreadSelf()
864
        self._workerThread = wt # type: ignore
865 866

        te = wt.getTaskExecutor()
867 868
        self._curStep = te.getCurStep()
        self.logDebug("[-] executing task {}...".format(self.__class__.__name__))
869 870

        self._err = None
871
        self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
872 873 874
        try:
            self._executeInternal(te, wt) # TODO: no return value?
        except taos.error.ProgrammingError as err:
875
            self.logDebug("[=]Taos Execution exception: {0}".format(err))
876 877
            self._err = err
        except:
878
            self.logDebug("[=]Unexpected exception")
879
            raise
880
        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
881
        
882 883
        self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))        
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
S
Steven Li 已提交
884

885
    def execSql(self, sql):
886
        return self._dbState.execute(sql)
887

888 889 890 891 892 893 894
                  
class ExecutionStats:    
    def __init__(self):
        self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
895 896
        self._execStartTime = None
        self._elapsedTime = 0.0 # total elapsed time
897 898
        self._accRunTime = 0.0 # accumulated run time

899 900 901 902 903 904 905 906 907
        self._failed = False
        self._failureReason = None

    def startExec(self):
        self._execStartTime = time.time()

    def endExec(self):
        self._elapsedTime = time.time() - self._execStartTime

908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928
    def incExecCount(self, klassName, isSuccess): # TODO: add a lock here
        if klassName not in self._execTimes:
            self._execTimes[klassName] = [0, 0]
        t = self._execTimes[klassName] # tuple for the data
        t[0] += 1 # index 0 has the "total" execution times
        if isSuccess:
            t[1] += 1 # index 1 has the "success" execution times

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0 : # starting a new round
                self._firstTaskStartTime = time.time() # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0 : # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

929 930 931 932
    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

933
    def logStats(self):
934 935 936 937
        logger.info("----------------------------------------------------------------------")
        logger.info("| Crash_Gen test {}, with the following stats:".
            format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
938 939 940
        execTimesAny = 0
        for k, n in self._execTimes.items():            
            execTimesAny += n[1]
941
            logger.info("|    {0:<24}: {1}/{2}".format(k,n[1],n[0]))
942
                
943 944 945 946 947 948 949
        logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress))
        logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
        logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
        logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
        logger.info("----------------------------------------------------------------------")
        
950 951 952 953 954 955 956 957 958 959


class StateTransitionTask(Task):
    # @classmethod
    # def getAllTaskClasses(cls): # static
    #     return cls.__subclasses__()
    @classmethod
    def getInfo(cls): # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

960 961 962
    # @classmethod
    # def getBeginStates(cls):
    #     return cls.getInfo()[0]
963 964 965

    @classmethod
    def getEndState(cls):
966
        return cls.getInfo()[0]
967 968

    @classmethod
969 970 971
    def canBeginFrom(cls, state: AnyState):
        # return state.getValue() in cls.getBeginStates()
        raise RuntimeError("must be overriden")
972 973 974 975 976 977 978 979 980 981

    def execute(self, wt: WorkerThread):
        super().execute(wt)
        


class CreateDbTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
982 983
            # [AnyState.STATE_EMPTY], # can begin from
            AnyState.STATE_DB_ONLY # end state
984 985
        ]

986 987 988 989
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

990 991 992
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        wt.execSql("create database db")

993 994 995 996
class DropDbTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
997 998
            # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
            AnyState.STATE_EMPTY
999 1000
        ]

1001 1002 1003 1004
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

1005 1006 1007
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        wt.execSql("drop database db")

1008 1009 1010 1011
class CreateFixedTableTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1012 1013
            # [AnyState.STATE_DB_ONLY],
            AnyState.STATE_TABLE_ONLY
1014
        ]
1015

1016 1017 1018 1019
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedTable()

1020 1021 1022
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
        wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
S
Steven Li 已提交
1023

1024 1025 1026 1027
class ReadFixedDataTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1028
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
1029 1030 1031
            None # meaning doesn't affect state
        ]

1032 1033 1034 1035
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

1036 1037
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
1038
        wt.execSql("select * from db.{}".format(tblName)) # TODO: analyze result set later
1039 1040
        # tdSql.query(" cars where tbname in ('carzero', 'carone')")

1041 1042 1043 1044
class DropFixedTableTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1045 1046
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
            AnyState.STATE_DB_ONLY # meaning doesn't affect state
1047 1048
        ]

1049 1050 1051 1052
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedTable()

1053 1054 1055 1056 1057 1058 1059 1060
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
        wt.execSql("drop table db.{}".format(tblName))

class AddFixedDataTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1061 1062
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
            AnyState.STATE_HAS_DATA
1063
        ]
1064 1065 1066 1067

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()
1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084
        
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbState
        sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
        wt.execSql(sql) 


#---------- Non State-Transition Related Tasks ----------#

class CreateTableTask(Task):    
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tIndex = self._dbState.addTable()
        self.logDebug("Creating a table {} ...".format(tIndex))
        wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
        self.logDebug("Table {} created.".format(tIndex))
        self._dbState.releaseTable(tIndex)

S
Steven Li 已提交
1085
class DropTableTask(Task):
1086
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
1087
        tableName = self._dbState.getTableNameToDelete()
S
Steven Li 已提交
1088
        if ( not tableName ): # May be "False"
1089
            self.logInfo("Cannot generate a table to delete, skipping...")
S
Steven Li 已提交
1090
            return
1091
        self.logInfo("Dropping a table db.{} ...".format(tableName))
1092
        wt.execSql("drop table db.{}".format(tableName))
1093
        
1094

S
Steven Li 已提交
1095 1096

class AddDataTask(Task):
1097
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
1098
        ds = self._dbState
1099
        self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
1100 1101
        tIndex = ds.pickAndAllocateTable()
        if ( tIndex == None ):
1102
            self.logInfo("No table found to add data, skipping...")
1103
            return
1104
        sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
1105
        self.logDebug("Executing SQL: {}".format(sql))
1106 1107
        wt.execSql(sql) 
        ds.releaseTable(tIndex)
1108
        self.logDebug("Finished adding data")
S
Steven Li 已提交
1109

1110

S
Steven Li 已提交
1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132
# Deterministic random number generator
class Dice():
    seeded = False # static, uninitialized

    @classmethod
    def seed(cls, s): # static
        if (cls.seeded):
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls): # Verify that the RNG is determinstic
        random.seed(0)
        x1 = random.randrange(0, 1000)
        x2 = random.randrange(0, 1000)
        x3 = random.randrange(0, 1000)
        if ( x1 != 864 or x2!=394 or x3!=776 ):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
1133 1134
    def throw(cls, stop): # get 0 to stop-1
        return cls.throwRange(0, stop)
S
Steven Li 已提交
1135 1136

    @classmethod
1137
    def throwRange(cls, start, stop): # up to stop-1
S
Steven Li 已提交
1138 1139
        if ( not cls.seeded ):
            raise RuntimeError("Cannot throw dice before seeding it")
1140
        return random.randrange(start, stop)
S
Steven Li 已提交
1141 1142 1143


# Anyone needing to carry out work should simply come here
1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165
# class WorkDispatcher():
#     def __init__(self, dbState):
#         # self.totalNumMethods = 2
#         self.tasks = [
#             # CreateTableTask(dbState), # Obsolete
#             # DropTableTask(dbState),
#             # AddDataTask(dbState),
#         ]

#     def throwDice(self):
#         max = len(self.tasks) - 1 
#         dRes = random.randint(0, max)
#         # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
#         return dRes

#     def pickTask(self):
#         dice = self.throwDice()
#         return self.tasks[dice]

#     def doWork(self, workerThread):
#         task = self.pickTask()
#         task.execute(workerThread)
S
Steven Li 已提交
1166

1167
def main():
1168
    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
1169 1170 1171 1172 1173 1174 1175 1176 1177
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in offical docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))
1178 1179 1180 1181
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',                        
                        help='Use a single shared db connection (default: false)')
    parser.add_argument('-d', '--debug', action='store_true',                        
                        help='Turn on DEBUG mode for more logging (default: false)')
1182 1183 1184 1185
    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                        help='Maximum number of steps to run (default: 100)')
    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                        help='Number of threads to run (default: 10)')
1186

1187
    global gConfig
1188
    gConfig = parser.parse_args()
1189 1190 1191
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit()
1192

1193
    global logger
1194
    logger = logging.getLogger('CrashGen')
1195 1196
    if ( gConfig.debug ):
        logger.setLevel(logging.DEBUG) # default seems to be INFO        
S
Steven Li 已提交
1197 1198 1199 1200
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    dbState = DbState()
1201 1202
    Dice.seed(0) # initial seeding of dice
    tc = ThreadCoordinator(
1203
        ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), 
1204
        # WorkDispatcher(dbState), # Obsolete?
1205
        dbState
1206
        )
1207
    
1208
    tc.run()
1209 1210 1211
    tc.logStats()
    dbState.cleanUp()    
    
1212 1213 1214 1215
    logger.info("Finished running thread pool")

if __name__ == "__main__":
    main()