crash_gen.py 19.8 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17 18 19 20
# Require Python 3. (In practice the `from __future__ import annotations`
# import at the top of this file already needs 3.7+, so older interpreters
# fail before reaching this check.)
if sys.version_info.major < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
21
import getopt
22
import argparse
S
Steven Li 已提交
23 24 25 26

import threading
import random
import logging
27
import datetime
S
Steven Li 已提交
28 29 30 31 32 33

from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

34
import crash_gen
S
Steven Li 已提交
35 36
import taos

37 38 39
# Global variables, tried to keep a small number.
gConfig = None # Command-line configuration (result of argparse parse_args()); assigned in main()
logger = None  # Logger used throughout this module; created and configured in main()
S
Steven Li 已提交
40

41 42
def runThread(wt: WorkerThread):
    """Entry point for a worker's threading.Thread: hand control to the worker's run loop."""
    runner = wt.run
    runner()
43

S
Steven Li 已提交
44
class WorkerThread:
    """A worker that executes one randomly picked task per coordinated step.

    Every step the worker first waits at the coordinator's shared barrier,
    then at its own "step gate" until the main thread taps it, and then
    fetches and executes a single task. The loop ends when the coordinator
    reports that it is no longer running.
    """

    def __init__(self, pool: SteppingThreadPool, tid, dbState,
                 tc: ThreadCoordinator,
                 ):  # note: main thread context!
        self._pool = pool
        self._tid = tid
        self._dbState = dbState
        self._tc = tc
        self._thread = threading.Thread(target=runThread, args=(self,))
        # Per-thread gate: set() by the main thread in tapStepGate(),
        # waited on and cleared by this thread in crossStepGate().
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own (opened later, in run())
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = DbConn()

    def getTaskExecutor(self):
        """Return the coordinator's TaskExecutor for the current step (None when finished)."""
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: open the optional private connection, run the task loop, clean up."""
        # initialization after thread starts, in the thread context
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn.open()
        try:
            self._doTaskLoop()
        finally:
            # Close the private connection even if a task raised, so it is not leaked.
            if gConfig.per_thread_db_connection:  # type: ignore
                self._dbConn.close()

    def _doTaskLoop(self):
        """Run one task per step until the coordinator stops running."""
        while True:
            self._tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            logger.debug("Thread task loop exited barrier...")
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("Thread task loop exited step gate...")
            if not self._tc.isRunning():
                break

            task = self._tc.fetchTask()
            task.execute(self)

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        """Raise RuntimeError unless called from this worker's own thread."""
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectedly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        """Raise RuntimeError unless called from the process's main thread."""
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectedly called from a non-main thread")

    def verifyThreadAlive(self):
        """Raise RuntimeError if the underlying thread is not running."""
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block this worker until the main thread taps the gate, then re-arm it."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its step gate (main thread only)."""
        import time  # not imported explicitly at the top of this file
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def execSql(self, sql):
        """Execute ``sql`` on the private connection if configured, else the shared one."""
        if gConfig.per_thread_db_connection:
            return self._dbConn.execSql(sql)
        else:
            return self._dbState.getDbConn().execSql(sql)
134 135 136 137 138 139
class ThreadCoordinator:
    """Drives all worker threads through ``gConfig.max_steps`` steps in lock-step.

    Each step, the workers and the main thread meet at a shared barrier
    (numThreads + 1 parties). The main thread then installs a fresh
    TaskExecutor and taps every worker's gate in a Dice-shuffled order. After
    the last step the executor is cleared, which tells the workers to exit.
    """

    def __init__(self, pool, wd: WorkDispatcher):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._wd = wd
        self._te = None  # a fresh TaskExecutor is created for every step
        # Parties: every worker thread plus the main thread
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)

    def getTaskExecutor(self):
        return self._te

    def crossStepBarrier(self):
        self._stepBarrier.wait()

    def run(self, dbState):
        """Start the workers, run exactly ``max_steps`` steps, then shut them down."""
        self._pool.createAndStartThreads(dbState, self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        maxSteps = gConfig.max_steps  # type: ignore
        # One step per iteration; steps are numbered 0..maxSteps-1. (The previous
        # condition `_curStep < maxSteps` ran maxSteps + 1 steps.)
        while self._curStep < maxSteps - 1:
            print(".", end="", flush=True)
            logger.debug("Main thread going to sleep")

            # Now ready to enter a step
            self.crossStepBarrier()  # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset()  # Other worker threads should now be at the "gate"

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
            logger.info("<-- Step {} finished".format(self._curStep))
            # Workers never read _curStep; they see the step via the TaskExecutor built below.
            self._curStep += 1
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep))

            self._te = TaskExecutor(self._curStep)

            logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep))
            self.tapAllThreads()

        logger.debug("Main thread ready to finish up...")
        self.crossStepBarrier()  # Cross it one last time, after all threads finish
        self._stepBarrier.reset()
        logger.debug("Main thread in exclusive zone...")
        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time (they see isRunning() == False)
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish

        logger.info("All threads finished")
        print("\r\nFinished")

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's gate, in an order shuffled by the seeded Dice."""
        import time  # not imported explicitly at the top of this file
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.info("Waking up threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self._pool.threadList[i].tapStepGate()  # TODO: maybe a bit too deep?!
            time.sleep(0)  # yield

    def isRunning(self):
        """True while a step's TaskExecutor is installed."""
        return self._te is not None

    def fetchTask(self) -> Task:
        """Return the next task to execute; raise RuntimeError once stopped."""
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        return self._wd.pickTask()

# We define a class to run a number of threads in locking steps.
class SteppingThreadPool:
    """Owns the worker threads that a ThreadCoordinator drives in locking steps."""

    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        # Caller-supplied configuration
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        self.funcSequencer = funcSequencer
        # Internal bookkeeping
        self.dispatcher = WorkDispatcher(dbState)
        self.curStep = 0
        self.threadList = []

    def createAndStartThreads(self, dbState, tc: ThreadCoordinator):
        """Create ``numThreads`` workers; each is recorded in threadList before it starts."""
        for workerId in range(self.numThreads):
            worker = WorkerThread(self, workerId, dbState, tc)
            self.threadList.append(worker)
            worker.start()  # blocks right away, before step 0

    def joinAll(self):
        """Wait until every recorded worker thread has exited."""
        for worker in self.threadList:
            logger.debug("Joining thread...")
            worker._thread.join()

S
Steven Li 已提交
233 234 235
# A queue of contiguous POSITIVE integers
class LinearQueue():
    """A queue of contiguous positive integers ``[firstIndex..lastIndex]``.

    ``push`` appends the next integer (already marked in use), and ``pop``
    removes the head unless it is in use. ``allocate``/``release`` mark and
    unmark individual indexes. Every method, including the read-only
    accessors, runs under the same re-entrant lock. Callers on different
    threads therefore always see a consistent snapshot.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0   # empty while firstIndex > lastIndex
        self._lock = threading.RLock()  # re-entrant: our methods call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a readable snapshot, e.g. ``[1..3], in use: {2}``."""
        with self._lock:
            return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    def push(self):
        """Add the next (largest) index at the tail, mark it in use, and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head is still in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        with self._lock:
            return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but return 0 (not False) for an empty queue."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index ``i`` in use; raise RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Unmark index ``i``; raises KeyError if it was not in use."""
        with self._lock:
            self.inUse.remove(i)

    def size(self):
        with self._lock:
            return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick a free index in the queue, mark it in use, and return it.

        Returns None if the queue is empty, or if no free index was hit within
        10 * size() random probes.
        """
        with self._lock:  # emptiness check and probing now see the same state
            if self.isEmpty():
                return None
            maxTries = self.size() * 10  # the queue cannot change while we hold the lock
            cnt = 0  # counting the iterations
            while True:
                cnt += 1
                if cnt > maxTries:
                    return None
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret

class DbConn:
    """One connection (plus cursor) to the local TDengine server.

    Lifecycle: construct, ``open()``, then ``execSql``/``resetDb``, then ``close()``.
    """

    def __init__(self):
        self._conn = None
        self._cursor = None
        self.isOpen = False

    def open(self):  # Open connection
        """Connect to 127.0.0.1 and wrap the cursor in a TDSql helper."""
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg"  # relative to the current working directory
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._cursor = self._conn.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')

        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop (if present) and recreate the ``db`` database."""
        if not self.isOpen:
            raise RuntimeError("Cannot reset database until connection is open")
        self._cursor.execute('drop database if exists db')
        self._cursor.execute('create database db')

    def close(self):
        """Close the TDSql helper and the underlying taos connection."""
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        try:
            self._tdSql.close()
        finally:
            # Nothing else in this class closes the handle returned by
            # taos.connect(); without this every DbConn leaked it.
            self._conn.close()
            self._conn = None
            self._cursor = None
            self.isOpen = False

    def execSql(self, sql):
        """Execute ``sql`` through the TDSql helper; requires an open connection."""
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        return self._tdSql.execute(sql)
S
Steven Li 已提交
355 356 357 358 359

# State of the database as we believe it to be
class DbState():
    """Client-side model of the test database, shared by all worker threads.

    Tracks which table indexes exist (via a LinearQueue), hands out
    monotonically increasing timestamps and integers for inserted rows, and
    owns the main DB connection. That connection is opened, and the database
    recreated, on construction.
    """

    def __init__(self):
        self.tableNumQueue = LinearQueue()
        self._lastTick = datetime.datetime(2019, 1, 1)  # initial date time tick
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()

        conn = DbConn()
        self._dbConn = conn
        conn.open()
        conn.resetDb()  # drop and recreate DB

    def getDbConn(self):
        return self._dbConn

    def pickAndAllocateTable(self):
        """Pick an existing table index and mark it in use; may return None."""
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        """Reserve the next table index; it is returned already marked in use."""
        with self._lock:
            return self.tableNumQueue.push()

    def releaseTable(self, i):
        """Return table ``i`` so other tasks may use it."""
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Return a timestamp one second after the previous one (never duplicated)."""
        with self._lock:
            self._lastTick = self._lastTick + datetime.timedelta(seconds=1)
            return self._lastTick

    def getNextInt(self):
        """Return the next integer in the sequence 1, 2, 3, ..."""
        with self._lock:
            self._lastInt = self._lastInt + 1
            return self._lastInt

    def getTableNameToDelete(self):
        """Pop the head table index and return its name, or False if none can be popped."""
        tblNum = self.tableNumQueue.pop()  # False when empty or head in use
        return "table_{}".format(tblNum) if tblNum else False

    def execSql(self, sql):  # using the main DB connection
        return self._dbConn.execSql(sql)

    def cleanUp(self):
        self._dbConn.close()

406 407 408 409 410 411 412 413 414 415 416 417 418 419
class TaskExecutor():
    """Per-step context for tasks; carries the step number used in log prefixes."""

    def __init__(self, curStep):
        self._curStep = curStep

    def execute(self, task, wt: WorkerThread):  # execute a task on a thread
        """Execute ``task`` on worker ``wt``.

        Task.execute() takes only the worker thread; it looks this executor up
        itself via ``wt.getTaskExecutor()``. The previous call
        ``task.execute(self, wt)`` therefore raised TypeError.
        """
        task.execute(wt)

    def logInfo(self, msg):
        logger.info("    T[{}.x]: ".format(self._curStep) + msg)

    def logDebug(self, msg):
        logger.debug("    T[{}.x]: ".format(self._curStep) + msg)


S
Steven Li 已提交
420
class Task():
    """Base class for one unit of work run by a WorkerThread during a step.

    Subclasses implement ``_executeInternal``; ``execute`` is the entry point.
    """

    def __init__(self, dbState):
        self.dbState = dbState

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # NotImplementedError subclasses RuntimeError, so existing handlers still match.
        raise NotImplementedError("To be implemented by child classes")

    def execute(self, wt: WorkerThread):
        """Run this task on worker ``wt``; must be called from that worker's own thread."""
        wt.verifyThreadSelf()

        te = wt.getTaskExecutor()
        self._executeInternal(te, wt)  # TODO: no return value?
        te.logDebug("[X] task execution completed")

    def execSql(self, sql):
        """Execute ``sql`` on the shared DbState connection."""
        # DbState provides execSql(); the previous call to a nonexistent
        # DbState.execute() raised AttributeError.
        return self.dbState.execSql(sql)

S
Steven Li 已提交
437
class CreateTableTask(Task):
    """Create table ``db.table_<n>`` using the next index from the table queue."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tIndex = self.dbState.addTable()  # new index, returned already marked in use
        try:
            te.logDebug("Creating a table {} ...".format(tIndex))
            wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
            te.logDebug("Table {} created.".format(tIndex))
        finally:
            # Always hand the index back. An index left in use would never be
            # picked again, and LinearQueue.pop() refuses an in-use head, so it
            # would eventually block every table drop.
            self.dbState.releaseTable(tIndex)
S
Steven Li 已提交
444 445

class DropTableTask(Task):
    """Drop the table at the head of the table queue, if it can be popped."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        victim = self.dbState.getTableNameToDelete()  # a name, or False
        if victim:
            te.logInfo("Dropping a table db.{} ...".format(victim))
            wt.execSql("drop table db.{}".format(victim))
            return
        te.logInfo("Cannot generate a table to delete, skipping...")
S
Steven Li 已提交
453 454

class AddDataTask(Task):
    """Insert one row (next tick, next integer) into a randomly picked table."""

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self.dbState
        te.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
        tIndex = ds.pickAndAllocateTable()
        if tIndex is None:
            te.logInfo("No table found to add data, skipping...")
            return
        try:
            sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
            te.logDebug("Executing SQL: {}".format(sql))
            wt.execSql(sql)
        finally:
            ds.releaseTable(tIndex)  # release the table even if the insert fails
        te.logDebug("Finished adding data")
S
Steven Li 已提交
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501

# Deterministic random number generator
class Dice():
    """Wrapper over the global ``random`` module for reproducible runs.

    It must be seeded exactly once (via ``seed``) before any throw.
    """
    seeded = False  # static; becomes True after the one allowed seed()

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG, then seed it with ``s``; a second call raises RuntimeError."""
        if cls.seeded:
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()  # note: this seeds with 0 first; random.seed(s) below overrides it
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is deterministic
        """Raise RuntimeError unless seed 0 yields the expected first three values."""
        random.seed(0)
        observed = [random.randrange(0, 1000) for _ in range(3)]
        if observed != [864, 394, 776]:
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, max):
        """Return an int in the range [0, max)."""
        return cls.throwRange(0, max)

    @classmethod
    def throwRange(cls, min, max):
        """Return an int in the range [min, max); requires a prior seed()."""
        if not cls.seeded:
            raise RuntimeError("Cannot throw dice before seeding it")
        return random.randrange(min, max)


# Anyone needing to carry out work should simply come here
class WorkDispatcher():
    """Hands out tasks picked at random from a fixed menu of task objects."""

    def __init__(self, dbState):
        self.tasks = [
            CreateTableTask(dbState),
            DropTableTask(dbState),
            AddDataTask(dbState),
        ]

    def throwDice(self):
        """Return a random index into ``self.tasks``.

        Goes through the seeded Dice, like the rest of this file, instead of
        calling ``random`` directly, so the "seed before use" check applies.
        ``Dice.throw(n)`` is ``random.randrange(0, n)``, which is exactly what
        the former ``random.randint(0, n - 1)`` delegated to. The random
        sequence is therefore unchanged.
        """
        return Dice.throw(len(self.tasks))

    def pickTask(self):
        return self.tasks[self.throwDice()]

    def doWork(self, workerThread):
        """Pick a random task and execute it on ``workerThread``."""
        task = self.pickTask()
        task.execute(workerThread)
S
Steven Li 已提交
523

524
def main():
    """Parse the command line, set up logging, and run the crash generator.

    Creates the shared DbState (which opens a connection and recreates the
    ``db`` database) and seeds the Dice. A ThreadCoordinator then drives
    ``--num-threads`` workers for ``--max-steps`` steps. The shared DB
    connection is closed even if the run fails.
    """
    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator')
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
                        help='Use a separate db connection per worker thread (default: false, all threads share one)')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                        help='Maximum number of steps to run (default: 100)')
    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                        help='Number of threads to run (default: 10)')

    global gConfig
    gConfig = parser.parse_args()

    global logger
    logger = logging.getLogger('myApp')
    if gConfig.debug:
        logger.setLevel(logging.DEBUG)
    else:
        # An unconfigured logger inherits the root level (WARNING), which would
        # silently drop every logger.info() progress message.
        logger.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    dbState = DbState()
    try:
        Dice.seed(0)  # initial seeding of dice
        tc = ThreadCoordinator(
            SteppingThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
            WorkDispatcher(dbState)
        )
        tc.run(dbState)
    finally:
        dbState.cleanUp()  # close the shared DB connection even on failure
    logger.info("Finished running thread pool")


if __name__ == "__main__":
    main()