crash_gen.py 18.8 KB
Newer Older
S
Steven Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
#!/usr/bin/python3
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
import argparse
import datetime
import getopt
import logging
import random
import sys
import threading
import time

from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

import taos


31 32 33 34
# Command-line/environment configuration; replaced by the parsed argparse
# namespace in the __main__ section before any worker is created.
gConfig = None


def runThread(workerThread):
    """Thread entry point: run the given worker's loop in the current thread."""
    entry = workerThread.run
    entry()

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
# Used by one process to block till another is ready
# class Baton:
#     def __init__(self):
#         self._lock = threading.Lock() # control access to object
#         self._baton = threading.Condition() # let thread block
#         self._hasGiver = False
#         self._hasTaker = False

#     def give(self):
#         with self._lock:
#             if ( self._hasGiver ): # already?
#                 raise RuntimeError("Cannot double-give a baton")
#             self._hasGiver = True

#         self._settle() # may block, OUTSIDE self lock

#     def take(self):
#         with self._lock:
#             if ( self._hasTaker):
#                 raise RuntimeError("Cannot double-take a baton")
#             self._hasTaker = True

#         self._settle()

#     def _settle(self):



S
Steven Li 已提交
65
class WorkerThread:
    """A worker that executes one randomly chosen task per step.

    Each worker runs in its own ``threading.Thread`` and moves in lock-step
    with its pool: at every step it first waits at the pool-wide barrier,
    then waits at its private "step gate" until the main thread taps it,
    and only then performs one unit of work via the pool's dispatcher.
    """

    def __init__(self, pool, tid, dbState):  # note: main thread context!
        """Create (but do not start) a worker.

        Args:
            pool: the owning pool; provides ``maxSteps``, ``dispatcher`` and
                ``crossPoolBarrier()``.
            tid: id of this worker, used in log messages.
            dbState: shared DbState whose connection is used for SQL when
                per-thread connections are disabled.
        """
        self._curStep = -1  # no step entered yet; crossStepGate() moves to 0
        self._pool = pool
        self._tid = tid
        self._dbState = dbState
        self._thread = threading.Thread(target=runThread, args=(self,))
        # Set by the main thread (tapStepGate) to release this worker.
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own; it is opened in run(),
        # i.e. from the worker thread itself.
        if gConfig.per_thread_db_connection:
            self._dbConn = DbConn()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: execute steps until the pool's ``maxSteps`` is reached.

        Runs in the worker thread's context. If per-thread connections are
        enabled, the private connection is opened first and is always closed
        on exit, even when a task raises.
        """
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:
            self._dbConn.open()

        try:
            while self._curStep < self._pool.maxSteps:
                self.crossStepGate()  # self._curStep gets incremented
                self.doWork()
        finally:
            # clean up, also when a task failed, so the connection is not leaked
            if gConfig.per_thread_db_connection:
                self._dbConn.close()

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block until the next step begins, then advance ``_curStep``.

        Must be called from this worker's own thread. Waits first at the
        pool-wide barrier, then at this worker's gate until tapped.
        """
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        self._pool.crossPoolBarrier()  # wait for all other threads

        # Wait again at the "gate", waiting to be "tapped" by the main thread
        self._stepGate.wait()
        self._stepGate.clear()  # re-arm the gate for the next step

        self._curStep += 1  # off to a new step...

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its step gate; main thread only."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit, IMPORTANT, do it after release

    def doWork(self):
        """Execute one task chosen by the pool's dispatcher."""
        self.logInfo("Thread starting an execution")
        self._pool.dispatcher.doWork(self)

    def logInfo(self, msg):
        logger.info("    T[{}.{}]: ".format(self._curStep, self._tid) + msg)

    def logDebug(self, msg):
        logger.debug("    T[{}.{}]: ".format(self._curStep, self._tid) + msg)

    def execSql(self, sql):
        """Execute ``sql`` on this worker's own connection or the shared one."""
        if gConfig.per_thread_db_connection:
            return self._dbConn.execSql(sql)
        else:
            return self._dbState.getDbConn().execSql(sql)
                  
S
Steven Li 已提交
166 167 168

# We define a class to run a number of threads in lock-step.
class SteppingThreadPool:
    """Run ``numThreads`` WorkerThreads in lock-step.

    At the start of every step the main thread and all workers meet at a
    shared barrier; the main thread then "taps" the workers one by one, in a
    dice-determined order, so that each executes one task.
    """

    def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
        """
        Args:
            dbState: shared DbState, handed to every worker and the dispatcher.
            numThreads: number of worker threads to create.
            maxSteps: last step number; steps 0..maxSteps (inclusive) are run.
            funcSequencer: stored, but not used by this class.
        """
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        self.funcSequencer = funcSequencer
        # Internal class variables
        self._dbState = dbState  # given to the workers created in run()
        self.dispatcher = WorkDispatcher(self, dbState)
        self.curStep = 0
        self.threadList = []

        # Thread coordination
        self._lock = threading.RLock()  # lock to control access (e.g. even reading it is dangerous)
        # All workers plus the main thread (hence +1) meet here every step.
        # threading.Barrier is cyclic: it re-arms itself after each release.
        self._poolBarrier = threading.Barrier(numThreads + 1)

    # starting to run all the threads, in locking steps
    def run(self):
        """Start all workers, drive them step by step, then join them."""
        for tid in range(self.numThreads):  # Create the threads
            # Use the DbState we were constructed with (not a module global)
            workerThread = WorkerThread(self, tid, self._dbState)
            self.threadList.append(workerThread)
            workerThread.start()  # start, but should block immediately before step 0

        # Coordinate all threads step by step
        self.curStep = -1  # not started yet
        while self.curStep < self.maxSteps:
            print(".", end="", flush=True)
            logger.debug("Main thread going to sleep")

            # Now ready to enter a step: let the workers past the pool barrier;
            # they then wait at their own step gates. No reset() here: the
            # barrier re-arms automatically, and reset() would break any
            # thread still waiting on it.
            self.crossPoolBarrier()

            # Rare chance, when all threads should be blocked at the "step gate" for each thread
            logger.info("<-- Step {} finished".format(self.curStep))
            self.curStep += 1  # we are about to get into next step
            logger.debug(" ")  # line break
            logger.debug("--> Step {} starts with main thread waking up".format(self.curStep))
            logger.debug("Main thread waking up at step {}, tapping worker threads".format(self.curStep))
            self.tapAllThreads()

        # The threads will run through many steps
        for workerThread in self.threadList:
            workerThread._thread.join()  # slight hack, accessing members

        logger.info("All threads finished")
        print("")
        print("Finished")

    def crossPoolBarrier(self):
        """Wait until all workers and the main thread reach the barrier."""
        self._poolBarrier.wait()

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's step gate, in an order chosen by Dice."""
        wakeSeq = []
        for i in range(self.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.info("Waking up threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self.threadList[i].tapStepGate()
            time.sleep(0)  # yield

# A queue of contiguous POSITIVE integers
class LinearQueue():
    """A lock-protected window [firstIndex..lastIndex] of contiguous integers.

    push() appends the next integer at the tail and pop() removes the head.
    Any element can also be marked "in use" (allocate/release); an in-use
    head cannot be popped.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0  # the queue is empty while lastIndex < firstIndex
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a readable summary such as "[1..3], in use: {2}"."""
        with self._lock:  # consistent snapshot of the indexes and the set
            return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        """Append the next integer at the tail, mark it in use, and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head element.

        Returns False (not None) when the queue is empty or the head is
        currently in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        with self._lock:
            return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but returns 0 (instead of False) on an empty queue."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark ``i`` in use; raise RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark ``i`` as no longer in use; raises KeyError if it was not in use."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        with self._lock:
            return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Pick a random element that is not in use, allocate and return it.

        Returns None when the queue is empty, or when no free element was hit
        within 10 * size() random attempts (even if one exists).
        """
        with self._lock:
            # Check emptiness under the lock so the queue cannot change before we pick
            if self.isEmpty():
                return None
            cnt = 0  # counting the iterations
            while True:
                cnt += 1
                if cnt > self.size() * 10:  # 10x iteration already
                    return None
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret

class DbConn:
    """One connection/cursor to the TDengine server, used through TDSql."""

    def __init__(self):
        self._conn = None
        self._cursor = None
        self.isOpen = False

    def open(self):  # Open connection
        """Connect, reset the query cache, and wrap the cursor in TDSql.

        Raises RuntimeError if this connection is already open.
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        cfgPath = "../../build/test/cfg"
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._cursor = self._conn.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')

        # Wrap the cursor in the test framework's SQL helper
        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop database ``db`` if it exists and create it again."""
        if not self.isOpen:
            raise RuntimeError("Cannot reset database until connection is open")
        self._cursor.execute('drop database if exists db')
        self._cursor.execute('create database db')

    def close(self):
        """Close the TDSql helper and the underlying connection.

        Raises RuntimeError if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        try:
            self._tdSql.close()
        finally:
            # TDSql was only handed the cursor, so close the connection ourselves
            self._conn.close()
            self.isOpen = False

    def execSql(self, sql):
        """Execute ``sql`` through TDSql and return its result."""
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        return self._tdSql.execute(sql)
S
Steven Li 已提交
356 357 358 359 360

# State of the database as we believe it to be
class DbState():
    """Client-side model of the test database, shared by all tasks."""

    def __init__(self):
        # Numbers N of the tables "table_N" we believe to exist
        self.tableNumQueue = LinearQueue()
        self._lastTick = datetime.datetime(2019, 1, 1)  # initial date time tick
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()

        # Main (shared) connection; the database is dropped and recreated
        self._dbConn = DbConn()
        self._dbConn.open()
        self._dbConn.resetDb()

    def getDbConn(self):
        """Return the shared DbConn."""
        return self._dbConn

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        """Reserve (mark in use) a new table number and return it."""
        with self._lock:
            return self.tableNumQueue.push()

    def releaseTable(self, i):  # return the table back, so others can use it
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Return a timestamp one second after the previously returned one."""
        with self._lock:  # prevent duplicate tick
            self._lastTick = self._lastTick + datetime.timedelta(seconds=1)
            return self._lastTick

    def getNextInt(self):
        """Return the next integer of the sequence 1, 2, 3, ..."""
        with self._lock:
            self._lastInt = self._lastInt + 1
            return self._lastInt

    def getTableNameToDelete(self):
        """Return "table_N" for the poppable head of the queue, else False."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        return "table_{}".format(tblNum) if tblNum else False

    def execSql(self, sql):  # using the main DB connection
        return self._dbConn.execSql(sql)

    def cleanUp(self):
        """Close the shared connection."""
        self._dbConn.close()

407
# A task is a long-living entity, carrying out short-lived "executions" for threads
class Task():
    """Base class of all tasks; subclasses implement ``_executeInternal``."""

    def __init__(self, dbState):
        self.dbState = dbState

    def _executeInternal(self, wt):
        raise RuntimeError("To be implemeted by child classes")

    def execute(self, workerThread):
        """Run one execution of this task on behalf of ``workerThread``."""
        self._executeInternal(workerThread)  # TODO: no return value?
        workerThread.logDebug("[X] task execution completed")

    def execSql(self, sql):
        """Execute ``sql`` on the shared DbState connection."""
        # DbState provides execSql(); it has no execute() method
        return self.dbState.execSql(sql)

S
Steven Li 已提交
422
class CreateTableTask(Task):
    """Create a new table ``db.table_N`` under a freshly reserved number N."""

    def _executeInternal(self, wt):
        ds = self.dbState  # our own state, not the module-level global
        tIndex = ds.addTable()  # reserved (in use) until released below
        wt.logDebug("Creating a table {} ...".format(tIndex))
        wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
        wt.logDebug("Table {} created.".format(tIndex))
        ds.releaseTable(tIndex)
S
Steven Li 已提交
429 430

class DropTableTask(Task):
    """Drop the lowest-numbered remaining table, unless it is in use."""

    def _executeInternal(self, wt):
        # Use our own state, not the module-level global
        tableName = self.dbState.getTableNameToDelete()
        if not tableName:  # May be "False"
            wt.logInfo("Cannot generate a table to delete, skipping...")
            return
        wt.logInfo("Dropping a table db.{} ...".format(tableName))
        wt.execSql("drop table db.{}".format(tableName))
S
Steven Li 已提交
438 439

class AddDataTask(Task):
    """Insert one row into a randomly picked table that is not in use."""

    def _executeInternal(self, wt):
        state = self.dbState
        wt.logInfo("Adding some data... numQueue={}".format(state.tableNumQueue.toText()))
        tableNum = state.pickAndAllocateTable()
        if tableNum is None:
            wt.logInfo("No table found to add data, skipping...")
            return
        # Tick first, then integer: same draw order as the row's columns
        tick = state.getNextTick()
        value = state.getNextInt()
        sql = "insert into db.table_{} values ('{}', {});".format(tableNum, tick, value)
        wt.logDebug("Executing SQL: {}".format(sql))
        wt.execSql(sql)
        state.releaseTable(tableNum)
        wt.logDebug("Finished adding data")
S
Steven Li 已提交
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486

# Deterministic random number generator
class Dice():
    """Process-wide wrapper around ``random`` that must be seeded exactly once."""

    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG, then seed it with ``s``; a second call raises."""
        if cls.seeded:
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is deterministic
        """Raise RuntimeError unless seed 0 yields 864, 394, 776.

        Side effect: reseeds the global ``random`` module with 0.
        """
        random.seed(0)
        observed = tuple(random.randrange(0, 1000) for _ in range(3))
        if observed != (864, 394, 776):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, max):  # get 0 to max-1
        return cls.throwRange(0, max)

    @classmethod
    def throwRange(cls, min, max):  # up to max-1
        """Return a random integer in [min, max); requires prior seeding."""
        if cls.seeded:
            return random.randrange(min, max)
        raise RuntimeError("Cannot throw dice before seeding it")


# Anyone needing to carry out work should simply come here
class WorkDispatcher():
    """Pick one of a fixed set of tasks at random and execute it."""

    def __init__(self, pool, dbState):
        self.pool = pool
        # All tasks share the same DbState; one is chosen per doWork() call
        self.tasks = [
            CreateTableTask(dbState),
            DropTableTask(dbState),
            AddDataTask(dbState),
        ]

    def throwDice(self):
        """Return a random index into ``self.tasks``.

        Goes through Dice, like every other random choice in this tool, so it
        is subject to the "must be seeded first" check. ``Dice.throw(n)``
        draws from [0, n), the same values ``random.randint(0, n - 1)`` gives.
        """
        return Dice.throw(len(self.tasks))

    def doWork(self, workerThread):
        """Execute one randomly chosen task on behalf of ``workerThread``."""
        task = self.tasks[self.throwDice()]
        task.execute(workerThread)
S
Steven Li 已提交
506 507

if __name__ == "__main__":
    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator')
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
                        help='Give each worker thread its own db connection, instead of '
                             'sharing a single one (default: false)')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument('-t', '--num-threads', type=int, default=5,
                        help='Number of worker threads (default: 5)')
    parser.add_argument('-s', '--max-steps', type=int, default=500,
                        help='Last step number to run (default: 500)')

    gConfig = parser.parse_args()

    logger = logging.getLogger('myApp')
    # Set the level explicitly: an unset logger inherits the root logger's
    # WARNING level, which would silence every logger.info() call.
    logger.setLevel(logging.DEBUG if gConfig.debug else logging.INFO)
    ch = logging.StreamHandler()
    logger.addHandler(ch)

    Dice.seed(0)  # initial seeding of dice
    dbState = DbState()
    try:
        threadPool = SteppingThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0)
        threadPool.run()
        logger.info("Finished running thread pool")
    finally:
        dbState.cleanUp()  # always release the main DB connection
S
Steven Li 已提交
529