crash_gen.py 49.4 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17
import traceback
18 19 20 21
# Require Python 3
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
22
import getopt
23
import argparse
24
import copy
S
Steven Li 已提交
25 26 27

import threading
import random
28
import time
S
Steven Li 已提交
29
import logging
30
import datetime
31
import textwrap
S
Steven Li 已提交
32

33
from typing import List
34
from typing import Dict
35

S
Steven Li 已提交
36 37 38 39 40
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

41
import crash_gen
S
Steven Li 已提交
42 43
import taos

44 45 46
# Global variables; we try to keep their number small.
gConfig = None  # Command-line/environment configuration, assigned later
logger = None   # Module-wide logger, assigned later


def runThread(wt: "WorkerThread"):
    """Thread entry point: run the given worker's main loop.

    Used as the ``target`` of each worker's ``threading.Thread``, so
    ``wt.run()`` executes in the newly started thread's context.
    """
    wt.run()


class CrashGenError(Exception):
    """Exception raised by crash_gen's own consistency checks.

    Attributes:
        msg: human-readable description (may be None).
        errno: optional numeric error code (may be None).
    """

    def __init__(self, msg=None, errno=None):
        # Forward msg to Exception so .args and the default repr are populated.
        super().__init__(msg)
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str. Returning None (when msg is omitted)
        # would make str(err) raise TypeError.
        return "" if self.msg is None else str(self.msg)


class WorkerThread:
    """A worker that executes one task per step, in lock-step with the others.

    Each step the worker first waits at the coordinator's shared barrier,
    then at its own "step gate" until the main thread taps it. It then
    fetches one task from the ThreadCoordinator and executes it. The loop
    ends once the coordinator is no longer running.
    """

    def __init__(self, pool: "ThreadPool", tid, tc: "ThreadCoordinator"):
        # Note: this runs in the main thread's context!
        self._pool = pool
        self._tid = tid
        self._tc = tc
        self._thread = threading.Thread(target=runThread, args=(self,))
        # Per-thread gate; set by the main thread (tapStepGate) to release us.
        self._stepGate = threading.Event()

        # Optionally give this worker a DB connection of its own.
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = DbConn()

    def logDebug(self, msg):
        logger.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        logger.info("    TRD[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's TaskExecutor for the current step (None if not running)."""
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: open the per-thread connection (if any), loop, then clean up."""
        # Initialization happens here, in the worker thread's own context.
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # Release the per-thread connection even if a task raised.
            if gConfig.per_thread_db_connection:  # type: ignore
                self._dbConn.close()

    def _doTaskLoop(self):
        """Execute one task per step until the coordinator stops running."""
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()  # then per-thread gate, after being tapped
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not tc.isRunning():
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()
            logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block (in this worker's own thread) until the main thread taps our gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid))
        self._stepGate.wait()
        self._stepGate.clear()  # re-arm the gate for the next step

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; only the main thread may do this."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def execSql(self, sql):  # not "execute", since we are outside the DB context
        """Execute SQL on the per-thread connection, or on the shared DbState connection."""
        if gConfig.per_thread_db_connection:
            return self._dbConn.execute(sql)
        return self._tc.getDbState().getDbConn().execute(sql)


class ThreadCoordinator:
    """Drives the worker threads step by step from the main thread.

    Each step, the main thread:
      1. waits at the shared barrier together with all workers;
      2. transitions the DbState using the tasks executed in the previous step;
      3. creates a fresh TaskExecutor;
      4. taps every worker's gate so each runs one more task.
    """

    def __init__(self, pool, dbState):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # a new TaskExecutor per step; None means "not running"
        self._dbState = dbState
        self._executedTasks: List[Task] = []  # tasks executed in the current step
        self._lock = threading.RLock()  # sync access for a few things
        # One barrier shared by all worker threads plus the main thread.
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()

    def getTaskExecutor(self):
        return self._te

    def getDbState(self) -> "DbState":
        return self._dbState

    def crossStepBarrier(self):
        self._stepBarrier.wait()

    def run(self):
        """Create the worker threads and drive them through gConfig.max_steps steps."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        maxSteps = gConfig.max_steps  # type: ignore
        self._execStats.startExec()  # start the stop watch
        failed = False
        while self._curStep < maxSteps - 1 and not failed:  # maxStep==10, last curStep should be 9
            if not gConfig.debug:
                print(".", end="", flush=True)  # print this only if we are not in debug mode
            logger.debug("[TRD] Main thread going to sleep")

            # Now ready to enter a step
            self.crossStepBarrier()  # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset()  # Other worker threads should now be at the "gate"

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
            try:
                self._dbState.transition(self._executedTasks)  # at end of step, transition the DB state
            except taos.error.ProgrammingError as err:
                if err.msg != 'network unavailable':  # only a broken DB connection is handled here
                    raise
                logger.info("DB connection broken, execution failed")
                # print_exc() shows the traceback of the error being handled;
                # print_stack() would only show where this coordinator is.
                traceback.print_exc()
                failed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # Don't "continue": all threads still need to be tapped below so they can stop.

            self.resetExecutedTasks()  # clear the tasks after we are done

            # Get ready for next step
            logger.debug("<-- Step {} finished".format(self._curStep))
            self._curStep += 1  # we are about to get into next step. TODO: race condition here!
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep))  # Now not all threads had time to go to sleep

            # A new TE for the new step, only if not failed
            if not failed:
                self._te = TaskExecutor(self._curStep)

            logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep))  # Now not all threads had time to go to sleep
            self.tapAllThreads()

        logger.debug("Main thread ready to finish up...")
        if not failed:  # only in regular situations
            self.crossStepBarrier()  # Cross it one last time, after all threads finish
            self._stepBarrier.reset()
            logger.debug("Main thread in exclusive zone...")
            self._te = None  # No more executor, time to end
            logger.debug("Main thread tapping all threads one last time...")
            self.tapAllThreads()  # Let the threads run one last time

        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        logger.info("All worker thread finished")
        self._execStats.endExec()

    def logStats(self):
        self._execStats.logStats()

    def tapAllThreads(self):
        """Tap every worker's gate, in a random order chosen with Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self._pool.threadList[i].tapStepGate()  # TODO: maybe a bit too deep?!
            time.sleep(0)  # yield

    def isRunning(self):
        return self._te is not None

    def fetchTask(self) -> "Task":
        """Create a new task of a type appropriate for the current DB state.

        Raises:
            RuntimeError: if the coordinator is not running.
        """
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        dbState = self.getDbState()
        taskType = dbState.pickTaskType()  # pick a task type for current state
        return taskType(dbState, self._execStats)  # create a task from it

    def resetExecutedTasks(self):
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        with self._lock:
            self._executedTasks.append(task)


# We define a class to run a number of threads in locking steps.
class ThreadPool:
    """Creates, starts and joins the WorkerThreads that run in lock-step."""

    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        # NOTE(review): dbState is accepted but not used by this class.
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        self.funcSequencer = funcSequencer
        # Internal class variables
        self.curStep = 0
        self.threadList = []

    # starting to run all the threads, in locking steps
    def createAndStartThreads(self, tc: "ThreadCoordinator"):
        """Create numThreads workers (tids 0..numThreads-1), record and start each."""
        for tid in range(self.numThreads):  # Create the threads
            workerThread = WorkerThread(self, tid, tc)
            self.threadList.append(workerThread)
            workerThread.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Wait for every recorded worker thread to finish."""
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()


# A queue of contiguous POSITIVE integers
class LinearQueue:
    """A thread-safe queue of the contiguous integers [firstIndex..lastIndex].

    Elements can be marked "in use" (allocated). An in-use element at the
    head cannot be popped.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0   # the queue is empty while firstIndex > lastIndex
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False if the queue is empty or the head is still in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but return 0 (not False) when the queue is empty."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index i in use; raise RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index i no longer in use (KeyError if it was not in use)."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick a queued index that is not in use, allocate and return it.

        Returns None if the queue is empty, or if no free index was hit
        within 10 x size() random attempts.
        """
        with self._lock:
            # Check emptiness under the lock, consistent with the rest of the class.
            if self.isEmpty():
                return None
            for _ in range(self.size() * 10):  # bounded number of random attempts
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret
            return None


class DbConn:
    """One taos connection with its cursor, wrapped by the TDSql test helper."""

    def __init__(self):
        self._conn = None
        self._cursor = None
        self._tdSql = None
        self.isOpen = False

    def open(self):  # Open connection
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg"
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._cursor = self._conn.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')
        # Note: no 'use db' here; an earlier note says that is done in _findCurrenState

        # Wrap the cursor in the TDSql helper used by execute()/query()
        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop the 'db' database if it exists (it is not re-created here)."""
        if not self.isOpen:
            raise RuntimeError("Cannot reset database until connection is open")
        self._cursor.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def close(self):
        """Close the TDSql helper and the underlying connection."""
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()
        # _tdSql was only handed the cursor (see open()), so the connection
        # obtained from taos.connect() must be closed separately to avoid a leak.
        if self._conn is not None:
            self._conn.close()
        self._conn = None
        self._cursor = None
        self.isOpen = False

    def execute(self, sql):
        if not self.isOpen:
            raise RuntimeError("Cannot execute database commands until connection is open")
        return self._tdSql.execute(sql)

    def query(self, sql):  # return rows affected
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        return self._tdSql.query(sql)
        # results are in: return self._tdSql.queryResult

    def _queryAny(self, sql):  # actual query result as an int
        """Run sql and return the single cell of its 1x1 result set.

        Raises:
            RuntimeError: if not open, or the result is not exactly one row/one column.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        tSql = self._tdSql
        nRows = tSql.query(sql)
        if nRows != 1:
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
        if tSql.queryRows != 1 or tSql.queryCols != 1:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        return tSql.queryResult[0][0]

    def queryScalar(self, sql) -> int:
        return self._queryAny(sql)

    def queryString(self, sql) -> str:
        return self._queryAny(sql)


class AnyState:
    """Base class for our belief about the DB state.

    Subclasses implement getInfo(), which returns a list indexed by the
    *_IDX / CAN_* constants below. Index 0 holds the state value; the
    remaining slots hold that state's capability flags.
    """
    STATE_INVALID    = -1
    STATE_EMPTY      = 0  # nothing there, not even a DB
    STATE_DB_ONLY    = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA   = 3  # we have some data in the table
    # Indexed by (state value + 1), so STATE_INVALID (-1) maps to slot 0.
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Indexes into the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_TABLE = 3
    CAN_DROP_FIXED_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # +1 because _stateNames starts with the STATE_INVALID (-1) entry
        return self._stateNames[self._info[self.STATE_VAL_IDX] + 1]

    def getInfo(self):
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare state values with an int state constant or another AnyState."""
        if isinstance(other, int):
            return self.getValIndex() == other
        elif isinstance(other, AnyState):
            return self.getValIndex() == other.getValIndex()
        else:
            raise RuntimeError("Unexpected comparison, type = {}".format(type(other)))

    def verifyTasksToState(self, tasks, newState):
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedTable(self):
        return self._info[self.CAN_CREATE_FIXED_TABLE]

    def canDropFixedTable(self):
        return self._info[self.CAN_DROP_FIXED_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError if more than one task of type cls succeeded."""
        sCnt = 0
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                sCnt += 1
                if sCnt >= 2:
                    raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise RuntimeError if tasks of type cls exist but none succeeded."""
        matching = [task for task in tasks if isinstance(task, cls)]
        if matching and not any(task.isSuccess() for task in matching):
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type cls is present."""
        for task in tasks:
            if isinstance(task, cls):
                raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError if any task of type cls succeeded."""
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                raise RuntimeError("Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        return any(isinstance(task, cls) and task.isSuccess() for task in tasks)

    def hasTask(self, tasks, cls):
        return any(isinstance(task, cls) for task in tasks)


class StateInvalid(AnyState):
    """DbState's starting state, before the DB is reset; no operation is allowed."""

    def getInfo(self):
        # State value first, then all six capability flags disabled:
        # create/drop DB, create/drop fixed table, insert/read data.
        return [self.STATE_INVALID] + [False] * 6

    # verifyTasksToState is not overridden; the AnyState version raises.


class StateEmpty(AnyState):
    """No database exists yet: creating a DB is the only allowed operation."""

    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False,   # can create/drop Db
            False, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        # At EMPTY: if a DB creation succeeded and no drop_db task ran,
        # at most one of the create_db attempts may have succeeded.
        if self.hasSuccess(tasks, CreateDbTask) and not self.hasTask(tasks, DropDbTask):
            self.assertAtMostOneSuccess(tasks, CreateDbTask)  # TODO: compare numbers


class StateDbOnly(AnyState):
    """A database exists but no table: drop DB / create fixed table are allowed."""

    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,   # can create/drop Db
            True, False,   # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        self.assertAtMostOneSuccess(tasks, DropDbTask)  # not true in massively parallel cases
        self.assertIfExistThenSuccess(tasks, DropDbTask)
        # Nothing to be said about adding-data tasks
        if self.hasSuccess(tasks, DropDbTask):  # dropped the DB
            self.assertAtMostOneSuccess(tasks, DropDbTask)
        elif self.hasSuccess(tasks, CreateFixedTableTask):  # did not drop db, create table success
            self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)  # at most 1 attempt is successful
            self.assertNoTask(tasks, DropDbTask)  # should not have tried to drop the DB
        # Otherwise all attempts may simply have failed; the state is unchanged.


class StateTableOnly(AnyState):
    """A fixed table exists but is still empty."""

    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed table
            True, True,   # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        if self.hasSuccess(tasks, DropFixedTableTask):  # we are able to drop the table
            self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
        # TODO: need to revamp!! Checks for AddFixedDataTask / ReadFixedDataTask
        # outcomes were disabled (at least one does not hold in massively
        # parallel cases).


class StateHasData(AnyState):
    """The fixed table exists and holds some data."""

    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed table
            True, True,   # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        if newState.equals(AnyState.STATE_EMPTY):
            # NOTE(review): the result is discarded, so this call checks nothing;
            # it was probably meant as an assertion that a DropDbTask succeeded - confirm.
            self.hasSuccess(tasks, DropDbTask)
            self.assertAtMostOneSuccess(tasks, DropDbTask)  # TODO: dicy
        elif newState.equals(AnyState.STATE_DB_ONLY):  # in DB only
            if not self.hasTask(tasks, CreateDbTask):  # without a create_db task
                self.assertNoTask(tasks, DropDbTask)  # we must NOT have any drop_db task
            # NOTE(review): the result is discarded, so this call checks nothing;
            # it was probably meant as an assertion that a DropFixedTableTask succeeded - confirm.
            self.hasSuccess(tasks, DropFixedTableTask)
            self.assertAtMostOneSuccess(tasks, DropFixedTableTask)  # TODO: dicy
        elif newState.equals(AnyState.STATE_TABLE_ONLY):  # data deleted
            self.assertNoTask(tasks, DropDbTask)
            self.assertNoTask(tasks, DropFixedTableTask)
            self.assertNoTask(tasks, AddFixedDataTask)
            # TODO: check success of a delete-data task once one exists
        else:
            self.assertNoTask(tasks, DropDbTask)
            self.assertNoTask(tasks, DropFixedTableTask)
            self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)


# State of the database as we believe it to be
class DbState():
    
S
Steven Li 已提交
677 678
    def __init__(self):
        self.tableNumQueue = LinearQueue()
679 680 681
        self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
        self._lastInt  = 0 # next one is initial integer 
        self._lock = threading.RLock()
682

683
        self._state = StateInvalid() # starting state
684
        self._stateWeights = [1,3,5,10]
685
        
686 687
        # self.openDbServerConnection()
        self._dbConn = DbConn()
688 689 690 691 692 693 694 695 696 697
        try:
            self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
            if ( err.msg == 'disconnected' ): # cannot open DB connection
                print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                sys.exit()
            else:
                raise            
        except:
S
Steven Li 已提交
698
            print("[=] Unexpected exception")
699
            raise        
700
        self._dbConn.resetDb() # drop and recreate DB
701
        self._state = StateEmpty() # initial state, the result of above
702

703 704 705
    def getDbConn(self):
        return self._dbConn

S
Steven Li 已提交
706 707 708
    def pickAndAllocateTable(self): # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

709 710 711 712 713
    def addTable(self):
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

714 715 716
    def getFixedTableName(self):
        return "fixed_table"

S
Steven Li 已提交
717 718 719
    def releaseTable(self, i): # return the table back, so others can use it
        self.tableNumQueue.release(i)

720
    def getNextTick(self):
721 722 723
        with self._lock: # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1) # add one second to it
            return self._lastTick
724 725

    def getNextInt(self):
726 727 728 729
        with self._lock:
            self._lastInt += 1
            return self._lastInt
    
S
Steven Li 已提交
730
    def getTableNameToDelete(self):
731
        tblNum = self.tableNumQueue.pop() # TODO: race condition!
732 733 734
        if ( not tblNum ): # maybe false
            return False
        
S
Steven Li 已提交
735 736
        return "table_{}".format(tblNum)

737 738 739
    def cleanUp(self):
        self._dbConn.close()      

S
Steven Li 已提交
740 741
    # May be slow, use cautionsly...
    def getTaskTypesAtState(self):        
742
        allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
S
Steven Li 已提交
743
        firstTaskTypes = []
744
        for tc in allTaskClasses:
S
Steven Li 已提交
745
            # t = tc(self) # create task object            
746
            if tc.canBeginFrom(self._state):
S
Steven Li 已提交
747 748 749 750 751 752 753 754 755 756 757
                firstTaskTypes.append(tc)
        # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy() # have to have these
        for task1 in firstTaskTypes: # each task type gathered so far
            endState = task1.getEndState() # figure the end state
            if endState == None:
                continue
            for tc in allTaskClasses: # what task can further begin from there?
                if tc.canBeginFrom(endState) and (endState not in firstTaskTypes):
                    taskTypes.append(tc) # gather it

758
        if len(taskTypes) <= 0:
S
Steven Li 已提交
759
            raise RuntimeError("No suitable task types found for state: {}".format(self._state))        
760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785
        return taskTypes

        # tasks.append(ReadFixedDataTask(self)) # always for everybody
        # if ( self._state == self.STATE_EMPTY ):
        #     tasks.append(CreateDbTask(self))
        #     tasks.append(CreateFixedTableTask(self))
        # elif ( self._state == self.STATE_DB_ONLY ):
        #     tasks.append(DropDbTask(self))
        #     tasks.append(CreateFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # elif ( self._state == self.STATE_TABLE_ONLY ):
        #     tasks.append(DropFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
        #     tasks.append(DropFixedTableTask(self))
        #     tasks.append(AddFixedDataTask(self))
        # else:
        #     raise RuntimeError("Unexpected DbState state: {}".format(self._state))
        # return tasks

    def pickTaskType(self):
        taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState != None :
S
Steven Li 已提交
786
                weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method
787 788 789
            else:
                weights.append(10) # read data task, default to 10: TODO: change to a constant
        i = self._weighted_choice_sub(weights)
790
        # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))        
791 792 793 794 795 796 797 798 799
        return taskTypes[i]

    def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
        rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i

800 801
    def _findCurrentState(self):
        dbc = self._dbConn
S
Steven Li 已提交
802
        ts = time.time()
803
        if dbc.query("show databases") == 0 : # no database?!
S
Steven Li 已提交
804
            # logger.debug("Found EMPTY state")
S
Steven Li 已提交
805
            logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
806 807 808
            return StateEmpty()
        dbc.execute("use db") # did not do this when openning connection
        if dbc.query("show tables") == 0 : # no tables
S
Steven Li 已提交
809
            # logger.debug("Found DB ONLY state")
S
Steven Li 已提交
810
            logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
811
            return StateDbOnly()
S
Steven Li 已提交
812 813
        if dbc.query("SELECT * FROM db.{}".format(self.getFixedTableName()) ) == 0 : # no data
            # logger.debug("Found TABLE_ONLY state")
S
Steven Li 已提交
814
            logger.debug("[STT] TABLE_ONLY found, between {} and {}".format(ts, time.time()))
815 816
            return StateTableOnly()
        else:
S
Steven Li 已提交
817
            # logger.debug("Found HAS_DATA state")
S
Steven Li 已提交
818
            logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
819
            return StateHasData()
820
    
821 822 823
    def transition(self, tasks):
        if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
            return # do nothing
824

825
        self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
826

827 828 829 830
        # Generic Checks, first based on the start state
        if self._state.canCreateDb():
            self._state.assertIfExistThenSuccess(tasks, CreateDbTask)
            # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
831

832 833 834
        if self._state.canDropDb():
            self._state.assertIfExistThenSuccess(tasks, DropDbTask)
            # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
835

836 837 838
        # if self._state.canCreateFixedTable():
            # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
            # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create
839

840 841 842
        # if self._state.canDropFixedTable():
            # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
            # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop
843

844 845
        # if self._state.canAddData():
        #     self.assertIfExistThenSuccess(tasks, AddFixedDataTask)  # not true actually
846

847 848
        # if self._state.canReadData():
            # Nothing for sure
849

850
        newState = self._findCurrentState()
S
Steven Li 已提交
851
        logger.debug("[STT] New DB state determined: {}".format(newState))
852 853
        self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
        self._state = newState
854

855 856 857 858
class TaskExecutor():
    def __init__(self, curStep):
        self._curStep = curStep

859 860 861
    def getCurStep(self):
        return self._curStep

862 863
    def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
        task.execute(wt)
864

865 866
    # def logInfo(self, msg):
    #     logger.info("    T[{}.x]: ".format(self._curStep) + msg)
867

868 869
    # def logDebug(self, msg):
    #     logger.debug("    T[{}.x]: ".format(self._curStep) + msg)
870

S
Steven Li 已提交
871
class Task():
872 873 874 875
    taskSn = 100

    @classmethod
    def allocTaskNum(cls):
S
Steven Li 已提交
876 877 878
        Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        # logger.debug("Allocating taskSN: {}".format(Task.taskSn))
        return Task.taskSn
879

S
Steven Li 已提交
880
    def __init__(self, dbState: DbState, execStats: ExecutionStats):        
881
        self._dbState = dbState
882
        self._workerThread = None 
883
        self._err = None
884
        self._curStep = None
885
        self._numRows = None # Number of rows affected
886 887 888

        # Assign an incremental task serial number        
        self._taskNum = self.allocTaskNum()
S
Steven Li 已提交
889
        # logger.debug("Creating new task {}...".format(self._taskNum))
890

891 892
        self._execStats = execStats

893 894
    def isSuccess(self):
        return self._err == None
895

896 897
    def clone(self): # TODO: why do we need this again?
        newTask = self.__class__(self._dbState, self._execStats)
898 899 900
        return newTask

    def logDebug(self, msg):
S
Steven Li 已提交
901
        self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
902 903

    def logInfo(self, msg):
S
Steven Li 已提交
904
        self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))
905

906
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
907
        raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))
908

909 910
    def execute(self, wt: WorkerThread):
        wt.verifyThreadSelf()
911
        self._workerThread = wt # type: ignore
912 913

        te = wt.getTaskExecutor()
914 915
        self._curStep = te.getCurStep()
        self.logDebug("[-] executing task {}...".format(self.__class__.__name__))
916 917

        self._err = None
918
        self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
919 920 921
        try:
            self._executeInternal(te, wt) # TODO: no return value?
        except taos.error.ProgrammingError as err:
S
Steven Li 已提交
922
            self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err))
923 924
            self._err = err
        except:
S
Steven Li 已提交
925
            self.logDebug("[=] Unexpected exception")
926
            raise
927
        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
928
        
929 930
        self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))        
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
S
Steven Li 已提交
931

932
    def execSql(self, sql):
933
        return self._dbState.execute(sql)
934

935 936 937 938 939 940 941
                  
class ExecutionStats:    
    def __init__(self):
        self._execTimes: Dict[str, [int, int]] = {} # total/success times for a task
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
942 943
        self._execStartTime = None
        self._elapsedTime = 0.0 # total elapsed time
944 945
        self._accRunTime = 0.0 # accumulated run time

946 947 948 949 950 951 952 953 954
        self._failed = False
        self._failureReason = None

    def startExec(self):
        self._execStartTime = time.time()

    def endExec(self):
        self._elapsedTime = time.time() - self._execStartTime

955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975
    def incExecCount(self, klassName, isSuccess): # TODO: add a lock here
        if klassName not in self._execTimes:
            self._execTimes[klassName] = [0, 0]
        t = self._execTimes[klassName] # tuple for the data
        t[0] += 1 # index 0 has the "total" execution times
        if isSuccess:
            t[1] += 1 # index 1 has the "success" execution times

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0 : # starting a new round
                self._firstTaskStartTime = time.time() # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0 : # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

976 977 978 979
    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

980
    def logStats(self):
981 982 983 984
        logger.info("----------------------------------------------------------------------")
        logger.info("| Crash_Gen test {}, with the following stats:".
            format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
985 986 987
        execTimesAny = 0
        for k, n in self._execTimes.items():            
            execTimesAny += n[1]
988
            logger.info("|    {0:<24}: {1}/{2}".format(k,n[1],n[0]))
989
                
990 991 992 993 994 995 996
        logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress))
        logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
        logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny))
        logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
        logger.info("----------------------------------------------------------------------")
        
997 998 999 1000 1001 1002 1003 1004 1005 1006


class StateTransitionTask(Task):
    # @classmethod
    # def getAllTaskClasses(cls): # static
    #     return cls.__subclasses__()
    @classmethod
    def getInfo(cls): # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

1007 1008 1009
    # @classmethod
    # def getBeginStates(cls):
    #     return cls.getInfo()[0]
1010 1011

    @classmethod
S
Steven Li 已提交
1012
    def getEndState(cls): # returning the class name
1013
        return cls.getInfo()[0]
1014 1015

    @classmethod
1016 1017 1018
    def canBeginFrom(cls, state: AnyState):
        # return state.getValue() in cls.getBeginStates()
        raise RuntimeError("must be overriden")
1019 1020 1021 1022 1023 1024 1025 1026 1027 1028

    def execute(self, wt: WorkerThread):
        super().execute(wt)
        


class CreateDbTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1029
            # [AnyState.STATE_EMPTY], # can begin from
S
Steven Li 已提交
1030
            StateDbOnly() # end state
1031 1032
        ]

1033 1034 1035 1036
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

1037 1038 1039
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        wt.execSql("create database db")

1040 1041 1042 1043
class DropDbTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1044
            # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
S
Steven Li 已提交
1045
            StateEmpty()
1046 1047
        ]

1048 1049 1050 1051
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

1052 1053
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        wt.execSql("drop database db")
S
Steven Li 已提交
1054
        logger.debug("[OPS] database dropped at {}".format(time.time()))
1055

1056 1057 1058 1059
class CreateFixedTableTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1060
            # [AnyState.STATE_DB_ONLY],
S
Steven Li 已提交
1061
            StateTableOnly()
1062
        ]
1063

1064 1065 1066 1067
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedTable()

1068 1069 1070
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
        wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
S
Steven Li 已提交
1071

1072 1073 1074 1075
class ReadFixedDataTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1076
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
1077 1078 1079
            None # meaning doesn't affect state
        ]

1080 1081 1082 1083
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

1084 1085
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
1086
        wt.execSql("select * from db.{}".format(tblName)) # TODO: analyze result set later
1087 1088
        # tdSql.query(" cars where tbname in ('carzero', 'carone')")

1089 1090 1091 1092
class DropFixedTableTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1093
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
S
Steven Li 已提交
1094
            StateDbOnly() # meaning doesn't affect state
1095 1096
        ]

1097 1098 1099 1100
    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedTable()

1101 1102 1103 1104 1105 1106 1107 1108
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbState.getFixedTableName()        
        wt.execSql("drop table db.{}".format(tblName))

class AddFixedDataTask(StateTransitionTask):
    @classmethod
    def getInfo(cls):
        return [
1109
            # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
S
Steven Li 已提交
1110
            StateHasData()
1111
        ]
1112 1113 1114 1115

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()
1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132
        
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbState
        sql = "insert into db.{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
        wt.execSql(sql) 


#---------- Non State-Transition Related Tasks ----------#

class CreateTableTask(Task):    
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tIndex = self._dbState.addTable()
        self.logDebug("Creating a table {} ...".format(tIndex))
        wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
        self.logDebug("Table {} created.".format(tIndex))
        self._dbState.releaseTable(tIndex)

S
Steven Li 已提交
1133
class DropTableTask(Task):
1134
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
1135
        tableName = self._dbState.getTableNameToDelete()
S
Steven Li 已提交
1136
        if ( not tableName ): # May be "False"
1137
            self.logInfo("Cannot generate a table to delete, skipping...")
S
Steven Li 已提交
1138
            return
1139
        self.logInfo("Dropping a table db.{} ...".format(tableName))
1140
        wt.execSql("drop table db.{}".format(tableName))
1141
        
1142

S
Steven Li 已提交
1143 1144

class AddDataTask(Task):
1145
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
1146
        ds = self._dbState
1147
        self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
1148 1149
        tIndex = ds.pickAndAllocateTable()
        if ( tIndex == None ):
1150
            self.logInfo("No table found to add data, skipping...")
1151
            return
1152
        sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
1153
        self.logDebug("Executing SQL: {}".format(sql))
1154 1155
        wt.execSql(sql) 
        ds.releaseTable(tIndex)
1156
        self.logDebug("Finished adding data")
S
Steven Li 已提交
1157

1158

S
Steven Li 已提交
1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180
# Deterministic random number generator
class Dice():
    seeded = False # static, uninitialized

    @classmethod
    def seed(cls, s): # static
        if (cls.seeded):
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls): # Verify that the RNG is determinstic
        random.seed(0)
        x1 = random.randrange(0, 1000)
        x2 = random.randrange(0, 1000)
        x3 = random.randrange(0, 1000)
        if ( x1 != 864 or x2!=394 or x3!=776 ):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
1181 1182
    def throw(cls, stop): # get 0 to stop-1
        return cls.throwRange(0, stop)
S
Steven Li 已提交
1183 1184

    @classmethod
1185
    def throwRange(cls, start, stop): # up to stop-1
S
Steven Li 已提交
1186 1187
        if ( not cls.seeded ):
            raise RuntimeError("Cannot throw dice before seeding it")
1188
        return random.randrange(start, stop)
S
Steven Li 已提交
1189 1190 1191


# Anyone needing to carry out work should simply come here
1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213
# class WorkDispatcher():
#     def __init__(self, dbState):
#         # self.totalNumMethods = 2
#         self.tasks = [
#             # CreateTableTask(dbState), # Obsolete
#             # DropTableTask(dbState),
#             # AddDataTask(dbState),
#         ]

#     def throwDice(self):
#         max = len(self.tasks) - 1 
#         dRes = random.randint(0, max)
#         # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
#         return dRes

#     def pickTask(self):
#         dice = self.throwDice()
#         return self.tasks[dice]

#     def doWork(self, workerThread):
#         task = self.pickTask()
#         task.execute(workerThread)
S
Steven Li 已提交
1214

S
Steven Li 已提交
1215 1216
class LoggingFilter(logging.Filter):
    def filter(self, record: logging.LogRecord):
S
Steven Li 已提交
1217 1218 1219
        if ( record.levelno >= logging.INFO ) :
            return True # info or above always log

S
Steven Li 已提交
1220 1221 1222
        msg = record.msg
        # print("type = {}, value={}".format(type(msg), msg))
        # sys.exit()
S
Steven Li 已提交
1223 1224 1225 1226 1227

        # Commenting out below to adjust...

        # if msg.startswith("[TRD]"):
        #     return False
S
Steven Li 已提交
1228 1229 1230 1231
        return True

        

1232
def main():
1233
    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
1234 1235 1236 1237 1238 1239 1240 1241 1242
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in offical docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))
1243 1244 1245 1246
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',                        
                        help='Use a single shared db connection (default: false)')
    parser.add_argument('-d', '--debug', action='store_true',                        
                        help='Turn on DEBUG mode for more logging (default: false)')
1247 1248 1249 1250
    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                        help='Maximum number of steps to run (default: 100)')
    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                        help='Number of threads to run (default: 10)')
1251

1252
    global gConfig
1253
    gConfig = parser.parse_args()
1254 1255 1256
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit()
1257

1258
    global logger
1259
    logger = logging.getLogger('CrashGen')
S
Steven Li 已提交
1260
    logger.addFilter(LoggingFilter())
1261 1262
    if ( gConfig.debug ):
        logger.setLevel(logging.DEBUG) # default seems to be INFO        
S
Steven Li 已提交
1263 1264
    else:
        logger.setLevel(logging.INFO)
S
Steven Li 已提交
1265 1266 1267 1268
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    dbState = DbState()
1269 1270
    Dice.seed(0) # initial seeding of dice
    tc = ThreadCoordinator(
1271
        ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), 
1272
        # WorkDispatcher(dbState), # Obsolete?
1273
        dbState
1274
        )
S
Steven Li 已提交
1275 1276 1277 1278 1279 1280

    # Sandbox testing code
    # dbc = dbState.getDbConn()
    # while True:
    #     rows = dbc.query("show databases") 
    #     print("Rows: {}, time={}".format(rows, time.time()))
1281
    
1282
    tc.run()
1283 1284 1285
    tc.logStats()
    dbState.cleanUp()    
    
S
Steven Li 已提交
1286
    # logger.info("Crash_Gen execution finished")
1287 1288 1289

if __name__ == "__main__":
    main()