#!/usr/bin/python3.7
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17 18 19 20
# Require Python 3: abort at import time when the interpreter's major version
# is lower than 3.
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
21
import argparse
import copy
import datetime
import getopt
import logging
import random
import threading
import time
from typing import List

from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

import crash_gen
import taos

40 41 42
# Module-wide state, deliberately kept to a minimum. Both names start out as
# None and are assigned in main() before the thread coordinator is run.
gConfig = None # parsed command-line options (result of parser.parse_args())
logger = None # the logging.Logger obtained in main(), shared by the whole file
S
Steven Li 已提交
43

44 45
def runThread(wt: WorkerThread):
    """Thread entry point: hand control to the given worker's ``run()``."""
    wt.run()
46

S
Steven Li 已提交
47
class WorkerThread:
    """A single worker thread driven step by step by a ThreadCoordinator.

    In every step the worker first waits at the coordinator's shared barrier,
    then at its own "gate" (a ``threading.Event``) until the main thread taps
    it, and only then fetches and executes one task.
    """

    def __init__(self, pool: ThreadPool, tid,
                 tc: ThreadCoordinator,
                 ):  # note: main thread context!
        """Create (but do not start) the worker.

        Args:
            pool: the ThreadPool that owns this worker.
            tid: identifier of this worker; used in log messages.
            tc: the ThreadCoordinator providing the barrier and the tasks.
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = DbConn()

    def getTaskExecutor(self):
        """Return the coordinator's TaskExecutor for the current step."""
        return self._tc.getTaskExecutor()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Body of the thread: open the private connection (if configured),
        run the task loop, and always close the connection afterwards."""
        # initialization after thread starts, in the thread context
        logger.info("Starting to run thread: {}".format(self._tid))

        usePrivateConn = gConfig.per_thread_db_connection  # type: ignore
        if usePrivateConn:
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up, even when the task loop raised
            if usePrivateConn:
                self._dbConn.close()

    def _doTaskLoop(self):
        """Repeat barrier -> gate -> task until the coordinator stops running."""
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            logger.debug("Thread task loop exited barrier...")
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("Thread task loop exited step gate...")
            if not tc.isRunning():
                break

            task = tc.fetchTask()
            task.execute(self)
            tc.saveExecutedTask(task)

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        """Raise RuntimeError unless the caller is this worker's own thread."""
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        """Raise RuntimeError unless the caller is the main thread."""
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        """Raise RuntimeError if the underlying thread is not alive."""
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at the per-thread gate until tapped, then re-arm the gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; must be called by the main thread."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def execSql(self, sql):
        """Execute ``sql`` on this worker's own connection when configured,
        otherwise on the connection owned by the coordinator's DbState."""
        if gConfig.per_thread_db_connection:
            return self._dbConn.execSql(sql)
        # getDbState is a method and has to be called before using its result
        return self._tc.getDbState().getDbConn().execSql(sql)
137
                  
138
class ThreadCoordinator:
    """Drives all worker threads through the run, one step at a time.

    The main thread and every worker meet at a shared barrier; the main
    thread then transitions the DB state, prepares a fresh TaskExecutor and
    "taps" each worker's gate so the workers execute one task each.
    """

    def __init__(self, pool, wd: WorkDispatcher, dbState):
        """Remember the collaborators and create the shared step barrier.

        Args:
            pool: thread pool exposing ``numThreads``, ``threadList``,
                ``createAndStartThreads()`` and ``joinAll()``.
            wd: work dispatcher; stored but not used by this class.
            dbState: object providing ``transition()`` and ``getTasksAtState()``.
        """
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._wd = wd
        self._te = None  # prepare for every new step
        self._dbState = dbState
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # one barrier for all threads: every worker plus the main thread
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)

    def getTaskExecutor(self):
        """Return the TaskExecutor of the current step (None when not running)."""
        return self._te

    def getDbState(self) -> DbState:
        """Return the DbState given at construction."""
        return self._dbState

    def crossStepBarrier(self):
        """Wait at the shared barrier until all parties have arrived."""
        self._stepBarrier.wait()

    def run(self):
        """Start the workers, run ``gConfig.max_steps`` steps, then shut down."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        maxSteps = gConfig.max_steps  # type: ignore
        # Steps are numbered from 0, so the last one started is maxSteps - 1;
        # comparing against maxSteps would run one step too many.
        while self._curStep < maxSteps - 1:
            print(".", end="", flush=True)
            logger.debug("Main thread going to sleep")

            # Now ready to enter a step
            self.crossStepBarrier()  # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset()  # Other worker threads should now be at the "gate"

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
            self._dbState.transition(self._executedTasks)  # at end of step, transition the DB state
            with self._lock:
                # The list holds the tasks of ONE step; start the next step empty
                self._executedTasks = []

            # Get ready for next step
            logger.info("<-- Step {} finished".format(self._curStep))
            self._curStep += 1  # we are about to get into next step. TODO: race condition here!
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep))  # Now not all threads had time to go to sleep

            # A new TE for the new step
            self._te = TaskExecutor(self._curStep)

            logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep))  # Now not all threads had time to go to sleep
            self.tapAllThreads()

        logger.debug("Main thread ready to finish up...")
        self.crossStepBarrier()  # Cross it one last time, after all threads finish
        self._stepBarrier.reset()
        logger.debug("Main thread in exclusive zone...")
        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish

        logger.info("All threads finished")
        print("\r\nFinished")

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's gate in an order drawn from Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.info("Waking up threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self._pool.threadList[i].tapStepGate()  # TODO: maybe a bit too deep?!
            time.sleep(0)  # yield

    def isRunning(self):
        """Return True while a TaskExecutor is installed for the current step."""
        return self._te is not None

    def fetchTask(self) -> Task:
        """Pick one of the tasks valid for the current DB state.

        Raises:
            RuntimeError: when called while the coordinator is not running.
        """
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        # Ask the DbState for the tasks appropriate to its current state
        tasks = self.getDbState().getTasksAtState()
        i = Dice.throw(len(tasks))
        return copy.copy(tasks[i])  # Needs a fresh copy, to save execution results, etc.

    def saveExecutedTask(self, task):
        """Record a task executed in the current step (called by workers)."""
        with self._lock:
            self._executedTasks.append(task)
229 230

# A pool of worker threads that are run in lock step.
231
class ThreadPool:
    """Owns the WorkerThread objects that a ThreadCoordinator drives."""

    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        """Store the pool settings; no thread is created here.

        Args:
            dbState: passed on to the pool's own WorkDispatcher.
            numThreads: number of worker threads to create later.
            maxSteps: stored as ``self.maxSteps``; not used inside this class.
            funcSequencer: stored as ``self.funcSequencer``; not used inside
                this class.
        """
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        self.funcSequencer = funcSequencer
        # Internal class variables
        self.dispatcher = WorkDispatcher(dbState)
        self.curStep = 0
        self.threadList = []

    # starting to run all the threads, in locking steps
    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to ``tc`` and start each one."""
        for tid in range(0, self.numThreads):  # Create the threads
            workerThread = WorkerThread(self, tid, tc)
            self.threadList.append(workerThread)
            workerThread.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Join every worker's underlying thread, in creation order."""
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()

S
Steven Li 已提交
255 256 257
# A queue of contiguous POSITIVE integers
class LinearQueue():
    """A queue of consecutive positive integers with "in use" tracking.

    The live range is ``firstIndex..lastIndex`` (inclusive); it is empty when
    ``firstIndex > lastIndex``. ``inUse`` holds the indexes currently
    allocated; the head cannot be popped while it is in use.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a printable summary of the range and the in-use set."""
        return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        """Append the next integer, mark it in use, and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head is in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?

            index = self.firstIndex
            if index in self.inUse:
                return False

            self.firstIndex += 1
            return index

    def isEmpty(self):
        """Return True when the range holds no element."""
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but return 0 when the queue is empty."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index ``i`` as in use.

        Raises:
            RuntimeError: if ``i`` is already in use.
        """
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index ``i`` as no longer in use (KeyError if it was not)."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Return the number of indexes in the live range."""
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Pick a random index that is not in use, mark it in use, return it.

        Returns None when the queue is empty or no free index was found after
        ten times ``size()`` attempts.
        """
        # The emptiness check is done under the lock so that the range cannot
        # change between the check and the draw.
        with self._lock:
            if self.isEmpty():
                return None
            cnt = 0  # counting the iterations
            while True:
                cnt += 1
                if cnt > self.size() * 10:  # 10x iteration already
                    return None
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret

class DbConn:
    """Thin wrapper around one ``taos`` connection, its cursor and a TDSql."""

    def __init__(self):
        self._conn = None
        self._cursor = None
        self._tdSql = None  # created in open()
        self.isOpen = False

    def open(self):  # Open connection
        """Connect, create a cursor, reset the query cache and set up TDSql.

        Raises:
            RuntimeError: if the connection is already open.
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg"
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._cursor = self._conn.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')

        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop database ``db`` if it exists.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot reset database until connection is open")

        self._cursor.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def close(self):
        """Close the TDSql helper and mark the connection closed.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()
        self.isOpen = False

    def execSql(self, sql):
        """Run ``sql`` through TDSql and return whatever ``execute`` returns.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        return self._tdSql.execute(sql)
S
Steven Li 已提交
375 376 377

# State of the database as we believe it to be
class DbState():
    """State of the database as the test believes it to be.

    Also owns the main DB connection, the table-number queue, and the
    monotonically increasing tick/integer generators used for inserted rows.
    """

    STATE_INVALID    = -1
    STATE_EMPTY      = 1  # nothing there, not even a DB
    STATE_DB_ONLY    = 2  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 3  # we have a table, but totally empty
    STATE_HAS_DATA   = 4  # we have some data in the table

    def __init__(self):
        """Open the main connection, reset the database, start in STATE_EMPTY."""
        self.tableNumQueue = LinearQueue()
        self._lastTick = datetime.datetime(2019, 1, 1)  # initial date time tick
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()
        self._state = self.STATE_INVALID

        self._dbConn = DbConn()
        self._dbConn.open()
        self._dbConn.resetDb()  # drops the database
        self._state = self.STATE_EMPTY  # initial state, the result of above

    def getDbConn(self):
        """Return the main DbConn."""
        return self._dbConn

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        """Return a table number marked in use, or None if none is available."""
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        """Reserve and return a new table number (marked in use)."""
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    def releaseTable(self, i):  # return the table back, so others can use it
        """Mark table number ``i`` as no longer in use."""
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Return the next timestamp, one second after the previous one."""
        with self._lock:  # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1)  # add one second to it
            return self._lastTick

    def getNextInt(self):
        """Return the next integer of an increasing sequence starting at 1."""
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getTableNameToDelete(self):
        """Pop the oldest table number and return its name, or False if none."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        if not tblNum:  # maybe false
            return False

        return "table_{}".format(tblNum)

    def execSql(self, sql):  # using the main DB connection
        """Execute ``sql`` on the main connection and return its result."""
        return self._dbConn.execSql(sql)

    def cleanUp(self):
        """Close the main connection."""
        self._dbConn.close()

    def getTasksAtState(self):
        """Return fresh task objects that are valid in the current state.

        Raises:
            RuntimeError: for states that have no task list defined yet.
        """
        if self._state == self.STATE_EMPTY:
            return [CreateDbTask(self), CreateTableTask(self)]
        elif self._state == self.STATE_DB_ONLY:
            return [DeleteDbTask(self), CreateTableTask(self), AddDataTask(self)]
        else:
            raise RuntimeError("Unexpected DbState state: {}".format(self._state))

    def transition(self, tasks):
        """Check the executed ``tasks`` and move to the resulting state.

        Raises:
            RuntimeError: when an expectation about the tasks is violated, or
                when the current state has no transition defined yet.
        """
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            return  # do nothing
        if self._state == self.STATE_EMPTY:
            self.assertAtMostOneSuccess(tasks, CreateDbTask)  # param is class
            self.assertIfExistThenSuccess(tasks, CreateDbTask)
            self.assertAtMostOneSuccess(tasks, CreateTableTask)
            if self.hasSuccess(tasks, CreateDbTask):
                self._state = self.STATE_DB_ONLY
            if self.hasSuccess(tasks, CreateTableTask):
                self._state = self.STATE_TABLE_ONLY
        else:
            raise RuntimeError("Unexpected DbState state: {}".format(self._state))
        logger.debug("New DB state is: {}".format(self._state))

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError if two or more ``cls`` tasks succeeded."""
        sCnt = 0
        for task in tasks:
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                sCnt += 1
                if sCnt >= 2:
                    raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise RuntimeError if ``cls`` tasks exist but none of them succeeded.

        When no task of type ``cls`` is present at all, nothing is asserted.
        """
        exists = False
        for task in tasks:
            if not isinstance(task, cls):
                continue
            exists = True  # at least one instance was executed
            if task.isSuccess():
                return
        if exists:
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if at least one ``cls`` task succeeded."""
        return any(isinstance(task, cls) and task.isSuccess() for task in tasks)

485 486 487 488
class TaskExecutor():
    """Per-step helper that runs tasks and prefixes log lines with the step."""

    def __init__(self, curStep):
        self._curStep = curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        """Run ``task`` on worker thread ``wt``."""
        task.execute(wt)

    def logInfo(self, msg):
        """Log ``msg`` at INFO level, tagged with the current step number."""
        logger.info("    T[{}.x]: ".format(self._curStep) + msg)

    def logDebug(self, msg):
        """Log ``msg`` at DEBUG level, tagged with the current step number."""
        logger.debug("    T[{}.x]: ".format(self._curStep) + msg)

S
Steven Li 已提交
498
class Task():
    """Base class of all tasks; subclasses implement ``_executeInternal``."""

    def __init__(self, dbState):
        self.dbState = dbState
        self._err = None  # the taos ProgrammingError of the last run, if any

    def isSuccess(self):
        """Return True when the last execution recorded no error."""
        return self._err is None

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Do the actual work; must be overridden by subclasses."""
        raise RuntimeError("To be implemeted by child classes")

    def execute(self, wt: WorkerThread):
        """Run the task on worker thread ``wt`` and record its outcome.

        A ``taos.error.ProgrammingError`` is logged and stored (the task then
        counts as failed); any other exception is logged and re-raised.
        """
        wt.verifyThreadSelf()

        te = wt.getTaskExecutor()
        te.logDebug("[-] executing task {}...".format(self.__class__.__name__))

        self._err = None
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            te.logDebug("[=]Taos Execution exception: {0}".format(err))
            self._err = err
        except Exception:
            te.logDebug("[=]Unexpected exception")
            raise

        te.logDebug("[X] task execution completed")

    def execSql(self, sql):
        """Execute ``sql`` through the DbState's main connection."""
        # DbState exposes execSql(); it has no execute() method
        return self.dbState.execSql(sql)

530 531 532 533 534 535 536 537
class CreateDbTask(Task):
    """Task that issues the statement creating database ``db``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        sql = "create database db"
        wt.execSql(sql)

class DeleteDbTask(Task):
    """Task that issues the statement dropping database ``db``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        sql = "drop database db"
        wt.execSql(sql)

S
Steven Li 已提交
538
class CreateTableTask(Task):
    """Task that reserves a new table number and creates that table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tIndex = self.dbState.addTable()  # reserved and marked in use
        try:
            te.logDebug("Creating a table {} ...".format(tIndex))
            wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
            te.logDebug("Table {} created.".format(tIndex))
        finally:
            # Always give the number back; otherwise a failed statement
            # would leave it marked in use forever.
            self.dbState.releaseTable(tIndex)
S
Steven Li 已提交
545 546

class DropTableTask(Task):
    """Task that drops the oldest table, when one can be taken from the queue."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tableName = self.dbState.getTableNameToDelete()
        if not tableName:  # May be "False"
            te.logInfo("Cannot generate a table to delete, skipping...")
            return
        te.logInfo("Dropping a table db.{} ...".format(tableName))
        wt.execSql("drop table db.{}".format(tableName))
S
Steven Li 已提交
554 555

class AddDataTask(Task):
    """Task that inserts one (timestamp, integer) row into a random table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self.dbState
        te.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
        tIndex = ds.pickAndAllocateTable()
        if tIndex is None:
            te.logInfo("No table found to add data, skipping...")
            return
        try:
            sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
            te.logDebug("Executing SQL: {}".format(sql))
            wt.execSql(sql)
        finally:
            # Always give the table back; otherwise a failed insert would
            # leave it marked in use forever.
            ds.releaseTable(tIndex)
        te.logDebug("Finished adding data")
S
Steven Li 已提交
568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590

# Deterministic random number generator
class Dice():
    """Deterministic random number source built on the ``random`` module.

    ``seed()`` must be called exactly once before any throw.
    """

    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG, then seed it with ``s``.

        Raises:
            RuntimeError: if the dice was already seeded, or if the RNG does
                not produce the expected reference sequence.
        """
        if cls.seeded:
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is deterministic
        """Seed with 0 and compare the first three draws to known values."""
        random.seed(0)
        x1 = random.randrange(0, 1000)
        x2 = random.randrange(0, 1000)
        x3 = random.randrange(0, 1000)
        if x1 != 864 or x2 != 394 or x3 != 776:
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):  # get 0 to stop-1
        """Return a random integer in ``[0, stop)``."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):  # up to stop-1
        """Return a random integer in ``[start, stop)``.

        Raises:
            RuntimeError: if the dice has not been seeded yet.
        """
        if not cls.seeded:
            raise RuntimeError("Cannot throw dice before seeding it")
        return random.randrange(start, stop)
S
Steven Li 已提交
599 600 601 602


# Anyone needing to carry out work should simply come here
class WorkDispatcher():
    """Holds a fixed list of task objects and hands one out at random."""

    def __init__(self, dbState):
        self.tasks = [
            CreateTableTask(dbState),
            DropTableTask(dbState),
            AddDataTask(dbState),
        ]

    def throwDice(self):
        """Return a random valid index into ``self.tasks``."""
        lastIdx = len(self.tasks) - 1  # randint's upper bound is inclusive
        dRes = random.randint(0, lastIdx)
        return dRes

    def pickTask(self):
        """Return a randomly chosen task from ``self.tasks``."""
        dice = self.throwDice()
        return self.tasks[dice]

    def doWork(self, workerThread):
        """Pick a random task and execute it on ``workerThread``."""
        task = self.pickTask()
        task.execute(workerThread)
S
Steven Li 已提交
624

625
def main():
    """Parse the command line, set up logging and run the crash generator.

    Assigns the module globals ``gConfig`` and ``logger``, creates the
    DbState (which opens the main DB connection), seeds the dice, and runs a
    ThreadCoordinator with the requested number of threads and steps.
    """
    global gConfig
    global logger

    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator')
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
                        help='Use a separate db connection for each thread (default: false)')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                        help='Maximum number of steps to run (default: 100)')
    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                        help='Number of threads to run (default: 10)')

    gConfig = parser.parse_args()

    logger = logging.getLogger('myApp')
    # A new logger has no level of its own and falls back to the root
    # logger's WARNING, so INFO has to be requested explicitly.
    logger.setLevel(logging.DEBUG if gConfig.debug else logging.INFO)
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    dbState = DbState()
    try:
        Dice.seed(0)  # initial seeding of dice
        tc = ThreadCoordinator(
            ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
            WorkDispatcher(dbState),
            dbState
            )
        tc.run()
    finally:
        dbState.cleanUp()  # close the main connection even if the run failed
    logger.info("Finished running thread pool")

# Run the crash generator only when this file is executed as a script;
# importing the module must not start a run.
if __name__ == "__main__":
    main()