crash_gen.py 29.5 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17 18 19 20
# Require Python 3: abort at import time with an explicit message when the
# interpreter's major version is below 3.
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
21
import getopt
22
import argparse
23
import copy
S
Steven Li 已提交
24 25 26 27

import threading
import random
import logging
import datetime
import time
S
Steven Li 已提交
29

30 31
from typing import List

S
Steven Li 已提交
32 33 34 35 36
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

37
import crash_gen
S
Steven Li 已提交
38 39
import taos

40 41 42
# Global variables, deliberately kept to a small number. Both start as None
# and are assigned inside main() before any worker object is created.
gConfig = None # Parsed command-line configuration (the argparse result), set in main()
logger = None # The logging.Logger shared by the whole module, set in main()
S
Steven Li 已提交
43

44 45
def runThread(wt: WorkerThread):
    """Thread entry point: hand control to the given worker's ``run()``.

    Used as the ``target`` of the ``threading.Thread`` created by
    ``WorkerThread``; the worker itself is passed as the only argument.
    """
    wt.run()
46

S
Steven Li 已提交
47
class WorkerThread:
    """A single worker of the lock-step thread pool.

    Wraps a ``threading.Thread`` (whose target is ``runThread(self)``) together
    with a private ``threading.Event`` used as the per-thread "step gate".
    In every iteration of its loop the worker waits on the coordinator's
    shared barrier, then on its own gate until it is tapped, and then fetches
    and executes exactly one task.
    """

    def __init__(self, pool: ThreadPool, tid,
                 tc: ThreadCoordinator,
                 ):  # note: main thread context!
        """Create the worker without starting it.

        pool -- the ThreadPool that owns this worker
        tid  -- identifier used as a prefix in log messages
        tc   -- the ThreadCoordinator providing the barrier, the tasks and
                (in shared-connection mode) the DB connection
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own; it is only created here and
        # opened later, in run(), from within the worker thread.
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = DbConn()

    def logDebug(self, msg):
        """Log msg at DEBUG level, prefixed with this thread's id."""
        logger.debug("    t[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        """Log msg at INFO level, prefixed with this thread's id."""
        logger.info("    t[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's current TaskExecutor (None once finished)."""
        return self._tc.getTaskExecutor()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Body of the worker thread: open the connection, loop, clean up."""
        # initialization after thread starts, in the thread context
        logger.info("Starting to run thread: {}".format(self._tid))

        ownConnection = gConfig.per_thread_db_connection  # type: ignore
        if ownConnection:
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up, even when a task raised an unexpected exception
            if ownConnection:
                self._dbConn.close()

    def _doTaskLoop(self):
        """Run one task per step until the coordinator stops running."""
        while True:
            tc = self._tc  # Thread Coordinator, the overall master
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            logger.debug("Thread task loop exited barrier...")
            self.crossStepGate()  # then per-thread gate, after being tapped
            logger.debug("Thread task loop exited step gate...")
            if not self._tc.isRunning():
                break

            task = tc.fetchTask()
            task.execute(self)
            tc.saveExecutedTask(task)

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        """Raise RuntimeError unless called from this worker's own thread."""
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        """Raise RuntimeError unless called from the main thread."""
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        """Raise RuntimeError if the underlying thread is not alive."""
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at the per-thread gate until tapped, then re-arm the gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; must be called by the main thread."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def execSql(self, sql):
        """Execute sql on this thread's own connection or on the shared one.

        The shared connection is the one held by the coordinator's DbState.
        Returns whatever the connection's ``execSql`` returns.
        """
        if gConfig.per_thread_db_connection:  # type: ignore
            return self._dbConn.execSql(sql)
        # getDbState is a method and must be called before asking for the connection
        return self._tc.getDbState().getDbConn().execSql(sql)
144
                  
145
class ThreadCoordinator:
    """Drives all worker threads through the run one step at a time.

    The coordinator owns a barrier shared by every worker plus the main
    thread, the TaskExecutor of the current step, and the list of tasks the
    workers executed during that step. ``isRunning()`` is true exactly while
    a TaskExecutor is set.
    """

    def __init__(self, pool, wd: WorkDispatcher, dbState):
        """Store the collaborators and build the shared step barrier.

        pool    -- object exposing numThreads, threadList,
                   createAndStartThreads() and joinAll()
        wd      -- a WorkDispatcher (stored, not otherwise used by this class)
        dbState -- the DbState that supplies tasks and validates each step
        """
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._wd = wd
        self._te = None  # prepare for every new step
        self._dbState = dbState
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # one barrier for all threads; the "+ 1" is the main thread
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)

    def getTaskExecutor(self):
        """Return the TaskExecutor of the current step, or None."""
        return self._te

    def getDbState(self) -> DbState:
        """Return the DbState this coordinator works against."""
        return self._dbState

    def crossStepBarrier(self):
        """Wait on the barrier shared by all workers and the main thread."""
        self._stepBarrier.wait()

    def run(self):
        """Start the workers, run ``gConfig.max_steps`` steps, then join them.

        Steps are numbered 0 .. max_steps - 1. Must be called from the main
        thread, because tapping a worker verifies that.
        """
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        maxSteps = gConfig.max_steps  # type: ignore
        # _curStep is incremented inside the loop, so stop once the last
        # started step is maxSteps - 1; comparing against maxSteps would run
        # one step more than requested.
        while self._curStep < maxSteps - 1:
            print(".", end="", flush=True)
            logger.debug("Main thread going to sleep")

            # Now ready to enter a step
            self.crossStepBarrier()  # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset()  # Other worker threads should now be at the "gate"

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
            self._dbState.transition(self._executedTasks)  # at end of step, transition the DB state
            self.resetExecutedTasks()  # clear the tasks after we are done

            # Get ready for next step
            logger.info("<-- Step {} finished".format(self._curStep))
            self._curStep += 1  # we are about to get into next step. TODO: race condition here!
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep))  # Now not all threads had time to go to sleep

            # A new TE for the new step
            self._te = TaskExecutor(self._curStep)

            logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep))  # Now not all threads had time to go to sleep
            self.tapAllThreads()

        logger.debug("Main thread ready to finish up...")
        self.crossStepBarrier()  # Cross it one last time, after all threads finish
        self._stepBarrier.reset()
        logger.debug("Main thread in exclusive zone...")
        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish

        logger.info("All threads finished")
        print("\r\nFinished")

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's gate in an order drawn from the seeded Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.info("Waking up threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self._pool.threadList[i].tapStepGate()  # TODO: maybe a bit too deep?!
            time.sleep(0)  # yield

    def isRunning(self):
        """Return True while a TaskExecutor is set, i.e. a step is active."""
        return self._te is not None

    def fetchTask(self) -> Task:
        """Return a fresh clone of a randomly chosen task valid in the current DB state.

        Raises RuntimeError when the coordinator is not running.
        """
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        # Ask the DbState for the tasks appropriate to its current state
        tasks = self.getDbState().getTasksAtState()
        i = Dice.throw(len(tasks))
        # Needs a fresh copy, to save execution results, etc.
        return tasks[i].clone()

    def resetExecutedTasks(self):
        """Forget the tasks recorded for the step that just ended."""
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record an executed task; called concurrently by the workers."""
        with self._lock:
            self._executedTasks.append(task)
241 242

# We define a class to run a number of threads in locking steps.
243
class ThreadPool:
    """Container for the WorkerThread objects that run in locking steps."""

    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        """Record the pool configuration; no thread is created yet.

        dbState       -- passed on to the WorkDispatcher built here
        numThreads    -- number of worker threads to create later
        maxSteps      -- stored as-is; not used by this class itself
        funcSequencer -- stored as-is; not used by this class itself
        """
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        self.funcSequencer = funcSequencer
        # Internal class variables
        self.dispatcher = WorkDispatcher(dbState)
        self.curStep = 0
        self.threadList = []

    # starting to run all the threads, in locking steps
    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create numThreads workers bound to tc, record each, then start it."""
        for tid in range(0, self.numThreads):  # Create the threads
            workerThread = WorkerThread(self, tid, tc)
            # record before starting, so the worker is listed once it runs
            self.threadList.append(workerThread)
            workerThread.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Join every recorded worker's underlying thread, in creation order."""
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()

S
Steven Li 已提交
267 268 269
# A queue of contiguous POSITIVE integers
class LinearQueue():
    """A thread-safe range [firstIndex..lastIndex] of consecutive positive integers.

    New indexes are appended at the tail with push(), the oldest one is
    removed from the head with pop(). Every index can additionally be marked
    "in use"; an index that is in use cannot be popped or picked.
    The queue is empty when firstIndex > lastIndex.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0
        self._lock = threading.RLock()  # re-entrant: our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a human-readable description of the range and the in-use set."""
        return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        """Append a new index at the tail, mark it in use and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)  # mark it in use immediately
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head is in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?

            index = self.firstIndex
            if index in self.inUse:
                return False

            self.firstIndex += 1
            return index

    def isEmpty(self):
        """Return True when the queue holds no index."""
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but return 0 instead of False for an empty queue."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index i as in use; raise RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index i as no longer in use; raises KeyError if it was not."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Return the number of indexes currently in the range."""
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Pick a random index that is not in use, mark it in use, return it.

        Returns None when the queue is empty or when no free index was found
        within 10 * size() random draws.
        """
        with self._lock:
            # The emptiness check is made under the lock, so it cannot go
            # stale before the draw below.
            if self.isEmpty():
                return None
            cnt = 0  # counting the iterations
            while True:
                cnt += 1
                if cnt > self.size() * 10:  # 10x iteration already
                    return None
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret

class DbConn:
    """A connection to the TDengine server together with a cursor and a TDSql helper.

    The object starts closed; open() must be called before resetDb(),
    execSql() or close().
    """

    def __init__(self):
        self._conn = None
        self._cursor = None
        self.isOpen = False

    def open(self):  # Open connection
        """Connect to the server and prepare the cursor.

        Raises RuntimeError when the connection is already open.
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg"
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._cursor = self._conn.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')

        # All later statements go through the TDSql helper bound to our cursor
        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop database ``db`` if it exists; it is NOT re-created here.

        Raises RuntimeError when the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot reset database until connection is open")

        self._cursor.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def close(self):
        """Close the TDSql helper and the underlying connection.

        Raises RuntimeError when the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()  # presumably closes the cursor -- verify against util.sql
        # Also release the server connection obtained in open(), which was
        # previously left open.
        self._conn.close()
        self._conn = None
        self._cursor = None
        self.isOpen = False

    def execSql(self, sql):
        """Execute sql through the TDSql helper and return its result.

        Raises RuntimeError when the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        return self._tdSql.execute(sql)
S
Steven Li 已提交
387 388 389

# State of the database as we believe it to be
class DbState():
    """State of the database as we believe it to be.

    Besides the state itself this object owns the main DB connection, the
    queue of table numbers, and the generators for unique ticks and integers
    used when inserting data.
    """

    STATE_INVALID    = -1
    STATE_EMPTY      = 1  # nothing there, not even a DB
    STATE_DB_ONLY    = 2  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 3  # we have a table, but totally empty
    STATE_HAS_DATA   = 4  # we have some data in the table

    def __init__(self):
        """Open the main connection, drop database ``db`` and start in STATE_EMPTY."""
        self.tableNumQueue = LinearQueue()
        self._lastTick = datetime.datetime(2019, 1, 1)  # initial date time tick
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()
        self._state = self.STATE_INVALID

        self._dbConn = DbConn()
        self._dbConn.open()
        self._dbConn.resetDb()  # drops the DB (it is not re-created)
        self._state = self.STATE_EMPTY  # initial state, the result of above

    def getDbConn(self):
        """Return the main DbConn."""
        return self._dbConn

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        """Return a random free table number marked in use, or None."""
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        """Allocate and return a new table number (marked in use)."""
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    def getFixedTableName(self):
        """Return the name of the single fixed table used by the *Fixed* tasks."""
        return "fixed_table"

    def releaseTable(self, i):  # return the table back, so others can use it
        """Mark table number i as no longer in use."""
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Return a timestamp one second later than the previous one."""
        with self._lock:  # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1)  # add one second to it
            return self._lastTick

    def getNextInt(self):
        """Return the next integer of an increasing sequence starting at 1."""
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getTableNameToDelete(self):
        """Pop the oldest table number and return its table name, or False."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        if not tblNum:  # maybe false
            return False

        return "table_{}".format(tblNum)

    def execSql(self, sql):  # using the main DB connection
        """Execute sql on the main connection and return its result."""
        return self._dbConn.execSql(sql)

    def cleanUp(self):
        """Close the main connection."""
        self._dbConn.close()

    def getTasksAtState(self):
        """Return prototype tasks that make sense in the current state.

        Raises RuntimeError for a state without a task list (STATE_INVALID).
        """
        if self._state == self.STATE_EMPTY:
            return [CreateDbTask(self), CreateFixedTableTask(self)]
        elif self._state == self.STATE_DB_ONLY:
            return [DropDbTask(self), CreateFixedTableTask(self), AddFixedDataTask(self)]
        elif self._state == self.STATE_TABLE_ONLY:
            return [DropFixedTableTask(self), AddFixedDataTask(self)]
        elif self._state == self.STATE_HAS_DATA:  # same as above. TODO: adjust
            return [DropFixedTableTask(self), AddFixedDataTask(self)]
        else:
            raise RuntimeError("Unexpected DbState state: {}".format(self._state))

    def transition(self, tasks):
        """Validate the tasks executed in one step and move to the resulting state.

        tasks -- the tasks executed during the step; an empty list is a no-op.

        Raises RuntimeError when the combination of successes and failures is
        inconsistent with the state the step started in.
        """
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            return  # do nothing

        if self._state == self.STATE_EMPTY:
            if self.hasSuccess(tasks, CreateDbTask):
                self.assertAtMostOneSuccess(tasks, CreateDbTask)  # param is class
                self._state = self.STATE_DB_ONLY
                if self.hasSuccess(tasks, CreateFixedTableTask):
                    self._state = self.STATE_TABLE_ONLY
                # else: no successful table creation, not much we can say
            else:  # did not create db
                self.assertNoTask(tasks, CreateDbTask)  # because we did not have such task
                self.assertNoSuccess(tasks, CreateFixedTableTask)

        elif self._state == self.STATE_DB_ONLY:
            self.assertAtMostOneSuccess(tasks, DropDbTask)
            self.assertIfExistThenSuccess(tasks, DropDbTask)
            self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)
            # Nothing to be said about adding data task
            if self.hasSuccess(tasks, DropDbTask):  # dropped the DB
                self.assertAtMostOneSuccess(tasks, DropDbTask)
                self._state = self.STATE_EMPTY
            elif self.hasSuccess(tasks, CreateFixedTableTask):  # did not drop db, create table success
                self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)  # at most 1 attempt is successful
                self.assertNoTask(tasks, DropDbTask)  # should not have tried
                if not self.hasSuccess(tasks, AddFixedDataTask):  # just created table, no data yet
                    # can't say there's add-data attempts, since they may all fail
                    self._state = self.STATE_TABLE_ONLY
                else:
                    self._state = self.STATE_HAS_DATA
            else:  # no success in dropping db tasks, no success in create fixed table, not acceptable
                raise RuntimeError("Unexpected no-success scenario")

        elif self._state in (self.STATE_TABLE_ONLY, self.STATE_HAS_DATA):
            # Both states offer the same tasks (see getTasksAtState) and follow
            # the same rules. STATE_HAS_DATA was previously unreachable here
            # because its branch repeated the STATE_TABLE_ONLY test.
            # TODO: adjust the STATE_HAS_DATA rules once they differ.
            if self.hasSuccess(tasks, DropFixedTableTask):
                self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
                self._state = self.STATE_DB_ONLY
            elif self.hasSuccess(tasks, AddFixedDataTask):  # no success dropping the table
                self.assertNoTask(tasks, DropFixedTableTask)
                self._state = self.STATE_HAS_DATA
            else:  # did not drop table, did not insert data, that is impossible
                raise RuntimeError("Unexpected no-success scenarios")

        else:
            raise RuntimeError("Unexpected DbState state: {}".format(self._state))
        logger.debug("New DB state is: {}".format(self._state))

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError when two or more tasks of type cls succeeded."""
        sCnt = 0
        for task in tasks:
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                task.logDebug("Task success found")
                sCnt += 1
                if sCnt >= 2:
                    raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise RuntimeError when tasks of type cls exist but none succeeded."""
        sCnt = 0
        exists = False
        for task in tasks:
            if not isinstance(task, cls):
                continue
            exists = True  # we have a valid instance
            if task.isSuccess():
                sCnt += 1
        if exists and sCnt <= 0:
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise RuntimeError when any task is of type cls."""
        for task in tasks:
            if isinstance(task, cls):
                raise RuntimeError("Unexpected task: {}".format(cls))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError when any task of type cls succeeded."""
        for task in tasks:
            if isinstance(task, cls):
                if task.isSuccess():
                    raise RuntimeError("Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True when at least one task of type cls succeeded."""
        for task in tasks:
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                return True
        return False

563 564 565 566
class TaskExecutor():
    """Per-step context object: remembers which step the tasks run in."""

    def __init__(self, curStep):
        """curStep -- the number of the step this executor belongs to."""
        self._curStep = curStep

    def getCurStep(self):
        """Return the step number given at construction."""
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        """Run task on the worker thread wt by delegating to ``task.execute(wt)``."""
        task.execute(wt)
578

S
Steven Li 已提交
579
class Task():
    """Base class of every unit of work a worker thread can execute.

    Subclasses implement ``_executeInternal``. After ``execute()`` returns,
    ``isSuccess()`` reports whether the SQL raised a
    ``taos.error.ProgrammingError``.
    """

    taskSn = 100  # last serial number handed out, shared by ALL task classes
    _taskSnLock = threading.Lock()  # tasks are cloned from several worker threads

    @classmethod
    def allocTaskNum(cls):
        """Return the next task serial number, unique across all task classes."""
        # Always update the counter on Task itself: "cls.taskSn += 1" would
        # create a separate counter on every subclass and hand out duplicates.
        with Task._taskSnLock:
            Task.taskSn += 1
            return Task.taskSn

    def __init__(self, dbState: DbState):
        """dbState -- the DbState this task works against."""
        self._dbState = dbState
        self._workerThread = None
        self._err = None
        self._curStep = None

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

    def isSuccess(self):
        """Return True when no error has been recorded for this task."""
        return self._err is None

    def clone(self):
        """Return a new task of the same class bound to the same DbState."""
        newTask = self.__class__(self._dbState)
        return newTask

    def logDebug(self, msg):
        """Log msg at debug level through the executing worker thread."""
        self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        """Log msg at info level through the executing worker thread."""
        self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Do the actual work; must be overridden by child classes."""
        raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))

    def execute(self, wt: WorkerThread):
        """Run this task on worker thread wt and record success or failure.

        A ``taos.error.ProgrammingError`` is stored as the task's error and
        the task counts as failed; any other exception is logged and re-raised.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug("[-] executing task {}...".format(self.__class__.__name__))

        self._err = None
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            self.logDebug("[=]Taos Execution exception: {0}".format(err))
            self._err = err
        except BaseException:  # log anything else, then let it propagate
            self.logDebug("[=]Unexpected exception")
            raise

        self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))

    def execSql(self, sql):
        """Execute sql on the DbState's main connection and return its result."""
        return self._dbState.execSql(sql)
634

635 636 637 638
class CreateDbTask(Task):
    """Task that creates database ``db`` through the worker thread."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        statement = "create database db"
        wt.execSql(statement)

639
class DropDbTask(Task):
    """Task that drops database ``db`` through the worker thread."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        wt.execSql("drop database db")

S
Steven Li 已提交
643
class CreateTableTask(Task):
    """Task that creates a new numbered table ``db.table_<n>``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # addTable() hands out a new table number that is marked "in use"
        tIndex = self._dbState.addTable()
        try:
            self.logDebug("Creating a table {} ...".format(tIndex))
            wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
            self.logDebug("Table {} created.".format(tIndex))
        finally:
            # Give the number back even when the SQL raised; otherwise it
            # would stay marked "in use" for the rest of the run.
            self._dbState.releaseTable(tIndex)

class CreateFixedTableTask(Task):
    """Task that creates the fixed table named by ``DbState.getFixedTableName()``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        fixedName = self._dbState.getFixedTableName()
        statement = "create table db.{} (ts timestamp, speed int)".format(fixedName)
        wt.execSql(statement)
S
Steven Li 已提交
655 656

class DropTableTask(Task):
    """Task that drops the oldest numbered table, if one can be obtained."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tableName = self._dbState.getTableNameToDelete()
        if not tableName:  # May be "False"
            self.logInfo("Cannot generate a table to delete, skipping...")
            return
        self.logInfo("Dropping a table db.{} ...".format(tableName))
        wt.execSql("drop table db.{}".format(tableName))
664 665 666 667 668
        
class DropFixedTableTask(Task):
    """Task that drops the fixed table named by ``DbState.getFixedTableName()``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        fixedName = self._dbState.getFixedTableName()
        statement = "drop table db.{}".format(fixedName)
        wt.execSql(statement)
S
Steven Li 已提交
669 670

class AddDataTask(Task):
    """Task that inserts one row into a randomly picked numbered table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbState
        self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
        tIndex = ds.pickAndAllocateTable()  # marked "in use" when not None
        if tIndex is None:
            self.logInfo("No table found to add data, skipping...")
            return
        try:
            sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
            self.logDebug("Executing SQL: {}".format(sql))
            wt.execSql(sql)
        finally:
            # Give the table back even when the insert raised; otherwise its
            # number would stay marked "in use" for the rest of the run.
            ds.releaseTable(tIndex)
        self.logDebug("Finished adding data")
S
Steven Li 已提交
683

684 685 686 687 688 689
class AddFixedDataTask(Task):
    """Task that inserts one row into the fixed table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbState
        # Target the same "db.<fixed name>" table that CreateFixedTableTask
        # creates and DropFixedTableTask drops. The former "db.table_<fixed
        # name>" named a table that no task ever creates.
        sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
        wt.execSql(sql)

S
Steven Li 已提交
690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
# Deterministic random number generator
class Dice():
    """Deterministic random number generator built on the ``random`` module.

    The generator must be seeded exactly once through seed() before any
    number is drawn.
    """

    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the system RNG, then seed it with s.

        Raises RuntimeError when called more than once.
        """
        if cls.seeded:
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is deterministic
        """Check that seed 0 yields the three expected numbers.

        Note that this re-seeds the global generator with 0.
        Raises RuntimeError when the numbers differ.
        """
        random.seed(0)
        x1 = random.randrange(0, 1000)
        x2 = random.randrange(0, 1000)
        x3 = random.randrange(0, 1000)
        if x1 != 864 or x2 != 394 or x3 != 776:
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):  # get 0 to stop-1
        """Return a random integer in [0, stop)."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):  # up to stop-1
        """Return a random integer in [start, stop).

        Raises RuntimeError when the dice has not been seeded yet.
        """
        if not cls.seeded:
            raise RuntimeError("Cannot throw dice before seeding it")
        return random.randrange(start, stop)
S
Steven Li 已提交
720 721 722 723


# Anyone needing to carry out work should simply come here
class WorkDispatcher():
    """Holds a fixed list of prototype tasks and picks one of them at random."""

    def __init__(self, dbState):
        """dbState -- the DbState every prototype task is bound to."""
        self.tasks = [
            CreateTableTask(dbState),
            DropTableTask(dbState),
            AddDataTask(dbState),
        ]

    def throwDice(self):
        """Return a random valid index into self.tasks.

        Goes through the shared, seeded Dice like the rest of the module, so
        it raises RuntimeError when the dice has not been seeded.
        """
        return Dice.throw(len(self.tasks))

    def pickTask(self):
        """Return a randomly chosen prototype task (not a copy)."""
        return self.tasks[self.throwDice()]

    def doWork(self, workerThread):
        """Pick a random task and execute it on workerThread."""
        task = self.pickTask()
        task.execute(workerThread)
S
Steven Li 已提交
745

746
def main():
    """Parse the command line, set up logging and run the crash generator.

    Sets the module globals ``gConfig`` and ``logger`` as a side effect.
    """
    global gConfig, logger

    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator')
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
                        help='Use a separate db connection for each worker thread (default: false)')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                        help='Maximum number of steps to run (default: 100)')
    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                        help='Number of threads to run (default: 10)')

    gConfig = parser.parse_args()

    logger = logging.getLogger('myApp')
    if gConfig.debug:
        # Without this the logger keeps its default (unset) level and
        # inherits the effective level of the root logger.
        logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    dbState = DbState()  # opens the main DB connection
    try:
        Dice.seed(0)  # initial seeding of dice
        tc = ThreadCoordinator(
            ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
            WorkDispatcher(dbState),
            dbState
            )
        tc.run()
    finally:
        # close the main DB connection even when the run failed
        dbState.cleanUp()
    logger.info("Finished running thread pool")

# Run the crash generator only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()