crash_gen.py 48.2 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17
import traceback
18 19 20 21
# Require Python 3
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
22
import getopt
23
import argparse
24
import copy
S
Steven Li 已提交
25 26 27

import threading
import random
28
import time
S
Steven Li 已提交
29
import logging
30
import datetime
31
import textwrap
S
Steven Li 已提交
32

33
from typing import List
34
from typing import Dict
35

S
Steven Li 已提交
36 37 38 39 40
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

41
import crash_gen
S
Steven Li 已提交
42 43
import taos

44 45 46
# Global variables, tried to keep a small number. 
gConfig = None # Command-line/Environment Configurations, will set a bit later
logger = None
S
Steven Li 已提交
47

48 49
def runThread(wt: WorkerThread):    
    wt.run()
50

51 52 53 54 55 56 57 58
class CrashGenError(Exception):
    def __init__(self, msg=None, errno=None):
        self.msg = msg    
        self.errno = errno
    
    def __str__(self):
        return self.msg

S
Steven Li 已提交
59
class WorkerThread:
60
    def __init__(self, pool: ThreadPool, tid, 
61 62 63 64
            tc: ThreadCoordinator,
            # te: TaskExecutor,
            ): # note: main thread context!
        # self._curStep = -1 
65
        self._pool = pool
66
        self._tid = tid        
67
        self._tc = tc
S
Steven Li 已提交
68
        # self.threadIdent = threading.get_ident()
69 70
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()
S
Steven Li 已提交
71

72
        # Let us have a DB connection of our own
73 74 75
        if ( gConfig.per_thread_db_connection ): # type: ignore
            self._dbConn = DbConn()   

76
    def logDebug(self, msg):
S
Steven Li 已提交
77
        logger.debug("    TRD[{}] {}".format(self._tid, msg))
78 79

    def logInfo(self, msg):
S
Steven Li 已提交
80
        logger.info("    TRD[{}] {}".format(self._tid, msg))
81 82

   
83 84
    def getTaskExecutor(self):
        return self._tc.getTaskExecutor()     
85

S
Steven Li 已提交
86
    def start(self):
87
        self._thread.start()  # AFTER the thread is recorded
S
Steven Li 已提交
88

89
    def run(self): 
S
Steven Li 已提交
90
        # initialization after thread starts, in the thread context
91
        # self.isSleeping = False
92 93
        logger.info("Starting to run thread: {}".format(self._tid))

94
        if ( gConfig.per_thread_db_connection ): # type: ignore
95
            self._dbConn.open()
S
Steven Li 已提交
96

97 98
        self._doTaskLoop()       
        
99
        # clean up
100
        if ( gConfig.per_thread_db_connection ): # type: ignore 
101
            self._dbConn.close()
102

103 104 105
    def _doTaskLoop(self) :
        # while self._curStep < self._pool.maxSteps:
        # tc = ThreadCoordinator(None)
106 107 108
        while True:  
            tc = self._tc # Thread Coordinator, the overall master            
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
S
Steven Li 已提交
109
            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
110
            self.crossStepGate()   # then per-thread gate, after being tapped
S
Steven Li 已提交
111
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
112
            if not self._tc.isRunning():
S
Steven Li 已提交
113
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
114 115
                break

S
Steven Li 已提交
116
            logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
117
            task = tc.fetchTask()
S
Steven Li 已提交
118
            logger.debug("[TRD] Worker thread [{}] about to execute task".format(self._tid))
119
            task.execute(self)
120
            tc.saveExecutedTask(task)
S
Steven Li 已提交
121
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
122
  
S
Steven Li 已提交
123
    def verifyThreadSelf(self): # ensure we are called by this own thread
124
        if ( threading.get_ident() != self._thread.ident ): 
S
Steven Li 已提交
125 126 127 128 129 130 131
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self): # ensure we are called by the main thread
        if ( threading.get_ident() != threading.main_thread().ident ): 
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
132
        if ( not self._thread.is_alive() ):
S
Steven Li 已提交
133 134
            raise RuntimeError("Unexpected dead thread")

135
    # A gate is different from a barrier in that a thread needs to be "tapped"
S
Steven Li 已提交
136 137 138 139
    def crossStepGate(self):
        self.verifyThreadAlive()
        self.verifyThreadSelf() # only allowed by ourselves
        
140
        # Wait again at the "gate", waiting to be "tapped"
S
Steven Li 已提交
141
        logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid))
142 143
        self._stepGate.wait() 
        self._stepGate.clear()
S
Steven Li 已提交
144
        
145
        # self._curStep += 1  # off to a new step...
S
Steven Li 已提交
146 147 148 149

    def tapStepGate(self): # give it a tap, release the thread waiting there
        self.verifyThreadAlive()
        self.verifyThreadMain() # only allowed for main thread
150
 
S
Steven Li 已提交
151
        logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
152 153
        self._stepGate.set() # wake up!        
        time.sleep(0) # let the released thread run a bit
154

155 156 157 158
    def execSql(self, sql): # not "execute", since we are out side the DB context
        if ( gConfig.per_thread_db_connection ):
            return self._dbConn.execute(sql)            
        else:
159
            return self._tc.getDbState().getDbConn().execute(sql)
160

161 162 163 164 165
    # def querySql(self, sql): # not "execute", since we are out side the DB context
    #     if ( gConfig.per_thread_db_connection ):
    #         return self._dbConn.query(sql)            
    #     else:
    #         return self._tc.getDbState().getDbConn().query(sql)
166

167
class ThreadCoordinator:
168
    def __init__(self, pool, dbState):
169 170
        self._curStep = -1 # first step is 0
        self._pool = pool
171
        # self._wd = wd
172
        self._te = None # prepare for every new step
173 174 175
        self._dbState = dbState
        self._executedTasks: List[Task] = [] # in a given step
        self._lock = threading.RLock() # sync access for a few things
S
Steven Li 已提交
176

177
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
178
        self._execStats = ExecutionStats()
S
Steven Li 已提交
179

180 181 182
    def getTaskExecutor(self):
        return self._te

183 184 185
    def getDbState(self) -> DbState :
        return self._dbState

186 187 188
    def crossStepBarrier(self):
        self._stepBarrier.wait()

189 190
    def run(self):              
        self._pool.createAndStartThreads(self)
S
Steven Li 已提交
191 192

        # Coordinate all threads step by step
193 194
        self._curStep = -1 # not started yet
        maxSteps = gConfig.max_steps # type: ignore
195 196 197
        self._execStats.startExec() # start the stop watch
        failed = False
        while(self._curStep < maxSteps-1 and not failed):  # maxStep==10, last curStep should be 9
S
Steven Li 已提交
198 199 200
            if not gConfig.debug: 
                print(".", end="", flush=True) # print this only if we are not in debug mode
            logger.debug("[TRD] Main thread going to sleep")
201

202
            # Now ready to enter a step
203 204 205 206
            self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset() # Other worker threads should now be at the "gate"            

            # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
            try:
                self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
            except taos.error.ProgrammingError as err:
                if ( err.msg == 'network unavailable' ): # broken DB connection
                    logger.info("DB connection broken, execution failed")
                    traceback.print_stack()
                    failed = True
                    self._te = None # Not running any more
                    self._execStats.registerFailure("Broken DB Connection")
                    # continue # don't do that, need to tap all threads at end, and maybe signal them to stop
                else:
                    raise 
            finally:
                pass
            
222
            self.resetExecutedTasks() # clear the tasks after we are done
223 224

            # Get ready for next step
S
Steven Li 已提交
225
            logger.debug("<-- Step {} finished".format(self._curStep))
226 227
            self._curStep += 1 # we are about to get into next step. TODO: race condition here!                
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
228

229
            # A new TE for the new step
230 231
            if not failed: # only if not failed
                self._te = TaskExecutor(self._curStep)
232

S
Steven Li 已提交
233
            logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep            
S
Steven Li 已提交
234 235
            self.tapAllThreads()

236
        logger.debug("Main thread ready to finish up...")
237 238 239 240 241 242 243 244
        if not failed: # only in regular situations
            self.crossStepBarrier() # Cross it one last time, after all threads finish
            self._stepBarrier.reset()
            logger.debug("Main thread in exclusive zone...")
            self._te = None # No more executor, time to end
            logger.debug("Main thread tapping all threads one last time...")
            self.tapAllThreads() # Let the threads run one last time

245 246
        logger.debug("Main thread joining all threads")
        self._pool.joinAll() # Get all threads to finish
S
Steven Li 已提交
247
        logger.info("All worker thread finished")
248 249 250
        self._execStats.endExec()

    def logStats(self):
251
        self._execStats.logStats()
S
Steven Li 已提交
252 253 254

    def tapAllThreads(self): # in a deterministic manner
        wakeSeq = []
255
        for i in range(self._pool.numThreads): # generate a random sequence
S
Steven Li 已提交
256 257 258 259
            if Dice.throw(2) == 1 :
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
S
Steven Li 已提交
260
        logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq)))
261
        # TODO: set dice seed to a deterministic value
S
Steven Li 已提交
262
        for i in wakeSeq:
263
            self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
S
Steven Li 已提交
264 265
            time.sleep(0) # yield

266 267 268 269 270 271
    def isRunning(self):
        return self._te != None

    def fetchTask(self) -> Task :
        if ( not self.isRunning() ): # no task
            raise RuntimeError("Cannot fetch task when not running")
272 273
        # return self._wd.pickTask()
        # Alternatively, let's ask the DbState for the appropriate task
274 275 276 277 278 279 280 281 282
        # dbState = self.getDbState()
        # tasks = dbState.getTasksAtState() # TODO: create every time?
        # nTasks = len(tasks)
        # i = Dice.throw(nTasks)
        # logger.debug(" (dice:{}/{}) ".format(i, nTasks))
        # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
        # return tasks[i].clone() # TODO: still necessary?
        taskType = self.getDbState().pickTaskType() # pick a task type for current state
        return taskType(self.getDbState(), self._execStats) # create a task from it
283 284 285

    def resetExecutedTasks(self):
        self._executedTasks = [] # should be under single thread
286 287 288 289

    def saveExecutedTask(self, task):
        with self._lock:
            self._executedTasks.append(task)
290 291

# We define a class to run a number of threads in locking steps.
292
class ThreadPool:
293 294 295 296 297
    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        self.funcSequencer = funcSequencer
        # Internal class variables
298
        # self.dispatcher = WorkDispatcher(dbState) # Obsolete?
299 300 301
        self.curStep = 0
        self.threadList = []
        # self.stepGate = threading.Condition() # Gate to hold/sync all threads
302
        # self.numWaitingThreads = 0    
303 304
        
    # starting to run all the threads, in locking steps
305
    def createAndStartThreads(self, tc: ThreadCoordinator):
306
        for tid in range(0, self.numThreads): # Create the threads
307
            workerThread = WorkerThread(self, tid, tc)            
308 309 310 311 312 313 314 315
            self.threadList.append(workerThread)
            workerThread.start() # start, but should block immediately before step 0

    def joinAll(self):
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()

S
Steven Li 已提交
316 317 318
# A queue of continguous POSITIVE integers
class LinearQueue():
    def __init__(self):
319
        self.firstIndex = 1  # 1st ever element
S
Steven Li 已提交
320
        self.lastIndex = 0
321
        self._lock = threading.RLock() # our functions may call each other
322
        self.inUse = set() # the indexes that are in use right now
S
Steven Li 已提交
323

324 325 326 327 328 329 330 331 332
    def toText(self):
        return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self): 
        with self._lock:
            # if ( self.isEmpty() ): 
            #     self.lastIndex = self.firstIndex 
            #     return self.firstIndex
333 334
            # Otherwise we have something
            self.lastIndex += 1
335 336
            self.allocate(self.lastIndex)
            # self.inUse.add(self.lastIndex) # mark it in use immediately
337
            return self.lastIndex
S
Steven Li 已提交
338 339

    def pop(self):
340
        with self._lock:
341
            if ( self.isEmpty() ): 
342 343 344
                # raise RuntimeError("Cannot pop an empty queue") 
                return False # TODO: None?
            
345
            index = self.firstIndex
346
            if ( index in self.inUse ):
347 348
                return False

349 350 351 352 353 354 355
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
356
        with self._lock:
357 358 359 360
            if (self.isEmpty()):
                return 0
            return self.pop()

S
Steven Li 已提交
361
    def allocate(self, i):
362
        with self._lock:
363
            # logger.debug("LQ allocating item {}".format(i))
364 365 366 367
            if ( i in self.inUse ):
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

S
Steven Li 已提交
368
    def release(self, i):
369
        with self._lock:
370 371
            # logger.debug("LQ releasing item {}".format(i))
            self.inUse.remove(i) # KeyError possible, TODO: why?
372 373 374 375

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

S
Steven Li 已提交
376
    def pickAndAllocate(self):
377 378 379
        if ( self.isEmpty() ):
            return None
        with self._lock:
380 381 382 383
            cnt = 0 # counting the interations
            while True:
                cnt += 1
                if ( cnt > self.size()*10 ): # 10x iteration already
384 385
                    # raise RuntimeError("Failed to allocate LinearQueue element")
                    return None
386 387
                ret = Dice.throwRange(self.firstIndex, self.lastIndex+1)
                if ( not ret in self.inUse ):
388 389 390 391 392
                    self.allocate(ret)
                    return ret

class DbConn:
    def __init__(self):
393 394
        self._conn = None 
        self._cursor = None
395 396 397 398 399 400 401
        self.isOpen = False
        
    def open(self): # Open connection
        if ( self.isOpen ):
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg" 
402 403
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
        self._cursor = self._conn.cursor()
404

405 406
        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')
407
        # self._cursor.execute('use db') # note we do this in _findCurrenState
408 409

        # Open connection
410
        self._tdSql = TDSql()
411
        self._tdSql.init(self._cursor)
412 413 414 415 416
        self.isOpen = True

    def resetDb(self): # reset the whole database, etc.
        if ( not self.isOpen ):
            raise RuntimeError("Cannot reset database until connection is open")
417 418 419
        # self._tdSql.prepare() # Recreate database, etc.

        self._cursor.execute('drop database if exists db')
420 421
        logger.debug("Resetting DB, dropped database")
        # self._cursor.execute('create database db')
422 423
        # self._cursor.execute('use db')

424 425 426 427 428 429 430
        # tdSql.execute('show databases')

    def close(self):
        if ( not self.isOpen ):
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()
        self.isOpen = False
S
Steven Li 已提交
431

432
    def execute(self, sql): 
433
        if ( not self.isOpen ):
434
            raise RuntimeError("Cannot execute database commands until connection is open")
435
        return self._tdSql.execute(sql)
S
Steven Li 已提交
436

437
    def query(self, sql) :  # return rows affected
438 439 440
        if ( not self.isOpen ):
            raise RuntimeError("Cannot query database until connection is open")
        return self._tdSql.query(sql)
441
        # results are in: return self._tdSql.queryResult
442

443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
    def _queryAny(self, sql) : # actual query result as an int
        if ( not self.isOpen ):
            raise RuntimeError("Cannot query database until connection is open")
        tSql = self._tdSql
        nRows = tSql.query(sql)
        if nRows != 1 :
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
        if tSql.queryRows != 1 or tSql.queryCols != 1:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        return tSql.queryResult[0][0]

    def queryScalar(self, sql) -> int :
        return self._queryAny(sql)

    def queryString(self, sql) -> str :
        return self._queryAny(sql)
    
class AnyState:
461
    STATE_INVALID    = -1
462 463 464 465
    STATE_EMPTY      = 0  # nothing there, no even a DB
    STATE_DB_ONLY    = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA   = 3  # we have some data in the table
466 467 468 469 470 471 472 473 474 475 476 477 478 479
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_TABLE = 3
    CAN_DROP_FIXED_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
S
Steven Li 已提交
480
        return self._stateNames[self._info[self.STATE_VAL_IDX] + 1] # -1 hack to accomodate the STATE_INVALID case
481 482 483 484

    def getInfo(self):
        raise RuntimeError("Must be overriden by child classes")

S
Steven Li 已提交
485 486 487 488 489 490 491 492
    def equals(self, other):
        if isinstance(other, int):
            return self.getValIndex() == other
        elif isinstance(other, AnyState):
            return self.getValIndex() == other.getValIndex()
        else:
            raise RuntimeError("Unexpected comparison, type = {}".format(type(other)))

493 494 495
    def verifyTasksToState(self, tasks, newState):
        raise RuntimeError("Must be overriden by child classes")

S
Steven Li 已提交
496 497 498
    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
    def getValue(self):
        return self._info[self.STATE_VAL_IDX]
    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]
    def canDropDb(self):
        return self._info[self.CAN_DROP_DB]
    def canCreateFixedTable(self):
        return self._info[self.CAN_CREATE_FIXED_TABLE]
    def canDropFixedTable(self):
        return self._info[self.CAN_DROP_FIXED_TABLE]
    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]
    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        sCnt = 0
        for task in tasks :
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
S
Steven Li 已提交
520
                # task.logDebug("Task success found")
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633
                sCnt += 1
                if ( sCnt >= 2 ):
                    raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        sCnt = 0
        exists = False
        for task in tasks :
            if not isinstance(task, cls):
                continue
            exists = True # we have a valid instance
            if task.isSuccess():
                sCnt += 1
        if ( exists and sCnt <= 0 ):
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        for task in tasks :
            if isinstance(task, cls):
                raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        for task in tasks :
            if isinstance(task, cls):
                if task.isSuccess():
                    raise RuntimeError("Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        for task in tasks :
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                return True
        return False

class StateInvalid(AnyState):
    def getInfo(self):
        return [
            self.STATE_INVALID,
            False, False, # can create/drop Db
            False, False, # can create/drop fixed table
            False, False, # can insert/read data with fixed table
        ]

    # def verifyTasksToState(self, tasks, newState):

class StateEmpty(AnyState):
    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False, # can create/drop Db
            False, False, # can create/drop fixed table
            False, False, # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        if ( self.hasSuccess(tasks, CreateDbTask) ):
            self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really valid for massively parrallel tasks

class StateDbOnly(AnyState):
    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,
            True, False,
            False, False,
        ]

    def verifyTasksToState(self, tasks, newState):
        self.assertAtMostOneSuccess(tasks, DropDbTask) # not true in massively parralel cases
        self.assertIfExistThenSuccess(tasks, DropDbTask)
        self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) 
        # Nothing to be said about adding data task
        if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
            # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
            self.assertAtMostOneSuccess(tasks, DropDbTask)
            # self._state = self.STATE_EMPTY
        elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success
            # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
            self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful
            self.assertNoTask(tasks, DropDbTask) # should have have tried
            # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
            #     # can't say there's add-data attempts, since they may all fail
            #     self._state = self.STATE_TABLE_ONLY
            # else:                    
            #     self._state = self.STATE_HAS_DATA
        # What about AddFixedData?
        # elif ( self.hasSuccess(tasks, AddFixedDataTask) ):
        #     self._state = self.STATE_HAS_DATA
        # else: # no success in dropping db tasks, no success in create fixed table? read data should also fail
        #     # raise RuntimeError("Unexpected no-success scenario")   # We might just landed all failure tasks, 
        #     self._state = self.STATE_DB_ONLY  # no change

class StateTableOnly(AnyState):
    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,
            False, True,
            True, True,
        ]

    def verifyTasksToState(self, tasks, newState):
        if ( self.hasSuccess(tasks, DropFixedTableTask) ): # we are able to drop the table
            self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
            # self._state = self.STATE_DB_ONLY
        elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
            self.assertNoTask(tasks, DropFixedTableTask)
            # self._state = self.STATE_HAS_DATA
        elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data
            self.assertNoTask(tasks, DropFixedTableTask)
            self.assertNoTask(tasks, AddFixedDataTask)
            # self._state = self.STATE_TABLE_ONLY # no change
S
Steven Li 已提交
634 635 636
        # else: # did not drop table, did not insert data, did not read successfully, that is impossible
        #     raise RuntimeError("Unexpected no-success scenarios")
        # TODO: need to revamp!!
637 638 639 640 641 642 643 644 645 646 647

class StateHasData(AnyState):
    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,
            False, True,
            True, True,
        ]

    def verifyTasksToState(self, tasks, newState):
S
Steven Li 已提交
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664
        if ( newState.equals(AnyState.STATE_EMPTY) ):
            self.hasSuccess(tasks, DropDbTask)
            self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy
        elif ( newState.equals(AnyState.STATE_DB_ONLY) ):
            self.assertNoTask(tasks, DropDbTask)
            self.hasSuccess(tasks, DropFixedTableTask)
            self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # TODO: dicy
        elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
            self.assertNoTask(tasks, DropDbTask)
            self.assertNoTask(tasks, DropFixedTableTask)
            self.assertNoTask(tasks, AddFixedDataTask)
            # self.hasSuccess(tasks, DeleteDataTasks)
        else:
            self.assertNoTask(tasks, DropDbTask)
            self.assertNoTask(tasks, DropFixedTableTask)
            self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)

665

666 667 668
# State of the database as we believe it to be
class DbState():
    
S
Steven Li 已提交
669 670
    def __init__(self):
        self.tableNumQueue = LinearQueue()
671 672 673
        self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
        self._lastInt  = 0 # next one is initial integer 
        self._lock = threading.RLock()
674

675
        self._state = StateInvalid() # starting state
676
        self._stateWeights = [1,3,5,10]
677
        
678 679
        # self.openDbServerConnection()
        self._dbConn = DbConn()
680 681 682 683 684 685 686 687 688 689
        try:
            self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
            if ( err.msg == 'disconnected' ): # cannot open DB connection
                print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                sys.exit()
            else:
                raise            
        except:
S
Steven Li 已提交
690
            print("[=] Unexpected exception")
691
            raise        
692
        self._dbConn.resetDb() # drop and recreate DB
693
        self._state = StateEmpty() # initial state, the result of above
694

695 696 697
    def getDbConn(self):
        return self._dbConn

S
Steven Li 已提交
698 699 700
    def pickAndAllocateTable(self): # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

701 702 703 704 705
    def addTable(self):
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

706 707 708
    def getFixedTableName(self):
        return "fixed_table"

S
Steven Li 已提交
709 710 711
    def releaseTable(self, i): # return the table back, so others can use it
        self.tableNumQueue.release(i)

712
    def getNextTick(self):
713 714 715
        with self._lock: # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1) # add one second to it
            return self._lastTick
716 717

    def getNextInt(self):
718 719 720 721
        with self._lock:
            self._lastInt += 1
            return self._lastInt
    
S
Steven Li 已提交
722
    def getTableNameToDelete(self):
723
        tblNum = self.tableNumQueue.pop() # TODO: race condition!
724 725 726
        if ( not tblNum ): # maybe false
            return False
        
S
Steven Li 已提交
727 728
        return "table_{}".format(tblNum)

729 730 731
    def cleanUp(self):
        self._dbConn.close()      

S
Steven Li 已提交
732 733
    # May be slow, use cautionsly...
    def getTaskTypesAtState(self):        
734
        allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
S
Steven Li 已提交
735
        firstTaskTypes = []
736
        for tc in allTaskClasses:
S
Steven Li 已提交
737
            # t = tc(self) # create task object            
738
            if tc.canBeginFrom(self._state):
S
Steven Li 已提交
739 740 741 742 743 744 745 746 747 748 749
                firstTaskTypes.append(tc)
        # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy() # have to have these
        for task1 in firstTaskTypes: # each task type gathered so far
            endState = task1.getEndState() # figure the end state
            if endState == None:
                continue
            for tc in allTaskClasses: # what task can further begin from there?
                if tc.canBeginFrom(endState) and (endState not in firstTaskTypes):
                    taskTypes.append(tc) # gather it

750
        if len(taskTypes) <= 0:
S
Steven Li 已提交
751
            raise RuntimeError("No suitable task types found for state: {}".format(self._state))        
752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777
        return taskTypes

        # tasks.append(ReadFixedDataTask(self)) # always for everybody
        # if ( self._state == self.STATE_EMPTY ):
        #     tasks.append(CreateDbTask(self))
        #     tasks.append(CreateFixedTableTask(self))
        # elif ( self._state == self.STATE_DB_ONLY ):
        #     tasks.append(DropDbTask(self))
        #     tasks.append(CreateFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # elif ( self._state == self.STATE_TABLE_ONLY ):
        #     tasks.append(DropFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
        #     tasks.append(DropFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # else:
        #     raise RuntimeError("Unexpected DbState state: {}".format(self._state))
        # return tasks

    def pickTaskType(self):
        taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState != None :
S
Steven Li 已提交
778
                weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method
779 780 781
            else:
                weights.append(10) # read data task, default to 10: TODO: change to a constant
        i = self._weighted_choice_sub(weights)
782
        # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))        
783 784 785 786 787 788 789 790 791
        return taskTypes[i]

    def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
        rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i

792 793 794
    def _findCurrentState(self):
        dbc = self._dbConn
        if dbc.query("show databases") == 0 : # no database?!
S
Steven Li 已提交
795
            # logger.debug("Found EMPTY state")
796 797 798
            return StateEmpty()
        dbc.execute("use db") # did not do this when openning connection
        if dbc.query("show tables") == 0 : # no tables
S
Steven Li 已提交
799
            # logger.debug("Found DB ONLY state")
800
            return StateDbOnly()
S
Steven Li 已提交
801 802
        if dbc.query("SELECT * FROM db.{}".format(self.getFixedTableName()) ) == 0 : # no data
            # logger.debug("Found TABLE_ONLY state")
803 804
            return StateTableOnly()
        else:
S
Steven Li 已提交
805
            # logger.debug("Found HAS_DATA state")
806
            return StateHasData()
807
    
808 809 810
    def transition(self, tasks):
        if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
            return # do nothing
811

812
        self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
813

814 815 816 817
        # Generic Checks, first based on the start state
        if self._state.canCreateDb():
            self._state.assertIfExistThenSuccess(tasks, CreateDbTask)
            # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
818

819 820 821
        if self._state.canDropDb():
            self._state.assertIfExistThenSuccess(tasks, DropDbTask)
            # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
822

823 824 825
        # if self._state.canCreateFixedTable():
            # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
            # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create
826

827 828 829
        # if self._state.canDropFixedTable():
            # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
            # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop
830

831 832
        # if self._state.canAddData():
        #     self.assertIfExistThenSuccess(tasks, AddFixedDataTask)  # not true actually
833

834 835
        # if self._state.canReadData():
            # Nothing for sure
836

837 838
        newState = self._findCurrentState()
        self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
839

840 841
        self._state = newState
        logger.debug("New DB state is: {}".format(self._state))
842

843 844 845 846
class TaskExecutor():
    def __init__(self, curStep):
        self._curStep = curStep

847 848 849
    def getCurStep(self):
        return self._curStep

850 851
    def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
        task.execute(wt)
852

853 854
    # def logInfo(self, msg):
    #     logger.info("    T[{}.x]: ".format(self._curStep) + msg)
855

856 857
    # def logDebug(self, msg):
    #     logger.debug("    T[{}.x]: ".format(self._curStep) + msg)
858

S
Steven Li 已提交
859
class Task():
860 861 862 863
    taskSn = 100

    @classmethod
    def allocTaskNum(cls):
S
Steven Li 已提交
864 865 866
        Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        # logger.debug("Allocating taskSN: {}".format(Task.taskSn))
        return Task.taskSn
867

S
Steven Li 已提交
868
    def __init__(self, dbState: DbState, execStats: ExecutionStats):        
869
        self._dbState = dbState
870
        self._workerThread = None 
871
        self._err = None
872
        self._curStep = None
873
        self._numRows = None # Number of rows affected
874 875 876

        # Assign an incremental task serial number        
        self._taskNum = self.allocTaskNum()
S
Steven Li 已提交
877
        # logger.debug("Creating new task {}...".format(self._taskNum))
878

879 880
        self._execStats = execStats

881 882
    def isSuccess(self):
        return self._err == None
883

884 885
    def clone(self): # TODO: why do we need this again?
        newTask = self.__class__(self._dbState, self._execStats)
886 887 888
        return newTask

    def logDebug(self, msg):
S
Steven Li 已提交
889
        self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
890 891

    def logInfo(self, msg):
S
Steven Li 已提交
892
        self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
893

894
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
895
        raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))
896

897 898
    def execute(self, wt: WorkerThread):
        wt.verifyThreadSelf()
899
        self._workerThread = wt # type: ignore
900 901

        te = wt.getTaskExecutor()
902 903
        self._curStep = te.getCurStep()
        self.logDebug("[-] executing task {}...".format(self.__class__.__name__))
904 905

        self._err = None
906
        self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
907 908 909
        try:
            self._executeInternal(te, wt) # TODO: no return value?
        except taos.error.ProgrammingError as err:
S
Steven Li 已提交
910
            self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err))
911 912
            self._err = err
        except:
S
Steven Li 已提交
913
            self.logDebug("[=] Unexpected exception")
914
            raise
915
        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
916
        
917 918
        self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))        
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
S
Steven Li 已提交
919

920
    def execSql(self, sql):
921
        return self._dbState.execute(sql)
922

923 924 925 926 927 928 929
                  
class ExecutionStats:    
    def __init__(self):
        self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
930 931
        self._execStartTime = None
        self._elapsedTime = 0.0 # total elapsed time
932 933
        self._accRunTime = 0.0 # accumulated run time

934 935 936 937 938 939 940 941 942
        self._failed = False
        self._failureReason = None

    def startExec(self):
        self._execStartTime = time.time()

    def endExec(self):
        self._elapsedTime = time.time() - self._execStartTime

943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963
    def incExecCount(self, klassName, isSuccess): # TODO: add a lock here
        if klassName not in self._execTimes:
            self._execTimes[klassName] = [0, 0]
        t = self._execTimes[klassName] # tuple for the data
        t[0] += 1 # index 0 has the "total" execution times
        if isSuccess:
            t[1] += 1 # index 1 has the "success" execution times

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0 : # starting a new round
                self._firstTaskStartTime = time.time() # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0 : # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

964 965 966 967
    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

968
    def logStats(self):
969 970 971 972
        logger.info("----------------------------------------------------------------------")
        logger.info("| Crash_Gen test {}, with the following stats:".
            format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
973 974 975
        execTimesAny = 0
        for k, n in self._execTimes.items():            
            execTimesAny += n[1]
976
            logger.info("|    {0:<24}: {1}/{2}".format(k,n[1],n[0]))
977
                
978 979 980 981 982 983 984
        logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress))
        logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
        logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
        logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
        logger.info("----------------------------------------------------------------------")
        
985 986 987 988 989 990 991 992 993 994


class StateTransitionTask(Task):
    # @classmethod
    # def getAllTaskClasses(cls): # static
    #     return cls.__subclasses__()
    @classmethod
    def getInfo(cls): # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

995 996 997
    # @classmethod
    # def getBeginStates(cls):
    #     return cls.getInfo()[0]
998 999

    @classmethod
S
Steven Li 已提交
1000
    def getEndState(cls): # returning the class name
1001
        return cls.getInfo()[0]
1002 1003

    @classmethod
1004 1005 1006
    def canBeginFrom(cls, state: AnyState):
        # return state.getValue() in cls.getBeginStates()
        raise RuntimeError("must be overriden")
1007 1008 1009 1010 1011 1012 1013 1014 1015 1016

    def execute(self, wt: WorkerThread):
        super().execute(wt)
        


class CreateDbTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1017
            # [AnyState.STATE_EMPTY], # can begin from
S
Steven Li 已提交
1018
            StateDbOnly() # end state
1019 1020
        ]

1021 1022 1023 1024
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

1025 1026 1027
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        wt.execSql("create database db")

1028 1029 1030 1031
class DropDbTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1032
            # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
S
Steven Li 已提交
1033
            StateEmpty()
1034 1035
        ]

1036 1037 1038 1039
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

1040 1041 1042
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        wt.execSql("drop database db")

1043 1044 1045 1046
class CreateFixedTableTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1047
            # [AnyState.STATE_DB_ONLY],
S
Steven Li 已提交
1048
            StateTableOnly()
1049
        ]
1050

1051 1052 1053 1054
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedTable()

1055 1056 1057
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
        wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
S
Steven Li 已提交
1058

1059 1060 1061 1062
class ReadFixedDataTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1063
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
1064 1065 1066
            None # meaning doesn't affect state
        ]

1067 1068 1069 1070
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

1071 1072
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
1073
        wt.execSql("select * from db.{}".format(tblName)) # TODO: analyze result set later
1074 1075
        # tdSql.query(" cars where tbname in ('carzero', 'carone')")

1076 1077 1078 1079
class DropFixedTableTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1080
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
S
Steven Li 已提交
1081
            StateDbOnly() # meaning doesn't affect state
1082 1083
        ]

1084 1085 1086 1087
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedTable()

1088 1089 1090 1091 1092 1093 1094 1095
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
        wt.execSql("drop table db.{}".format(tblName))

class AddFixedDataTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1096
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
S
Steven Li 已提交
1097
            StateHasData()
1098
        ]
1099 1100 1101 1102

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()
1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119
        
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbState
        sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
        wt.execSql(sql) 


#---------- Non State-Transition Related Tasks ----------#

class CreateTableTask(Task):    
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tIndex = self._dbState.addTable()
        self.logDebug("Creating a table {} ...".format(tIndex))
        wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
        self.logDebug("Table {} created.".format(tIndex))
        self._dbState.releaseTable(tIndex)

S
Steven Li 已提交
1120
class DropTableTask(Task):
1121
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
1122
        tableName = self._dbState.getTableNameToDelete()
S
Steven Li 已提交
1123
        if ( not tableName ): # May be "False"
1124
            self.logInfo("Cannot generate a table to delete, skipping...")
S
Steven Li 已提交
1125
            return
1126
        self.logInfo("Dropping a table db.{} ...".format(tableName))
1127
        wt.execSql("drop table db.{}".format(tableName))
1128
        
1129

S
Steven Li 已提交
1130 1131

class AddDataTask(Task):
1132
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
1133
        ds = self._dbState
1134
        self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
1135 1136
        tIndex = ds.pickAndAllocateTable()
        if ( tIndex == None ):
1137
            self.logInfo("No table found to add data, skipping...")
1138
            return
1139
        sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
1140
        self.logDebug("Executing SQL: {}".format(sql))
1141 1142
        wt.execSql(sql) 
        ds.releaseTable(tIndex)
1143
        self.logDebug("Finished adding data")
S
Steven Li 已提交
1144

1145

S
Steven Li 已提交
1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167
# Deterministic random number generator
class Dice():
    seeded = False # static, uninitialized

    @classmethod
    def seed(cls, s): # static
        if (cls.seeded):
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls): # Verify that the RNG is determinstic
        random.seed(0)
        x1 = random.randrange(0, 1000)
        x2 = random.randrange(0, 1000)
        x3 = random.randrange(0, 1000)
        if ( x1 != 864 or x2!=394 or x3!=776 ):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
1168 1169
    def throw(cls, stop): # get 0 to stop-1
        return cls.throwRange(0, stop)
S
Steven Li 已提交
1170 1171

    @classmethod
1172
    def throwRange(cls, start, stop): # up to stop-1
S
Steven Li 已提交
1173 1174
        if ( not cls.seeded ):
            raise RuntimeError("Cannot throw dice before seeding it")
1175
        return random.randrange(start, stop)
S
Steven Li 已提交
1176 1177 1178


# Anyone needing to carry out work should simply come here
1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200
# class WorkDispatcher():
#     def __init__(self, dbState):
#         # self.totalNumMethods = 2
#         self.tasks = [
#             # CreateTableTask(dbState), # Obsolete
#             # DropTableTask(dbState),
#             # AddDataTask(dbState),
#         ]

#     def throwDice(self):
#         max = len(self.tasks) - 1 
#         dRes = random.randint(0, max)
#         # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
#         return dRes

#     def pickTask(self):
#         dice = self.throwDice()
#         return self.tasks[dice]

#     def doWork(self, workerThread):
#         task = self.pickTask()
#         task.execute(workerThread)
S
Steven Li 已提交
1201

S
Steven Li 已提交
1202 1203
class LoggingFilter(logging.Filter):
    def filter(self, record: logging.LogRecord):
S
Steven Li 已提交
1204 1205 1206
        if ( record.levelno >= logging.INFO ) :
            return True # info or above always log

S
Steven Li 已提交
1207 1208 1209 1210 1211 1212 1213 1214 1215
        msg = record.msg
        # print("type = {}, value={}".format(type(msg), msg))
        # sys.exit()
        if msg.startswith("[TRD]"):
            return False
        return True

        

1216
def main():
1217
    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
1218 1219 1220 1221 1222 1223 1224 1225 1226
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in offical docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))
1227 1228 1229 1230
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',                        
                        help='Use a single shared db connection (default: false)')
    parser.add_argument('-d', '--debug', action='store_true',                        
                        help='Turn on DEBUG mode for more logging (default: false)')
1231 1232 1233 1234
    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                        help='Maximum number of steps to run (default: 100)')
    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                        help='Number of threads to run (default: 10)')
1235

1236
    global gConfig
1237
    gConfig = parser.parse_args()
1238 1239 1240
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit()
1241

1242
    global logger
1243
    logger = logging.getLogger('CrashGen')
S
Steven Li 已提交
1244
    # logger.addFilter(LoggingFilter())
1245 1246
    if ( gConfig.debug ):
        logger.setLevel(logging.DEBUG) # default seems to be INFO        
S
Steven Li 已提交
1247 1248
    else:
        logger.setLevel(logging.INFO)
S
Steven Li 已提交
1249 1250 1251 1252
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    dbState = DbState()
1253 1254
    Dice.seed(0) # initial seeding of dice
    tc = ThreadCoordinator(
1255
        ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), 
1256
        # WorkDispatcher(dbState), # Obsolete?
1257
        dbState
1258
        )
1259
    
1260
    tc.run()
1261 1262 1263
    tc.logStats()
    dbState.cleanUp()    
    
S
Steven Li 已提交
1264
    # logger.info("Crash_Gen execution finished")
1265 1266 1267

if __name__ == "__main__":
    main()