crash_gen.py 30.8 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17 18 19 20
# Require Python 3: abort with an explicit message on an older interpreter.
# NOTE(review): the `from __future__ import annotations` line above already
# needs Python 3.7+, so this guard is presumably only a secondary safety net.
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
21
import getopt
22
import argparse
23
import copy
S
Steven Li 已提交
24 25 26 27

import threading
import random
import logging
28
import datetime
S
Steven Li 已提交
29

30 31
from typing import List

S
Steven Li 已提交
32 33 34 35 36
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

37
import crash_gen
S
Steven Li 已提交
38 39
import taos

40 41 42
# Global variables; deliberately kept to a small number.
gConfig = None # Parsed command-line configuration; assigned in main()
logger = None # Shared logger instance; assigned in main()
S
Steven Li 已提交
43

44 45
def runThread(wt: WorkerThread):
    """Thread entry point: hand control over to the worker's own ``run()``."""
    worker = wt
    worker.run()
46

S
Steven Li 已提交
47
class WorkerThread:
    """One worker of the lock-step thread pool.

    In every step the worker first waits at the coordinator's shared barrier,
    then at its own per-thread gate until the main thread "taps" it, and
    finally fetches and executes a single task.
    """

    def __init__(self, pool: ThreadPool, tid,
                 tc: ThreadCoordinator,
                 ):  # note: main thread context!
        """Create the worker (the underlying thread is not started here).

        pool -- the owning thread pool
        tid  -- worker id, used in log messages
        tc   -- the coordinator driving all workers
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = DbConn()

    def logDebug(self, msg):
        """Log `msg` at DEBUG level, prefixed with this worker's id."""
        logger.debug("    t[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        """Log `msg` at INFO level, prefixed with this worker's id."""
        logger.info("    t[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's current task executor."""
        return self._tc.getTaskExecutor()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: open the private DB connection (if any), run the task loop, clean up."""
        logger.info("Starting to run thread: {}".format(self._tid))

        ownsConnection = gConfig.per_thread_db_connection  # type: ignore
        if ownsConnection:
            self._dbConn.open()
        try:
            self._doTaskLoop()
        finally:
            # clean up even when the task loop raises
            if ownsConnection:
                self._dbConn.close()

    def _doTaskLoop(self):
        """Repeat barrier -> gate -> task until the coordinator stops running."""
        while True:
            tc = self._tc  # Thread Coordinator, the overall master
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            logger.debug("Thread task loop exited barrier...")
            self.crossStepGate()  # then per-thread gate, after being tapped
            logger.debug("Thread task loop exited step gate...")
            if not self._tc.isRunning():
                break

            task = tc.fetchTask()
            task.execute(self)
            tc.saveExecutedTask(task)

    def verifyThreadSelf(self):
        """Raise RuntimeError unless called from this worker's own thread."""
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):
        """Raise RuntimeError unless called from the main thread."""
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        """Raise RuntimeError if the underlying thread is not alive."""
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at this worker's gate until tapped, then re-arm the gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):
        """Release the worker waiting at its gate; main thread only."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def _activeDbConn(self):
        """Return the private connection if configured, else the shared one."""
        if gConfig.per_thread_db_connection:  # type: ignore
            return self._dbConn
        # getDbState is a method and must be called before asking for the connection
        return self._tc.getDbState().getDbConn()

    def execSql(self, sql):  # not "execute", since we are outside the DB context
        """Execute `sql` on the active connection and return its result."""
        return self._activeDbConn().execute(sql)

    def querySql(self, sql):  # not "execute", since we are outside the DB context
        """Run query `sql` on the active connection and return its result."""
        return self._activeDbConn().query(sql)
150
                  
151
class ThreadCoordinator:
    """Main-thread master that drives all worker threads step by step.

    Each step: everyone meets at a shared barrier, the main thread applies the
    previous step's results to the DB state, installs a new TaskExecutor, and
    taps every worker's gate.
    """

    def __init__(self, pool, wd: WorkDispatcher, dbState):
        """Store collaborators and create the shared step barrier.

        pool    -- thread pool; `numThreads`, `threadList`, `createAndStartThreads`
                   and `joinAll` are used
        wd      -- work dispatcher (stored only)
        dbState -- the DB state model used to pick tasks and track transitions
        """
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._wd = wd
        self._te = None  # prepare for every new step
        self._dbState = dbState
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # one barrier for all worker threads plus the main thread
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)

    def getTaskExecutor(self):
        """Return the executor of the current step (None when not running)."""
        return self._te

    def getDbState(self) -> DbState:
        """Return the DB state model."""
        return self._dbState

    def crossStepBarrier(self):
        """Wait at the shared step barrier."""
        self._stepBarrier.wait()

    def run(self):
        """Start the workers, run `gConfig.max_steps` steps, then shut the workers down."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        maxSteps = gConfig.max_steps  # type: ignore
        # _curStep is incremented inside the loop, so stopping at maxSteps - 1
        # yields steps 0 .. maxSteps-1, i.e. exactly maxSteps steps.
        while self._curStep < maxSteps - 1:
            print(".", end="", flush=True)
            logger.debug("Main thread going to sleep")

            # Now ready to enter a step
            self.crossStepBarrier()  # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset()  # Other worker threads should now be at the "gate"

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
            self._dbState.transition(self._executedTasks)  # at end of step, transition the DB state
            self.resetExecutedTasks()  # clear the tasks after we are done

            # Get ready for next step
            logger.info("<-- Step {} finished".format(self._curStep))
            self._curStep += 1  # we are about to get into next step. TODO: race condition here!
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep))

            # A new TE for the new step
            self._te = TaskExecutor(self._curStep)

            logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep))
            self.tapAllThreads()

        logger.debug("Main thread ready to finish up...")
        self.crossStepBarrier()  # Cross it one last time, after all threads finish
        self._stepBarrier.reset()
        logger.debug("Main thread in exclusive zone...")
        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish

        logger.info("All threads finished")
        print("\r\nFinished")

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's gate in a dice-generated order."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.info("Waking up threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self._pool.threadList[i].tapStepGate()  # TODO: maybe a bit too deep?!
            time.sleep(0)  # yield

    def isRunning(self):
        """Return True while a task executor is installed."""
        return self._te is not None

    def fetchTask(self) -> Task:
        """Return a fresh clone of a dice-picked task valid in the current DB state.

        Raises RuntimeError when the coordinator is not running.
        """
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        # Ask the DbState for the tasks appropriate to its current state
        tasks = self.getDbState().getTasksAtState()
        i = Dice.throw(len(tasks))
        # A fresh copy is needed so each execution keeps its own results
        return tasks[i].clone()

    def resetExecutedTasks(self):
        """Forget the tasks recorded for the step that just ended."""
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record an executed task; guarded by the coordinator lock."""
        with self._lock:
            self._executedTasks.append(task)
247 248

# A class that runs a number of threads in lock-step.
249
class ThreadPool:
    """Holds the worker threads that the coordinator drives in lock-step."""

    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        """Record the pool configuration; no threads are created yet.

        dbState       -- passed to the pool's own WorkDispatcher
        numThreads    -- number of worker threads to create
        maxSteps      -- stored as-is
        funcSequencer -- stored as-is
        """
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        self.funcSequencer = funcSequencer
        # Internal class variables
        self.dispatcher = WorkDispatcher(dbState)
        self.curStep = 0
        self.threadList = []

    # starting to run all the threads, in lock-step
    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create `numThreads` workers bound to `tc`, record each, then start it."""
        for tid in range(0, self.numThreads):  # Create the threads
            workerThread = WorkerThread(self, tid, tc)
            self.threadList.append(workerThread)
            workerThread.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Join every worker's underlying thread."""
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()

S
Steven Li 已提交
273 274 275
# A queue of contiguous POSITIVE integers
class LinearQueue():
    """Queue of contiguous positive integers [firstIndex..lastIndex] with in-use tracking.

    New indexes are pushed at the tail, the oldest is popped from the head,
    and an index marked "in use" cannot be popped or allocated again until
    it is released.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0  # firstIndex > lastIndex means "empty"
        self._lock = threading.RLock()  # re-entrant: our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a printable summary of the range and the in-use set."""
        return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    def push(self):
        """Append a new largest index, mark it in use, and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the oldest index.

        Returns False when the queue is empty or the oldest index is in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?

            index = self.firstIndex
            if index in self.inUse:
                return False

            self.firstIndex += 1
            return index

    def isEmpty(self):
        """Return True when the queue holds no index."""
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but return 0 when the queue is empty."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index `i` as in use; raise RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Clear the in-use mark of index `i` (KeyError if it was not marked)."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Return the number of indexes currently in the queue."""
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Pick a dice-chosen index that is not in use, mark it in use, return it.

        Returns None when the queue is empty or no free index was hit within
        10 * size() throws.
        """
        with self._lock:
            # Emptiness is checked under the lock so the range cannot change
            # between the check and the throws.
            if self.isEmpty():
                return None
            for _ in range(self.size() * 10):
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret
            return None

class DbConn:
    """Thin wrapper around one `taos` connection, its cursor and a TDSql helper."""

    def __init__(self):
        self._conn = None
        self._cursor = None
        self._tdSql = None  # created in open()
        self.isOpen = False

    def _requireOpen(self, errMsg):
        """Raise RuntimeError(errMsg) unless the connection is open."""
        if not self.isOpen:
            raise RuntimeError(errMsg)

    def open(self):  # Open connection
        """Connect to the server and prepare cursor and TDSql helper.

        Raises RuntimeError if the connection is already open.
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg"
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._cursor = self._conn.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')

        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop database `db` if it exists; the database is NOT re-created here."""
        self._requireOpen("Cannot reset database until connection is open")
        self._cursor.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def close(self):
        """Close the TDSql helper and mark the connection closed.

        Raises RuntimeError if the connection is not open.
        """
        self._requireOpen("Cannot clean up database until connection is open")
        # NOTE(review): only the TDSql helper is closed; whether that also
        # releases self._conn is not visible from this file -- confirm.
        self._tdSql.close()
        self.isOpen = False

    def execute(self, sql):
        """Execute `sql` through TDSql and return its result."""
        self._requireOpen("Cannot execute database commands until connection is open")
        return self._tdSql.execute(sql)

    def query(self, sql) -> int:  # return number of rows retrieved
        """Run query `sql` through TDSql and return its result."""
        self._requireOpen("Cannot query database until connection is open")
        return self._tdSql.query(sql)


S
Steven Li 已提交
400 401
# State of the database as we believe it to be
class DbState():
    """Model of the database state as we believe it to be.

    Offers the tasks that make sense in the current state and, after each
    step, derives the next state from the executed tasks while checking that
    their outcomes are consistent.
    """

    STATE_INVALID    = -1
    STATE_EMPTY      = 1  # nothing there, not even a DB
    STATE_DB_ONLY    = 2  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 3  # we have a table, but totally empty
    STATE_HAS_DATA   = 4  # we have some data in the table

    def __init__(self):
        """Open the main DB connection, drop database `db`, start in STATE_EMPTY."""
        self.tableNumQueue = LinearQueue()
        self._lastTick = datetime.datetime(2019, 1, 1)  # initial date time tick
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()
        self._state = self.STATE_INVALID

        self._dbConn = DbConn()
        self._dbConn.open()
        self._dbConn.resetDb()  # drops the DB
        self._state = self.STATE_EMPTY  # initial state, the result of above

    def getDbConn(self):
        """Return the main DB connection."""
        return self._dbConn

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        """Return a table index reserved for the caller, or None if none is available."""
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        """Reserve and return a brand-new table index."""
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    def getFixedTableName(self):
        """Return the name of the single fixed table."""
        return "fixed_table"

    def releaseTable(self, i):  # return the table back, so others can use it
        """Release table index `i`."""
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Return a timestamp one second after the previously returned one."""
        with self._lock:  # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1)  # add one second to it
            return self._lastTick

    def getNextInt(self):
        """Return the next integer of an increasing sequence starting at 1."""
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getTableNameToDelete(self):
        """Return the name of the oldest table, or False if none can be popped."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        if not tblNum:  # maybe false
            return False

        return "table_{}".format(tblNum)

    def execSql(self, sql):  # using the main DB connection
        """Execute `sql` on the main DB connection."""
        return self._dbConn.execute(sql)

    def cleanUp(self):
        """Close the main DB connection."""
        self._dbConn.close()

    def getTasksAtState(self):
        """Return template tasks applicable in the current state.

        Raises RuntimeError for an unknown state.
        """
        tasks = [ReadFixedDataTask(self)]  # always
        if self._state == self.STATE_EMPTY:
            tasks.append(CreateDbTask(self))
            tasks.append(CreateFixedTableTask(self))
        elif self._state == self.STATE_DB_ONLY:
            tasks.append(DropDbTask(self))
            tasks.append(CreateFixedTableTask(self))
            tasks.append(AddFixedDataTask(self))
        elif self._state in (self.STATE_TABLE_ONLY, self.STATE_HAS_DATA):  # TODO: adjust for HAS_DATA
            tasks.append(DropFixedTableTask(self))
            tasks.append(AddFixedDataTask(self))
        else:
            raise RuntimeError("Unexpected DbState state: {}".format(self._state))
        return tasks

    def transition(self, tasks):
        """Derive the next state from the tasks executed in the step just finished.

        Does nothing for an empty task list. Raises RuntimeError when the
        outcomes contradict the current state.
        """
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            return  # do nothing

        if self._state == self.STATE_EMPTY:
            self._transitionFromEmpty(tasks)
        elif self._state == self.STATE_DB_ONLY:
            self._transitionFromDbOnly(tasks)
        elif self._state in (self.STATE_TABLE_ONLY, self.STATE_HAS_DATA):  # identical rules, TODO: adjust
            self._transitionFromTable(tasks)
        else:
            raise RuntimeError("Unexpected DbState state: {}".format(self._state))
        logger.debug("New DB state is: {}".format(self._state))

    def _transitionFromEmpty(self, tasks):
        """State rules when no database existed at the start of the step."""
        # Reads are not checked: some may succeed since a table might be created in this step
        if self.hasSuccess(tasks, CreateDbTask):
            self.assertAtMostOneSuccess(tasks, CreateDbTask)  # param is class
            self._state = self.STATE_DB_ONLY
            if self.hasSuccess(tasks, CreateFixedTableTask):
                self._state = self.STATE_TABLE_ONLY
        else:  # did not create db
            # no create-db attempt may exist at all if none of them succeeded
            self.assertNoTask(tasks, CreateDbTask)
            self.assertNoSuccess(tasks, CreateFixedTableTask)

    def _transitionFromDbOnly(self, tasks):
        """State rules when only the database existed at the start of the step."""
        self.assertAtMostOneSuccess(tasks, DropDbTask)
        self.assertIfExistThenSuccess(tasks, DropDbTask)
        self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)
        # Nothing to be said about adding data task
        if self.hasSuccess(tasks, DropDbTask):  # dropped the DB
            self.assertAtMostOneSuccess(tasks, DropDbTask)
            self._state = self.STATE_EMPTY
        elif self.hasSuccess(tasks, CreateFixedTableTask):  # did not drop db, create table success
            self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)  # at most 1 attempt is successful
            self.assertNoTask(tasks, DropDbTask)  # should not have tried
            if not self.hasSuccess(tasks, AddFixedDataTask):  # just created table, no data yet
                # can't say there's add-data attempts, since they may all fail
                self._state = self.STATE_TABLE_ONLY
            else:
                self._state = self.STATE_HAS_DATA
        else:  # no success in dropping db tasks, no success in create fixed table, not acceptable
            raise RuntimeError("Unexpected no-success scenario")

    def _transitionFromTable(self, tasks):
        """State rules when the fixed table existed (with or without data)."""
        if self.hasSuccess(tasks, DropFixedTableTask):
            self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
            self._state = self.STATE_DB_ONLY
        elif self.hasSuccess(tasks, AddFixedDataTask):  # no success dropping the table
            self.assertNoTask(tasks, DropFixedTableTask)
            self._state = self.STATE_HAS_DATA
        else:  # did not drop table, did not insert data, that is impossible
            raise RuntimeError("Unexpected no-success scenarios")

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError if two or more tasks of class `cls` succeeded."""
        sCnt = 0
        for task in tasks:
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                task.logDebug("Task success found")
                sCnt += 1
                if sCnt >= 2:
                    raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise RuntimeError if tasks of class `cls` exist but none succeeded."""
        matching = [task for task in tasks if isinstance(task, cls)]
        if matching and not any(task.isSuccess() for task in matching):
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise RuntimeError if any task of class `cls` is present."""
        for task in tasks:
            if isinstance(task, cls):
                raise RuntimeError("Unexpected task: {}".format(cls))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError if any task of class `cls` succeeded."""
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                raise RuntimeError("Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if at least one task of class `cls` succeeded."""
        return any(isinstance(task, cls) and task.isSuccess() for task in tasks)

584 585 586 587
class TaskExecutor():
    """Per-step executor: carries the step number and runs tasks on worker threads."""

    def __init__(self, curStep):
        self._curStep = curStep

    def getCurStep(self):
        """Return the step number this executor was created for."""
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        """Run `task` on worker thread `wt`."""
        task.execute(wt)
599

S
Steven Li 已提交
600
class Task():
    """Base class of a unit of work executed by one worker thread in one step.

    Subclasses implement `_executeInternal`. After `execute()` the outcome is
    available through `isSuccess()`.
    """

    taskSn = 100  # last serial number handed out, shared by ALL task classes
    _taskSnLock = threading.Lock()  # tasks are created from several worker threads

    @classmethod
    def allocTaskNum(cls):
        """Return the next task serial number, unique across all task classes."""
        # Update Task.taskSn explicitly: `cls.taskSn += 1` on a subclass would
        # create a separate per-subclass counter and hand out duplicates.
        with Task._taskSnLock:
            Task.taskSn += 1
            return Task.taskSn

    def __init__(self, dbState: DbState):
        self._dbState = dbState
        self._workerThread = None  # set when executed
        self._err = None  # the taos error of the last execution, if any
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

    def isSuccess(self):
        """Return True when no taos error has been recorded."""
        return self._err is None

    def clone(self):
        """Return a fresh task of the same class bound to the same DbState."""
        return self.__class__(self._dbState)

    def logDebug(self, msg):
        """Log at DEBUG level through the executing worker, tagged step.serial."""
        self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        """Log at INFO level through the executing worker, tagged step.serial."""
        self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Do the actual work; must be overridden by subclasses."""
        raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))

    def execute(self, wt: WorkerThread):
        """Run the task on worker `wt` (must be called from that worker's thread).

        A `taos.error.ProgrammingError` is recorded as the task's failure;
        any other exception is logged and re-raised.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug("[-] executing task {}...".format(self.__class__.__name__))

        self._err = None
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            self.logDebug("[=]Taos Execution exception: {0}".format(err))
            self._err = err
        except Exception:
            self.logDebug("[=]Unexpected exception")
            raise

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))

    def execSql(self, sql):
        """Execute `sql` on the DbState's main connection."""
        # DbState exposes execSql(); it has no execute() method
        return self._dbState.execSql(sql)
656

657 658 659 660
class CreateDbTask(Task):
    """Task that issues the statement creating database ``db``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # Runs through the worker's SQL entry point; errors propagate to the caller.
        createStmt = "create database db"
        wt.execSql(createStmt)

661
class DropDbTask(Task):
    """Task that issues the statement dropping database ``db``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        wt.execSql("drop database db")

S
Steven Li 已提交
665
class CreateTableTask(Task):
    """Task that reserves a new table index and creates ``db.table_<index>``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tIndex = self._dbState.addTable()  # the new index comes back marked "in use"
        try:
            self.logDebug("Creating a table {} ...".format(tIndex))
            wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
            self.logDebug("Table {} created.".format(tIndex))
        finally:
            # Always give the index back; otherwise a failed statement would
            # leave it marked "in use" for good.
            self._dbState.releaseTable(tIndex)

class CreateFixedTableTask(Task):
    """Task that creates the fixed table inside database ``db``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # The table name is owned by the DbState
        fixedName = self._dbState.getFixedTableName()
        createStmt = "create table db.{} (ts timestamp, speed int)".format(fixedName)
        wt.execSql(createStmt)
S
Steven Li 已提交
677

678 679 680 681 682 683
class ReadFixedDataTask(Task):
    """Task that selects everything from the fixed table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        fixedName = self._dbState.getFixedTableName()
        selectStmt = "select * from db.{}".format(fixedName)
        # keep the query result on the task for later inspection
        self._numRows = wt.querySql(selectStmt)

S
Steven Li 已提交
684
class DropTableTask(Task):
    """Task that drops the oldest numbered table, if one can be taken."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tableName = self._dbState.getTableNameToDelete()
        if not tableName:  # May be "False"
            self.logInfo("Cannot generate a table to delete, skipping...")
            return
        self.logInfo("Dropping a table db.{} ...".format(tableName))
        wt.execSql("drop table db.{}".format(tableName))
692 693 694 695 696
        
class DropFixedTableTask(Task):
    """Task that drops the fixed table from database ``db``."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        fixedName = self._dbState.getFixedTableName()
        dropStmt = "drop table db.{}".format(fixedName)
        wt.execSql(dropStmt)
S
Steven Li 已提交
697 698

class AddDataTask(Task):
    """Task that inserts one row into a dice-picked numbered table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbState
        self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
        tIndex = ds.pickAndAllocateTable()
        if tIndex is None:
            self.logInfo("No table found to add data, skipping...")
            return
        try:
            sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
            self.logDebug("Executing SQL: {}".format(sql))
            wt.execSql(sql)
        finally:
            # Release the table even when the insert fails, so it does not
            # stay marked "in use" for good.
            ds.releaseTable(tIndex)
        self.logDebug("Finished adding data")
S
Steven Li 已提交
711

712 713 714 715 716 717
class AddFixedDataTask(Task):
    """Task that inserts one row into the fixed table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbState
        # Target the same name CreateFixedTableTask creates ("db.<fixed name>");
        # the former "db.table_<fixed name>" never matched that table.
        sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
        wt.execSql(sql)

S
Steven Li 已提交
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739
# Deterministic random number generator
class Dice():
    """Seed-once random number source built on the module-level `random` generator."""

    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG, then seed it with `s`; may be called only once.

        Raises RuntimeError on a second call or if the RNG check fails.
        """
        if cls.seeded:
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is deterministic
        """Seed with 0 and check the first three draws against known values.

        Note: this re-seeds the shared generator as a side effect.
        """
        random.seed(0)
        draws = [random.randrange(0, 1000) for _ in range(3)]
        if draws != [864, 394, 776]:
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):  # get 0 to stop-1
        """Return a random integer in [0, stop)."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):  # up to stop-1
        """Return a random integer in [start, stop); RuntimeError if not yet seeded."""
        if not cls.seeded:
            raise RuntimeError("Cannot throw dice before seeding it")
        return random.randrange(start, stop)
S
Steven Li 已提交
748 749 750 751


# Anyone needing to carry out work should simply come here
class WorkDispatcher():
    """Holds a fixed set of table-level tasks and hands one out at random."""

    def __init__(self, dbState):
        self.tasks = [
            CreateTableTask(dbState),
            DropTableTask(dbState),
            AddDataTask(dbState),
        ]

    def throwDice(self):
        """Return a random valid index into `self.tasks`."""
        lastIndex = len(self.tasks) - 1  # randint is inclusive on both ends
        return random.randint(0, lastIndex)

    def pickTask(self):
        """Return a randomly chosen task (the shared instance, not a copy)."""
        return self.tasks[self.throwDice()]

    def doWork(self, workerThread):
        """Pick a task and execute it on `workerThread`."""
        task = self.pickTask()
        task.execute(workerThread)
S
Steven Li 已提交
773

774
def main():
    """Parse the command line, set up logging, and run the crash generator.

    Assigns the module globals `gConfig` (parsed arguments) and `logger`.
    """
    global gConfig
    global logger

    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator')
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
                        help='Use a separate db connection for each worker thread (default: false)')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                        help='Maximum number of steps to run (default: 100)')
    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                        help='Number of threads to run (default: 10)')

    gConfig = parser.parse_args()

    logger = logging.getLogger('myApp')
    # Without an explicit level the logger falls back to the root logger's
    # WARNING level and every INFO message would be dropped.
    logger.setLevel(logging.DEBUG if gConfig.debug else logging.INFO)
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    dbState = DbState()
    try:
        Dice.seed(0)  # initial seeding of dice
        tc = ThreadCoordinator(
            ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
            WorkDispatcher(dbState),
            dbState
            )
        tc.run()
    finally:
        dbState.cleanUp()  # close the main DB connection even if the run fails
    logger.info("Finished running thread pool")

# Run the crash generator only when this file is executed as a script.
if __name__ == "__main__":
    main()