crash_gen.py 61.2 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17
import os
18
import traceback
19 20 21 22
# Refuse to run under a Python 2 interpreter.
if sys.version_info.major < 3:
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
23
import getopt
24
import argparse
25
import copy
S
Steven Li 已提交
26 27 28

import threading
import random
29
import time
S
Steven Li 已提交
30
import logging
31
import datetime
32
import textwrap
S
Steven Li 已提交
33

34
from typing import List
35
from typing import Dict
36
from typing import Set
37

S
Steven Li 已提交
38 39 40 41 42
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

43
import crash_gen
S
Steven Li 已提交
44 45
import taos

46
# Global variables -- we try to keep their number small.

# Command-line/environment configuration. This is only a placeholder
# Namespace; it is expected to be replaced once the real configuration
# has been parsed.
gConfig = argparse.Namespace()  # Dummy value, will be replaced later

# Module-wide logger. It starts out as None; since the code below calls
# logger.debug()/logger.info(), it presumably gets assigned a real logger
# before any of that code runs -- TODO confirm at the assignment site.
logger = None
S
Steven Li 已提交
52

53 54
def runThread(wt: WorkerThread):
    """Thread entry point: hand control over to the worker's own run() loop."""
    workerMain = wt.run
    workerMain()
55

56 57 58 59 60 61 62 63
class CrashGenError(Exception):
    """Error raised by the crash generator's own consistency checks.

    Attributes:
        msg:   human-readable description; may be None.
        errno: optional numeric error code; may be None.
    """

    def __init__(self, msg=None, errno=None):
        # Initialise the Exception base so ``args`` is populated (it used to
        # stay empty, e.g. repr(err) showed no message).
        super().__init__(msg)
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str: returning None (msg omitted) made
        # str(err) -- and thus printing/logging the error -- raise TypeError.
        return "" if self.msg is None else str(self.msg)

S
Steven Li 已提交
64
class WorkerThread:
    """One worker of the ThreadPool, run in lock-step by a ThreadCoordinator.

    The underlying ``threading.Thread`` is created in __init__ (main thread)
    and started by start(). In every step the worker waits at the
    coordinator's shared barrier, then at its own per-thread "gate" until the
    main thread taps it, then fetches one task from the coordinator and runs it.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
        # NOTE: this runs in the main thread's context, not the worker's.
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Optionally give this worker a DB connection of its own; it is
        # opened later, inside the worker thread (see run()).
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = DbConn()

        self._dbInUse = False  # whether "use db" was executed already

    def logDebug(self, msg):
        logger.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        logger.info("    TRD[{}] {}".format(self._tid, msg))

    def dbInUse(self):
        return self._dbInUse

    def useDb(self):
        """Execute "use db" once; later calls are no-ops until the flag is reset."""
        if not self._dbInUse:
            self.execSql("use db")
        self._dbInUse = True

    def getTaskExecutor(self):
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: open the per-thread connection (if any), loop over steps, clean up."""
        # initialization after thread starts, in the thread context
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn.open()
        try:
            self._doTaskLoop()
        finally:
            # Clean up even when the task loop raised, so a failing worker
            # does not leak its DB connection.
            if gConfig.per_thread_db_connection:  # type: ignore
                self._dbConn.close()

    def _doTaskLoop(self):
        """Once per step: barrier, gate, then fetch/execute/record one task."""
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not tc.isRunning():
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Fetch a task from the Thread Coordinator
            logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

            self._dbInUse = False  # there may be changes between steps

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block (in this worker's own thread) until the main thread taps the gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid))
        self._stepGate.wait()
        self._stepGate.clear()  # re-arm the gate for the next step

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; may only be called by the main thread."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def getDbConn(self):
        """Return this worker's own connection, or the shared one from the DbManager."""
        if gConfig.per_thread_db_connection:
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()

    def execSql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        return self.getDbConn().getQueryResult()
203

204
class ThreadCoordinator:
    """Main-thread controller that runs the worker pool in lock-step.

    Per step: cross the shared barrier together with all workers (which then
    wait at their per-thread gates), transition the DB state machine using the
    tasks executed in the previous step, install a fresh TaskExecutor, and tap
    every worker's gate so that it executes one task.
    """

    def __init__(self, pool: ThreadPool, dbManager):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # One barrier shared by all worker threads plus the main thread (+1)
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()

    def getTaskExecutor(self):
        return self._te

    def getDbManager(self) -> DbManager:
        return self._dbManager

    def crossStepBarrier(self):
        self._stepBarrier.wait()

    def run(self):
        """Start the workers, drive them through gConfig.max_steps steps, then shut down.

        A ProgrammingError whose msg is 'network unavailable' (broken DB
        connection) ends the run early and is recorded as a failure; any other
        ProgrammingError propagates.
        """
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        maxSteps = gConfig.max_steps  # type: ignore
        self._execStats.startExec()  # start the stop watch
        failed = False
        while self._curStep < maxSteps - 1 and not failed:  # maxStep==10, last curStep should be 9
            if not gConfig.debug:
                print(".", end="", flush=True)  # print this only if we are not in debug mode
            logger.debug("[TRD] Main thread going to sleep")

            # Now ready to enter a step
            self.crossStepBarrier()  # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset()  # Other worker threads should now be at the "gate"

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
            try:
                sm = self._dbManager.getStateMachine()
                logger.debug("[STT] starting transitions")
                sm.transition(self._executedTasks)  # at end of step, transition the DB state
                logger.debug("[STT] transition ended")
                if sm.hasDatabase():
                    for t in self._pool.threadList:
                        logger.debug("[DB] use db for all worker threads")
                        t.useDb()
            except taos.error.ProgrammingError as err:
                if err.msg != 'network unavailable':
                    raise
                # Broken DB connection
                logger.info("DB connection broken, execution failed")
                # print_exc() shows where the connection actually broke;
                # print_stack() only showed the frames leading to this handler.
                traceback.print_exc()
                failed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # Don't "continue" here: all threads still need to be tapped
                # below, so they can notice that we stopped running.

            self.resetExecutedTasks()  # clear the tasks after we are done

            # Get ready for next step
            logger.debug("<-- Step {} finished".format(self._curStep))
            self._curStep += 1  # we are about to get into next step. TODO: race condition here!
            logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep))  # Now not all threads had time to go to sleep

            # A new TE for the new step
            if not failed:  # only if not failed
                self._te = TaskExecutor(self._curStep)

            logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep))  # Now not all threads had time to go to sleep
            self.tapAllThreads()

        logger.debug("Main thread ready to finish up...")
        if not failed:  # only in regular situations
            self.crossStepBarrier()  # Cross it one last time, after all threads finish
            self._stepBarrier.reset()
            logger.debug("Main thread in exclusive zone...")
            self._te = None  # No more executor, time to end
            logger.debug("Main thread tapping all threads one last time...")
            self.tapAllThreads()  # Let the threads run one last time

        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        logger.info("All worker thread finished")
        self._execStats.endExec()

    def logStats(self):
        self._execStats.logStats()

    def tapAllThreads(self):
        """Tap every worker's gate, in an order shuffled with Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self._pool.threadList[i].tapStepGate()  # TODO: maybe a bit too deep?!
            time.sleep(0)  # yield

    def isRunning(self):
        """True while a TaskExecutor is installed, i.e. workers should keep taking tasks."""
        return self._te is not None

    def fetchTask(self) -> Task:
        """Create a new task of a type picked by the DB state machine for the current state."""
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        taskType = self.getDbManager().getStateMachine().pickTaskType()  # pick a task type for current state
        return taskType(self.getDbManager(), self._execStats)  # create a task from it

    def resetExecutedTasks(self):
        # Take the same lock saveExecutedTask() uses, so the reset can never
        # interleave with a worker appending a task.
        with self._lock:
            self._executedTasks = []

    def saveExecutedTask(self, task):
        with self._lock:
            self._executedTasks.append(task)
336 337

# We define a class to run a number of threads in locking steps.
338
class ThreadPool:
    """A fixed-size set of WorkerThreads, run in lock-step by a ThreadCoordinator."""

    def __init__(self, numThreads, maxSteps):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal bookkeeping
        self.curStep = 0
        self.threadList = []  # type: List[WorkerThread]

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to *tc*, starting each as soon as it is recorded.

        A started worker blocks right away at the coordinator's step barrier,
        i.e. before step 0.
        """
        for tid in range(self.numThreads):
            worker = WorkerThread(self, tid, tc)
            self.threadList.append(worker)
            worker.start()

    def joinAll(self):
        """Wait until every worker thread has terminated."""
        for worker in self.threadList:
            logger.debug("Joining thread...")
            worker._thread.join()

358 359
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
S
Steven Li 已提交
360 361
class LinearQueue():
    """A lock-protected queue of contiguous positive integers [firstIndex..lastIndex].

    Used by DbManager to generate continuous numbers for new table names.
    New numbers are pushed at the tail and immediately marked "in use".
    Numbers can be released again, and only a head number that is not in use
    can be popped.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0   # firstIndex > lastIndex means the queue is empty
        self._lock = threading.RLock()  # re-entrant: our functions call each other while holding it
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a human-readable snapshot of the range and the in-use set."""
        # Hold the lock so the range and the set are read at the same moment.
        with self._lock:
            return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    def push(self):
        """Append a new (largest) index at the tail, mark it in use and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False (not None) if the queue is empty or the head index is
        still in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but returns 0 (rather than False) when the queue is empty."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index *i* as in use; raise RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index *i* as no longer in use; raises KeyError if it was not in use."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick an index of the queue that is not in use, allocate and return it.

        Returns None if the queue is empty, or if no free index turned up
        within 10 x size() random draws.
        """
        with self._lock:
            # Check for emptiness while holding the lock (it used to be checked
            # before acquiring it), so the range cannot change in between.
            if self.isEmpty():
                return None
            cnt = 0  # counting the iterations
            while True:
                cnt += 1
                if cnt > self.size() * 10:  # 10x iteration already
                    return None
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret

class DbConn:
    """A TDengine connection (via ``taos``) plus a TDSql helper bound to its cursor.

    All query/execute methods raise RuntimeError unless open() has been called.
    """

    def __init__(self):
        self._conn = None
        self._cursor = None
        self.isOpen = False

    def open(self):  # Open connection
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        # NOTE(review): relative path, so it depends on the current working directory.
        cfgPath = "../../build/test/cfg"
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._cursor = self._conn.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')
        # "use db" is deliberately not issued here: it is done at the beginning of every step

        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        if not self.isOpen:
            raise RuntimeError("Cannot reset database until connection is open")
        self._cursor.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def close(self):
        """Close the TDSql helper and the underlying connection."""
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        try:
            self._tdSql.close()
        finally:
            # TDSql was only handed the cursor, so the connection itself must
            # be closed here too (it used to leak). Do it even if the helper's
            # close() failed.
            self.isOpen = False
            self._conn.close()

    def execute(self, sql):
        """Execute *sql* and return the row count reported by TDSql."""
        if not self.isOpen:
            raise RuntimeError("Cannot execute database commands until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        nRows = self._tdSql.execute(sql)
        logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def query(self, sql):
        """Run query *sql* and return its row count; the rows are available via getQueryResult()."""
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        nRows = self._tdSql.query(sql)
        logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def getQueryResult(self):
        return self._tdSql.queryResult

    def _queryAny(self, sql):
        """Run *sql*, require exactly one row x one column, and return that single value."""
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        tSql = self._tdSql
        nRows = tSql.query(sql)
        if nRows != 1:
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
        if tSql.queryRows != 1 or tSql.queryCols != 1:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        return tSql.queryResult[0][0]

    def queryScalar(self, sql) -> int:
        return self._queryAny(sql)

    def queryString(self, sql) -> str:
        return self._queryAny(sql)
    
class AnyState:
    """Base class for the DB states the crash generator moves between.

    Subclasses implement getInfo(), returning a list indexed by the constants
    below: the state value at STATE_VAL_IDX followed by capability flags.
    """

    STATE_INVALID    = -1
    STATE_EMPTY      = 0  # nothing there, not even a DB
    STATE_DB_ONLY    = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA   = 3  # we have some data in the table
    # Display names, indexed by (state value + 1) so STATE_INVALID maps to slot 0
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Positions inside the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        slot = self._info[self.STATE_VAL_IDX] + 1  # shift by one to accommodate STATE_INVALID
        return self._stateNames[slot]

    def getInfo(self):
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare this state's value with an int state value or another AnyState."""
        if isinstance(other, AnyState):
            other = other.getValIndex()
        elif not isinstance(other, int):
            raise RuntimeError("Unexpected comparison, type = {}".format(type(other)))
        return self.getValIndex() == other

    def verifyTasksToState(self, tasks, newState):
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    # ---- capability flags ----
    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    # ---- checks over the tasks executed in a step ----
    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError as soon as a second successful task of type *cls* is seen."""
        successes = 0
        for candidate in tasks:
            if isinstance(candidate, cls) and candidate.isSuccess():
                successes += 1
                if successes > 1:
                    raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """If any task of type *cls* is present, at least one of them must have succeeded."""
        matching = [candidate for candidate in tasks if isinstance(candidate, cls)]
        successes = sum(1 for candidate in matching if candidate.isSuccess())
        if matching and successes <= 0:
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type *cls* is present."""
        if any(isinstance(candidate, cls) for candidate in tasks):
            raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError if any task of type *cls* succeeded."""
        if any(isinstance(candidate, cls) and candidate.isSuccess() for candidate in tasks):
            raise RuntimeError("Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if some task of type *cls* succeeded."""
        return any(isinstance(candidate, cls) and candidate.isSuccess() for candidate in tasks)

    def hasTask(self, tasks, cls):
        """Return True if some task of type *cls* is present."""
        return any(isinstance(candidate, cls) for candidate in tasks)

614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633
class StateInvalid(AnyState):
    """The invalid state: every capability flag is disabled."""

    def getInfo(self):
        # State value, then six flags: create/drop DB, create/drop fixed
        # super table, insert/read data -- all False here.
        return [self.STATE_INVALID] + [False] * 6

    # def verifyTasksToState(self, tasks, newState):

class StateEmpty(AnyState):
    """No database yet: creating a DB is the only enabled capability."""

    def getInfo(self):
        canCreateDb, canDropDb = True, False
        return [
            self.STATE_EMPTY,
            canCreateDb, canDropDb,
            False, False,  # can create / drop fixed super table
            False, False,  # can insert / read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        # Only meaningful when some CreateDb succeeded and nobody tried to
        # drop a DB; then at most one CreateDb may have succeeded.
        if not self.hasSuccess(tasks, TaskCreateDb):
            return
        if self.hasTask(tasks, TaskDropDb):
            return
        self.assertAtMostOneSuccess(tasks, TaskCreateDb)  # TODO: compare numbers
638 639 640 641 642 643 644 645 646 647 648

class StateDbOnly(AnyState):
    """A database exists, but nothing else yet."""

    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,   # can create / drop DB
            True, False,   # can create / drop fixed super table
            False, False,  # can insert / read data
        ]

    def verifyTasksToState(self, tasks, newState):
        # A single successful drop can only be insisted on when nobody re-creates the DB.
        if not self.hasTask(tasks, TaskCreateDb):
            self.assertAtMostOneSuccess(tasks, TaskDropDb)
        # If anyone tried to drop the existing DB, at least one of them must have succeeded.
        self.assertIfExistThenSuccess(tasks, TaskDropDb)
        # NOTE: deliberately no check on super-table creation: with many threads
        # in parallel, more than one creation may succeed. Nothing is checked
        # about add-data tasks either.

675
class StateSuperTableOnly(AnyState):
    """A (super) table exists, but it is still totally empty."""

    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # can create / drop DB
            False, True,  # can create / drop fixed super table
            True, True,   # can insert / read data
        ]

    def verifyTasksToState(self, tasks, newState):
        if not self.hasSuccess(tasks, TaskDropSuperTable):
            return
        # The table was dropped. The call below was meant to confirm that it
        # was re-created, but its result is discarded, so it asserts nothing.
        # NOTE(review): either turn this into a real assertion or delete it.
        # TODO: need to revamp!!
        self.hasSuccess(tasks, TaskCreateSuperTable)
700 701 702 703 704 705 706 707 708 709 710

class StateHasData(AnyState):
    """DB state in which the query on the fixed super table returns at least one row."""

    def getInfo(self):
        # NOTE(review): the six booleans presumably map to the can*() capability
        # queries defined on AnyState -- confirm their order there.
        capabilities = [False, True, False, True, True, True]
        return [self.STATE_HAS_DATA] + capabilities

    def verifyTasksToState(self, tasks, newState):
        """Check that the tasks run in the last step are consistent with reaching newState."""
        if newState.equals(AnyState.STATE_EMPTY):  # the database went away
            self.hasSuccess(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
            return

        if newState.equals(AnyState.STATE_DB_ONLY):  # tables gone, DB still there
            if not self.hasTask(tasks, TaskCreateDb):  # without a create_db task...
                self.assertNoTask(tasks, TaskDropDb)  # ...we must NOT have had a drop_db task
            self.hasSuccess(tasks, TaskDropSuperTable)
            return

        if newState.equals(AnyState.STATE_TABLE_ONLY):  # data deleted
            self.assertNoTask(tasks, TaskDropDb)
            self.assertNoTask(tasks, TaskDropSuperTable)
            self.assertNoTask(tasks, TaskAddData)
            return

        # Otherwise we should still be in STATE_HAS_DATA
        if not self.hasTask(tasks, TaskCreateDb):  # only if we didn't create one
            self.assertNoTask(tasks, TaskDropDb)  # we shouldn't have dropped it
        if not self.hasTask(tasks, TaskCreateSuperTable):  # if we didn't create the table
            self.assertNoTask(tasks, TaskDropSuperTable)  # we should not have a task that drops it


class StateMechine:
    """Track the current DB state, pick the next task type and verify transitions.

    The state is re-derived from the live database (see _findCurrentState) at
    start-up and after every step; the tasks executed during that step are then
    checked against the observed transition.
    """

    def __init__(self, dbConn):
        self._dbConn = dbConn
        self._curState = self._findCurrentState()  # starting state
        # Transition target probabilities, indexed with the value of
        # STATE_EMPTY, STATE_DB_ONLY, etc.
        self._stateWeights = [1, 3, 5, 15]

    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        return self._curState.canDropDb()  # ha, can drop DB means it has one

    # May be slow, use cautiously...
    def getTaskTypes(self):
        """Return the task types that can run (directly or indirectly) from the current state.

        "Direct" types can begin from the current state; "indirect" types can begin
        from the end state of one of the direct types. Each type appears at most once.

        Raises:
            RuntimeError: if no task type applies to the current state.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]
        # now we have all the tasks that can begin directly from the current state,
        # let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:  # each task type gathered so far
            endState = task1.getEndState()  # figure the end state
            if endState is None:  # does not change end state, nothing further reachable
                continue
            for tc in allTaskClasses:  # what task can further begin from there?
                # Compare against everything gathered so far, not just the direct
                # types: otherwise a type reachable from several end states is
                # appended repeatedly and gets over-weighted by pickTaskType().
                if tc.canBeginFrom(endState) and (tc not in taskTypes):
                    taskTypes.append(tc)

        if not taskTypes:
            raise RuntimeError("No suitable task types found for state: {}".format(self._curState))
        logger.debug("[OPS] Tasks found for state {}: {}".format(
            self._curState, [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self):
        """Probe the database with queries and return the matching state object."""
        dbc = self._dbConn
        ts = time.time()  # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        if dbc.query("show databases") == 0:  # no database?!
            logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        dbc.execute("use db")  # did not do this when openning connection, and this is NOT the worker thread, which does this on their own
        if dbc.query("show tables") == 0:  # no tables
            logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()
        if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName())) == 0:  # no regular tables
            logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else:  # has actual tables
            logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    def transition(self, tasks):
        """Re-derive the DB state after a step and verify it against the tasks that ran."""
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            logger.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        self._dbConn.execute("show dnodes")  # this should show up in the server log, separating steps

        # Generic Checks, first based on the start state.
        # At-most-one-success is NOT asserted: creations and drops may interleave.
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)

        if self._curState.canDropDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)

        # No generic checks for super-table create/drop, add-data or read-data tasks:
        # their success is not guaranteed, since the whole DB may be dropped meanwhile.

        newState = self._findCurrentState()
        logger.debug("[STT] New DB state determined: {}".format(newState))
        self._curState.verifyTasksToState(tasks, newState)  # can old state move to new state through the tasks?
        self._curState = newState

    def pickTaskType(self):
        """Pick one of getTaskTypes() at random, weighted by its target end state."""
        taskTypes = self.getTaskTypes()  # all the task types we can choose from at curent state
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                weights.append(self._stateWeights[endState.getValIndex()])  # TODO: change to a method
            else:
                weights.append(10)  # read data task, default to 10: TODO: change to a constant
        i = self._weighted_choice_sub(weights)
        return taskTypes[i]

    def _weighted_choice_sub(self, weights):
        """Return a random index into weights, chosen with probability proportional to its weight.

        ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/

        Raises:
            ValueError: if weights is empty or has no positive weight.
        """
        total = sum(weights)
        if total <= 0:
            raise ValueError("weights must contain at least one positive value")
        rnd = random.random() * total  # TODO: use our dice to ensure it being determinstic?
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Floating-point rounding can leave rnd at exactly 0 after the last
        # subtraction; fall back to the last index that can legitimately be chosen.
        return max(i for i, w in enumerate(weights) if w > 0)


# Manager of the Database Data/Connection
class DbManager():
    """Own the DB connection and state machine, and hand out unique values.

    Ticks and integers are generated under a lock so concurrent callers never
    receive duplicates; table numbers come from a LinearQueue.
    """

    def __init__(self, resetDb = True):
        self.tableNumQueue = LinearQueue()
        self._lastTick = self.setupLastTick()  # initial date time tick
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()

        self._dbConn = DbConn()
        try:
            self._dbConn.open()  # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            if err.msg == 'client disconnected':  # cannot open DB connection
                print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                # NOTE(review): this exits with status 0 despite the failure; consider a non-zero code
                sys.exit()
            else:
                raise
        except Exception:  # not a bare except: let KeyboardInterrupt/SystemExit pass untouched
            print("[=] Unexpected exception")
            raise

        if resetDb:
            self._dbConn.resetDb()  # drop and recreate DB

        self._stateMachine = StateMechine(self._dbConn)  # Do this after dbConn is in proper shape

    def getDbConn(self):
        return self._dbConn

    def getStateMachine(self) -> "StateMechine":
        return self._stateMachine

    # We aim to create a starting time tick, such that, whenever we run our test here once
    # we should be able to safely create 100,000 records, which will not have any repeated time stamp
    # when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
    # by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    def setupLastTick(self):
        """Return the initial row timestamp, derived from the current wall-clock time."""
        t1 = datetime.datetime(2020, 6, 1)
        t2 = datetime.datetime.now()
        elSec = int(t2.timestamp() - t1.timestamp())  # Python ints are unbounded, no overflow concern
        # Wrap the elapsed seconds and scale by 500: the result lies in
        # [0, 8 * 12 * 30 days), i.e. roughly 8 years. It is a float because of "/ 500".
        elSec2 = (elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500)) * 500
        t3 = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp(t3.timestamp() + elSec2)  # see explanation above
        logger.info("Setting up TICKS to start from: {}".format(t4))
        return t4

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    def releaseTable(self, i):  # return the table back, so others can use it
        self.tableNumQueue.release(i)

    def getNextTick(self):
        with self._lock:  # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1)  # add one second to it
            return self._lastTick

    def getNextInt(self):
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(self.getNextInt())

    def getNextFloat(self):
        return 0.9 + self.getNextInt()

    def getTableNameToDelete(self):
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        # NOTE(review): `not tblNum` also treats table number 0 as "nothing to delete";
        # confirm whether LinearQueue can hand out index 0.
        if not tblNum:  # maybe false
            return False
        return "table_{}".format(tblNum)

    def cleanUp(self):
        self._dbConn.close()

class TaskExecutor():
    """Carry the current step number and run tasks on a given worker thread."""

    def __init__(self, curStep):
        self._curStep = curStep

    def getCurStep(self):
        """Return the step number this executor was created for."""
        return self._curStep

    def execute(self, task: "Task", wt: "WorkerThread"):
        """Run task on worker thread wt (delegates to Task.execute)."""
        task.execute(wt)

    # def logInfo(self, msg):
    #     logger.info("    T[{}.x]: ".format(self._curStep) + msg)

    # def logDebug(self, msg):
    #     logger.debug("    T[{}.x]: ".format(self._curStep) + msg)

class Task():
    """Base class for a unit of work run on a worker thread, with error classification."""

    taskSn = 100

    @classmethod
    def allocTaskNum(cls):
        Task.taskSn += 1  # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        return Task.taskSn

    def __init__(self, dbManager: "DbManager", execStats: "ExecutionStats"):
        self._dbManager = dbManager
        self._workerThread = None
        self._err = None
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

        self._execStats = execStats
        self._lastSql = ""  # last SQL executed/attempted

    def isSuccess(self):
        return self._err is None

    def clone(self):  # TODO: why do we need this again?
        newTask = self.__class__(self._dbManager, self._execStats)
        return newTask

    def logDebug(self, msg):
        self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: "TaskExecutor", wt: "WorkerThread"):
        raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))

    def execute(self, wt: "WorkerThread"):
        """Run _executeInternal() on wt, recording stats.

        A taos ProgrammingError whose (positive) errno is in the allowed list is
        recorded as a task failure; any other exception is logged and re-raised.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug("[-] executing task {}...".format(self.__class__.__name__))

        self._err = None
        self._execStats.beginTaskType(self.__class__.__name__)  # mark beginning
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = 0x80000000 + err.errno  # positive error number
            if errno2 in [0x200, 0x360, 0x362, 0x381, 0x380, 0x600]:  # allowed errors
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
                print("e", end="", flush=True)
                self._err = err
            else:
                self.logDebug("[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
                raise
        except:
            self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql))
            raise
        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess())  # TODO: merge with above.

    def execSql(self, sql):
        """Execute sql on the DB manager's own connection (not the worker thread's)."""
        self._lastSql = sql
        # DbManager has no execute() of its own; go through its connection.
        return self._dbManager.getDbConn().execute(sql)

    def execWtSql(self, wt: "WorkerThread", sql):  # execute an SQL on the worker thread
        self._lastSql = sql
        return wt.execSql(sql)

    def queryWtSql(self, wt: "WorkerThread", sql):  # run a query on the worker thread
        self._lastSql = sql
        return wt.querySql(sql)

    def getQueryResult(self, wt: "WorkerThread"):  # fetch the last query's result from the worker thread
        return wt.getQueryResult()

class ExecutionStats:
    """Aggregate per-task-type execution counts and timing for the final report."""

    def __init__(self):
        self._execTimes: Dict[str, List[int]] = {}  # task class name -> [total, success] counts
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
        self._execStartTime = None
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def startExec(self):
        self._execStartTime = time.time()

    def endExec(self):
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess):
        """Count one execution of klassName, and one success if isSuccess."""
        # Task.execute() calls this from worker threads, so guard the shared dict
        with self._lock:
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

    def logStats(self):
        """Write the summary report through the module logger."""
        logger.info("----------------------------------------------------------------------")
        logger.info("| Crash_Gen test {}, with the following stats:".
            format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
        execTimesAny = 0
        for k, n in self._execTimes.items():
            execTimesAny += n[0]
            logger.info("|    {0:<24}: {1}/{2}".format(k, n[1], n[0]))

        # Guard against a run in which no task executed at all
        avgTaskTime = (self._accRunTime / execTimesAny) if execTimesAny > 0 else 0.0
        logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress))
        logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
        logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(avgTaskTime))
        logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
        logger.info("----------------------------------------------------------------------")



class StateTransitionTask(Task):
    """Abstract base for tasks that may move the database between states.

    Subclasses must override getEndState() and canBeginFrom().
    """

    # Sizing knobs; the LARGE_* values are used when gConfig.larger_data is set
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    _endState = None

    @classmethod
    def getInfo(cls):  # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        """Return the fully qualified name of regular table number i."""
        return "db.reg_table_{}".format(i)

    def execute(self, wt: WorkerThread):
        super().execute(wt)

class TaskCreateDb(StateTransitionTask):
    """Create the test database `db`; ends in the DB-only state."""

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        createSql = "create database db"
        self.execWtSql(wt, createSql)

class TaskDropDb(StateTransitionTask):
    """Drop the test database `db`; ends in the empty state."""

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        self.execWtSql(wt, "drop database db")
        droppedAt = time.time()
        logger.debug("[OPS] database dropped at {}".format(droppedAt))

class TaskCreateSuperTable(StateTransitionTask):
    """Create the fixed super table inside `db`, if this worker has a DB in use."""

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if wt.dbInUse():
            tblName = self._dbManager.getFixedSuperTableName()
            # Regular tables are not created here: INSERT ... USING creates them automatically
            self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
        else:  # no DB yet, to the best of our knowledge
            logger.debug("Skipping task, no DB yet")

class TaskReadData(StateTransitionTask):
    """Read every regular table under the fixed super table; occasionally reconnect instead."""

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        superTableName = self._dbManager.getFixedSuperTableName()
        self.queryWtSql(wt, "select TBNAME from db.{}".format(superTableName))  # TODO: analyze result set later

        # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
        simulateBrokenConn = random.randrange(5) == 0
        if simulateBrokenConn:
            wt.getDbConn().close()
            wt.getDbConn().open()
        else:
            for row in self.getQueryResult(wt):  # one row per regular table
                self.execWtSql(wt, "select * from db.{}".format(row[0]))

class TaskDropSuperTable(StateTransitionTask):
    """Drop the fixed super table, sometimes dropping its regular tables one by one first."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
        if Dice.throw(2) == 0:
            # NOTE(review): 2 more indices than TaskAddData uses, presumably so that
            # some drops target non-existent tables -- confirm intent.
            numTables = 2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)
            tblSeq = list(range(numTables))
            random.shuffle(tblSeq)
            tickOutput = False  # if we have spitted out a "d" character for "drop regular table"
            for i in tblSeq:
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                try:
                    nRows = self.execWtSql(wt, "drop table {}".format(regTableName))
                except taos.error.ProgrammingError as err:
                    errno2 = 0x80000000 + err.errno  # positive error number
                    if errno2 in [0x362]:  # allowed errors
                        logger.debug("[DB] Acceptable error when dropping a table")
                        continue
                    # Previously every error was swallowed here; let Task.execute()
                    # classify anything else as acceptable or unexpected.
                    raise

                if not tickOutput:
                    tickOutput = True  # Print only one time
                    if nRows >= 1:
                        print("d", end="", flush=True)
                    else:
                        print("f({})".format(nRows), end="", flush=True)

        # Drop the super table itself
        tblName = self._dbManager.getFixedSuperTableName()
        self.execWtSql(wt, "drop table db.{}".format(tblName))

class TaskAlterTags(StateTransitionTask):
    """Randomly add, drop or rename a tag on the fixed super table."""

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()  # if we can drop it, we can alter tags

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbManager.getFixedSuperTableName()
        alterTemplates = (
            "alter table db.{} add tag extraTag int",        # dice == 0
            "alter table db.{} drop tag extraTag",           # dice == 1
            "alter table db.{} drop tag newTag",             # dice == 2
            "alter table db.{} change tag extraTag newTag",  # dice == 3
        )
        chosen = alterTemplates[Dice.throw(len(alterTemplates))]
        self.execWtSql(wt, chosen.format(tblName))

class TaskAddData(StateTransitionTask):
    """Insert rows into the regular tables (auto-created via INSERT ... USING the super table)."""

    activeTable : Set[int] = set()  # Track which table is being actively worked on

    # We use these two files to record operations to DB, useful for power-off tests
    fAddLogReady = None
    fAddLogDone = None

    @classmethod
    def prepToRecordOps(cls):
        """Lazily open the operation-recording files when gConfig.record_ops is set."""
        if gConfig.record_ops:
            if cls.fAddLogReady is None:
                logger.info("Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")
            if cls.fAddLogDone is None:
                logger.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbManager
        numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        tblSeq = list(range(numTables))
        random.shuffle(tblSeq)
        for i in tblSeq:
            if i in self.activeTable:  # wow already active: concurrent insertion into this table
                print("x", end="", flush=True)
                ownsTable = False
            else:
                self.activeTable.add(i)  # marking it active
                ownsTable = True

            try:
                # No need to shuffle data sequence, unless later we decide to do non-increment insertion
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                for _ in range(numRecords):  # number of records per table
                    nextInt = ds.getNextInt()
                    if gConfig.record_ops:
                        self.prepToRecordOps()
                        self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                        self.fAddLogReady.flush()
                        os.fsync(self.fAddLogReady)
                    sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format(
                        regTableName,
                        ds.getFixedSuperTableName(),
                        ds.getNextBinary(), ds.getNextFloat(),
                        ds.getNextTick(), nextInt)
                    self.execWtSql(wt, sql)
                    if gConfig.record_ops:
                        self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
                        self.fAddLogDone.flush()
                        os.fsync(self.fAddLogDone)
            finally:
                # Only the task that marked the table active clears the mark, and it
                # does so even when an insert raises; otherwise the table would stay
                # "active" forever, or another task's mark would be removed early.
                if ownsTable:
                    self.activeTable.discard(i)  # not raising an error, unlike remove

# Deterministic random number generator
class Dice():
    """Wrapper around the global `random` module that must be seeded exactly once."""

    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG, then seed it with s; a second call raises RuntimeError."""
        if cls.seeded:
            raise RuntimeError("Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is determinstic
        """Raise RuntimeError unless seed 0 yields the expected first three draws."""
        random.seed(0)
        observed = tuple(random.randrange(0, 1000) for _ in range(3))
        if observed != (864, 394, 776):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):  # get 0 to stop-1
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):  # up to stop-1
        """Return a random int in [start, stop); the dice must have been seeded."""
        if not cls.seeded:
            raise RuntimeError("Cannot throw dice before seeding it")
        return random.randrange(start, stop)



# Anyone needing to carry out work should simply come here (currently disabled, kept for reference)
# class WorkDispatcher():
#     def __init__(self, dbState):
#         # self.totalNumMethods = 2
#         self.tasks = [
#             # CreateTableTask(dbState), # Obsolete
#             # DropTableTask(dbState),
#             # AddDataTask(dbState),
#         ]

#     def throwDice(self):
#         max = len(self.tasks) - 1 
#         dRes = random.randint(0, max)
#         # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
#         return dRes

#     def pickTask(self):
#         dice = self.throwDice()
#         return self.tasks[dice]

#     def doWork(self, workerThread):
#         task = self.pickTask()
#         task.execute(workerThread)


class LoggingFilter(logging.Filter):
    """Filter hook attached to the crash generator's logger.

    At present every record is accepted. Records at INFO or above are always
    logged, and records below INFO are also passed through. A per-message
    suppression rule is kept disabled below so it can be re-enabled when
    tuning log volume.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        if record.levelno >= logging.INFO:
            return True  # info or above always log

        # Below-INFO records: a message-prefix filter used to live here and is
        # currently disabled. If re-enabled, guard the type first, because
        # LogRecord.msg may be an arbitrary object, not only a str:
        # if isinstance(record.msg, str) and record.msg.startswith("[TRD]"):
        #     return False
        return True

1401 1402 1403 1404 1405 1406 1407 1408 1409
class MainExec:
    """Entry points for the crash generator's run modes."""

    @classmethod
    def runClient(cls):
        """Run the multi-threaded client workload.

        Seeds ``Dice`` with 0, builds a ``ThreadPool`` sized from the
        module-level ``gConfig`` (``num_threads``, ``max_steps``) and drives a
        ``ThreadCoordinator`` to completion, then logs its statistics.
        ``dbManager.cleanUp()`` always runs, even if seeding, setup or the run
        itself raises; the exception still propagates to the caller.
        """
        # resetDb = False # DEBUG only
        # dbState = DbState(resetDb)  # DEBUG only!
        dbManager = DbManager()
        try:
            Dice.seed(0)  # initial seeding of dice: fixed seed for reproducible runs
            thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
            tc = ThreadCoordinator(thPool, dbManager)
            tc.run()
            tc.logStats()
        finally:
            # Previously skipped whenever anything above raised.
            dbManager.cleanUp()

    @classmethod
    def runService(cls):
        """Placeholder for service mode: currently only prints a message."""
        print("Running service...")

    @classmethod
    def runTemp(cls): # for debugging purposes
        """Scratch area for ad-hoc debugging; currently a no-op returning None."""
        # # Hack to exercise reading from disk, increasing coverage. TODO: fix
        # dbc = dbState.getDbConn()
        # sTbName = dbState.getFixedSuperTableName()   
        # dbc.execute("create database if not exists db")
        # if not dbState.getState().equals(StateEmpty()):
        #     dbc.execute("use db")     

        # rTables = None
        # try: # the super table may not exist
        #     sql = "select TBNAME from db.{}".format(sTbName)
        #     logger.info("Finding out tables in super table: {}".format(sql))
        #     dbc.query(sql) # TODO: analyze result set later
        #     logger.info("Fetching result")
        #     rTables = dbc.getQueryResult()
        #     logger.info("Result: {}".format(rTables))
        # except taos.error.ProgrammingError as err:
        #     logger.info("Initial Super table OPS error: {}".format(err))
        
        # # sys.exit()
        # if ( not rTables == None):
        #     # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
        #     try:
        #         for rTbName in rTables : # regular tables
        #             ds = dbState
        #             logger.info("Inserting into table: {}".format(rTbName[0]))
        #             sql = "insert into db.{} values ('{}', {});".format(
        #                 rTbName[0],                    
        #                 ds.getNextTick(), ds.getNextInt())
        #             dbc.execute(sql)
        #         for rTbName in rTables : # regular tables        
        #             dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
        #         logger.info("Initial READING operation is successful")       
        #     except taos.error.ProgrammingError as err:
        #         logger.info("Initial WRITE/READ error: {}".format(err))   
        
        # Sandbox testing code
        # dbc = dbState.getDbConn()
        # while True:
        #     rows = dbc.query("show databases") 
        #     print("Rows: {}, time={}".format(rows, time.time()))
        return 
S
Steven Li 已提交
1462

1463
def main():
    """Parse command-line options, configure logging and run the chosen mode.

    Sets the module-level ``gConfig`` (parsed arguments) and ``logger`` (the
    'CrashGen' logger). It then runs ``MainExec.runService()`` when
    ``-e``/``--run-tdengine`` is given, and ``MainExec.runClient()`` otherwise.
    """
    # Argument parsing reference: https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in official docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))

    parser.add_argument('-d', '--debug', action='store_true',
                        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument('-e', '--run-tdengine', action='store_true',
                        help='Run TDengine service in foreground (default: false)')
    parser.add_argument('-l', '--larger-data', action='store_true',
                        help='Write larger amount of data during write operations (default: false)')
    # store_false: gConfig.per_thread_db_connection defaults to True; passing
    # -p sets it to False, i.e. selects a single shared connection.
    parser.add_argument('-p', '--per-thread-db-connection', action='store_false',
                        help='Use a single shared db connection (default: false)')
    parser.add_argument('-r', '--record-ops', action='store_true',
                        help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
    # %(default)s is expanded by argparse from the real default, so the help
    # text cannot drift from it again (it previously said 100 and 10).
    parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
                        help='Maximum number of steps to run (default: %(default)s)')
    parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
                        help='Number of threads to run (default: %(default)s)')

    global gConfig
    gConfig = parser.parse_args()

    global logger
    logger = logging.getLogger('CrashGen')
    logger.addFilter(LoggingFilter())
    # A new named logger is NOTSET and would inherit the root logger's WARNING
    # threshold, so the level is set explicitly either way.
    logger.setLevel(logging.DEBUG if gConfig.debug else logging.INFO)
    logger.addHandler(logging.StreamHandler())

    if gConfig.run_tdengine:  # run server
        MainExec.runService()
    else:
        MainExec.runClient()
1513 1514 1515

# Script entry point: run main() only when executed directly, not when imported.
if __name__ == "__main__":
    main()