crash_gen_main.py 96.4 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
17

S
Shuduo Sang 已提交
18 19 20
from typing import Set
from typing import Dict
from typing import List
21
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
22

S
Shuduo Sang 已提交
23 24
import textwrap
import time
25
import datetime
S
Shuduo Sang 已提交
26
import random
27
import logging
S
Shuduo Sang 已提交
28 29 30 31
import threading
import copy
import argparse
import getopt
32

S
Steven Li 已提交
33
import sys
34
import os
35
import signal
36
import traceback
37 38 39
import resource
from guppy import hpy
import gc
40

S
Steven Li 已提交
41 42 43
from crash_gen.service_manager import ServiceManager, TdeInstance
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
44 45 46

import taos
import requests
# Require Python 3
if sys.version_info.major < 3:
    raise Exception("Must be using Python 3")

# Global variables, tried to keep a small number.
# They are only declared (annotated) here; the values are assigned later,
# once the command-line/environment configuration has been processed.
gConfig:    argparse.Namespace  # command-line/environment configuration
gSvcMgr:    ServiceManager      # TODO: refactor this hack, use dep injection
gContainer: Container
class WorkerThread:
    """One worker of the crash_gen thread pool, backed by a threading.Thread.

    On every iteration of its task loop the worker first crosses the
    ThreadCoordinator's shared step barrier. It then waits at its own
    per-thread "gate" until the main thread taps it. Finally it fetches one
    task from the coordinator and executes it.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
        """
            Note: this runs in the main thread context

            :param pool: the ThreadPool that owns this worker
            :param tid: id of this worker, used in log messages
            :param tc: the ThreadCoordinator driving all worker threads
            :raises RuntimeError: on an unknown gConfig.connector_type (only
                checked when per-thread DB connections are enabled)
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=self.run)
        self._stepGate = threading.Event()  # per-thread "gate", set by tapStepGate()

        # Let us have a DB connection of our own
        if gConfig.per_thread_db_connection:  # type: ignore
            tInst = gContainer.defTdeInstance
            if gConfig.connector_type == 'native':
                self._dbConn = DbConn.createNative(tInst.getDbTarget())
            elif gConfig.connector_type == 'rest':
                self._dbConn = DbConn.createRest(tInst.getDbTarget())
            elif gConfig.connector_type == 'mixed':
                # 1/2 chance of either connector. Like the branches above, the
                # connection must be given the DB target of gContainer.defTdeInstance.
                if Dice.throw(2) == 0:
                    self._dbConn = DbConn.createNative(tInst.getDbTarget())
                else:
                    self._dbConn = DbConn.createRest(tInst.getDbTarget())
            else:
                raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))

    def logDebug(self, msg):
        Logging.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        Logging.info("    TRD[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's TaskExecutor for the current step (None when stopping)."""
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: open our DB connection (if per-thread), run the task loop, clean up."""
        # initialization after thread starts, in the thread context
        Logging.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            Logging.debug("Worker thread openning database connection")
            self._dbConn.open()

        self._doTaskLoop()

        # clean up
        if gConfig.per_thread_db_connection:  # type: ignore
            if self._dbConn.isOpen:  # sometimes it is not open
                self._dbConn.close()
            else:
                Logging.warning("Cleaning up worker thread, dbConn already closed")

    def _doTaskLoop(self):
        """Run one task per step until the coordinator stops or the barrier breaks."""
        while True:
            tc = self._tc  # Thread Coordinator, the overall master
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                Logging.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            Logging.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()  # then per-thread gate, after being tapped
            Logging.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not self._tc.isRunning():
                print("_wts", end="")
                Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Make sure our own connection is open before running the task;
            # it might have been closed during a server auto-restart.
            try:
                if gConfig.per_thread_db_connection:  # most likely TRUE
                    if not self._dbConn.isOpen:
                        self._dbConn.open()
            except taos.error.ProgrammingError as err:
                errno = Helper.convertErrno(err.errno)
                # Tolerated: invalid database, dropping, Unable to establish
                # connection, Database not ready. Anything else is fatal.
                if errno not in [0x383, 0x386, 0x00B, 0x014]:
                    print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
                    raise

            # Fetch a task from the Thread Coordinator
            Logging.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            Logging.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
                self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            Logging.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block (in this worker's own thread) until the main thread taps our gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        Logging.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Main-thread only: release this worker from its gate if it is still alive."""
        self.verifyThreadMain()  # only allowed for main thread

        if self._thread.is_alive():
            Logging.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        return self.getDbConn().getQueryResult()

    def getDbConn(self) -> DbConn:
        """Return our own connection, or the shared one from the DbManager."""
        if gConfig.per_thread_db_connection:
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()
# The coordinator of all worker threads, mostly running in main thread
class ThreadCoordinator:
    """Drives all WorkerThreads of a ThreadPool in lock-step, one "step" at a time.

    In each step the main thread meets the workers at a shared barrier. While
    the workers are quiet, it runs the per-database state transitions for the
    tasks executed in the previous step. It then taps every worker's gate, in
    a random order, so each worker fetches and executes one task.
    """
    WORKER_THREAD_TIMEOUT = 120  # Normal: 120

    def __init__(self, pool: ThreadPool, dbManager: DbManager):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        self._stepBarrier = threading.Barrier(
            self._pool.numThreads + 1)  # one barrier for all threads (+1: the main thread)
        self._execStats = ExecutionStats()
        self._runStatus = Status.STATUS_RUNNING
        self._initDbs()

    def getTaskExecutor(self):
        return self._te

    def getDbManager(self) -> DbManager:
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        self._runStatus = Status.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """Return True once the main loop should stop stepping."""
        maxSteps = gConfig.max_steps  # type: ignore
        if self._curStep >= (maxSteps - 1):  # maxStep==10, last curStep should be 9
            return True
        if self._runStatus != Status.STATUS_RUNNING:
            return True
        return bool(transitionFailed or hasAbortedTask or workerTimeout)

    def _hasAbortedTask(self):  # from execution of previous step
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step, create its TaskExecutor (unless the
        transition failed) and tap all worker threads through their gates."""
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        Logging.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        Logging.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
            self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute it's own task
        self.tapAllThreads()  # release all worker thread from their "gates"

    def _syncAtBarrier(self):
        """Cross the shared step barrier (with time-out), then reset it.

        Raises threading.BrokenBarrierError if the workers do not arrive
        within WORKER_THREAD_TIMEOUT seconds.
        """
        # Now main thread (that's us) is ready to enter a step
        # let other threads go past the pool barrier, but wait at the
        # thread gate
        Logging.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        Logging.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """End the current step by running each database's state-machine
        transition over the tasks that were executed against it.

        Returns True if the transition failed in a handled way (a broken DB
        connection, or our own CrashGenError), False otherwise. The executed
        task list is cleared in both cases.

        Raises any other taos.error.ProgrammingError to the caller.
        """
        transitionFailed = False
        try:
            for db in self._dbs:  # type: Database
                sm = db.getStateMachine()
                Logging.debug("[STT] starting transitions for DB: {}".format(db.getName()))
                # at end of step, transiton the DB state
                tasksForDb = db.filterTasks(self._executedTasks)
                sm.transition(tasksForDb, self.getDbManager().getDbConn())
                Logging.debug("[STT] transition ended for DB: {}".format(db.getName()))

            # Due to limitation (or maybe not) of the TD Python library, we
            # cannot share connections across threads. We are in the main
            # thread here and cannot operate connections created in workers,
            # so "use db" handling lives in the worker task loop instead.

        except taos.error.ProgrammingError as err:
            if err.msg == 'network unavailable':  # broken DB connection
                Logging.info("DB connection broken, execution failed")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # Handled here (not re-raised): we still need to tap all
                # threads at the end, and maybe signal them to stop.
            elif isinstance(err, CrashGenError):  # our own transition failure
                Logging.info("State transition error")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("State transition error")
            else:
                raise

        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        Logging.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Main loop: start the workers, drive them step by step until the run
        should end, then release and join all of them."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not gConfig.debug:  # print this only if we are not in debug mode
                Progress.emit(Progress.STEP_BOUNDARY)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
                Progress.emit(Progress.END_THREAD_STEP)
            except threading.BrokenBarrierError:
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                Logging.error("\n")
                Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
                    ThreadCoordinator.WORKER_THREAD_TIMEOUT))
                Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                # For deadlock debugging, an endless sleep loop here lets gdb
                # attach to the process.
                break

            # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
            # We use this period to do house keeping work, when all worker
            # threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                Logging.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = Helper.convertErrno(err.errno)  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                Logging.info(errMsg)
                traceback.print_exc()
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            Progress.emit(Progress.BEGIN_THREAD_STEP)
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            Logging.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            Logging.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            Logging.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        Logging.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        Logging.debug("\r\n\n--> Main thread ready to finish up...")
        Logging.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        Logging.info(". . . All worker threads finished")  # No CR/LF before
        self._execStats.endExec()

    def cleanup(self):  # free resources
        self._pool.cleanup()

        self._pool = None
        self._te = None
        self._dbManager = None
        self._executedTasks = None
        self._lock = None
        self._stepBarrier = None
        self._execStats = None
        self._runStatus = None

    def printStats(self):
        self._execStats.printStats()

    def isFailed(self):
        return self._execStats.isFailed()

    def getExecStats(self):
        return self._execStats

    def tapAllThreads(self):
        """Tap every worker's gate, in a sequence shuffled via Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        Logging.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        return self._te is not None

    def _initDbs(self):
        ''' Initialize multiple databases, invoked at __init__() time '''
        self._dbs = []  # type: List[Database]
        dbc = self.getDbManager().getDbConn()
        if gConfig.max_dbs == 0:
            self._dbs.append(Database(0, dbc))
        else:
            if gConfig.dynamic_db_table_names:
                # Don't use Dice/random, as they are deterministic
                baseDbNumber = int(datetime.datetime.now().timestamp() * 333) % 888
            else:
                baseDbNumber = 0
            for i in range(gConfig.max_dbs):
                self._dbs.append(Database(baseDbNumber + i, dbc))

    def pickDatabase(self):
        idxDb = 0
        if gConfig.max_dbs != 0:
            idxDb = Dice.throw(gConfig.max_dbs)  # 0 to N-1
        db = self._dbs[idxDb]  # type: Database
        return db

    def fetchTask(self) -> Task:
        ''' The thread coordinator (that's us) is responsible for fetching a task
            to be executed next.

            Raises RuntimeError when called while not running.
        '''
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")

        # pick a task type for current state
        db = self.pickDatabase()
        taskType = db.getStateMachine().pickTaskType()  # dynamic name of class
        return taskType(self._execStats, db)  # create a task from it

    def resetExecutedTasks(self):
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        with self._lock:
            self._executedTasks.append(task)
class ThreadPool:
    """Holds the WorkerThread objects and starts / joins them as a group."""

    def __init__(self, numThreads, maxSteps):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal bookkeeping
        self.curStep = 0
        self.threadList = []  # type: List[WorkerThread]

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create numThreads workers; each is recorded in threadList before it starts."""
        for workerId in range(self.numThreads):
            worker = WorkerThread(self, workerId, tc)
            self.threadList.append(worker)
            worker.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Wait for every worker's underlying thread to finish."""
        for worker in self.threadList:
            Logging.debug("Joining thread...")
            worker._thread.join()

    def cleanup(self):
        """Drop the references to the workers (maybe clean up each?)."""
        self.threadList = None

# A queue of contiguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
class LinearQueue():
    """Contiguous range [firstIndex..lastIndex] of positive ints plus an "in use" set.

    All mutating operations run under a re-entrant lock, because methods call
    each other while holding it.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        return "[{}..{}], in use: {}".format(
            self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head is still in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but returns 0 for an empty queue."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index i in use; raises RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError(
                    "Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick a free index in the range and mark it in use.

        Returns None if the queue is empty, or if no free index was found
        after size()*10 random attempts.
        """
        with self._lock:
            # The emptiness check must be under the lock too; otherwise another
            # thread could pop the last element between the check and the pick.
            if self.isEmpty():
                return None
            for _ in range(self.size() * 10):  # 10x iteration at most
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret
            return None
class AnyState:
    """Base class for the coarse-grained states a test database can be in.

    Subclasses implement getInfo(), returning a list indexed by the constants
    below: [state value, can create DB, can drop DB, can create fixed super
    table, can drop fixed super table, can add data, can read data].
    The assert*/has* helpers examine a batch of finished tasks so that
    subclasses can sanity-check an observed state transition.
    """
    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, not even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    # Display names, offset by one so that STATE_INVALID (-1) maps to index 0
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Indexes into the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    # For below, whether we can "drop the DB" -- strictly speaking only
    # "under normal circumstances", as we may override it with the -b option
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        stateVal = self._info[self.STATE_VAL_IDX]
        return self._stateNames[1 + stateVal]  # +1 accommodates STATE_INVALID

    def getInfo(self):
        """Describe this state; see the class docstring for the list layout."""
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare with another AnyState, or with a raw STATE_* integer."""
        if isinstance(other, int):
            otherVal = other
        elif isinstance(other, AnyState):
            otherVal = other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(type(other)))
        return self.getValIndex() == otherVal

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` can plausibly move this state to ``newState``."""
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        # When the user asks to run up to a number of DBs, we stop doing
        # drop_db operations altogether
        return False if gConfig.max_dbs > 0 else self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise CrashGenError as soon as a second successful ``cls`` task is seen."""
        successes = 0
        for task in (t for t in tasks if isinstance(t, cls)):
            if not task.isSuccess():
                continue
            successes += 1
            if successes > 1:
                raise CrashGenError(
                    "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise CrashGenError if ``cls`` tasks exist but none of them succeeded."""
        matching = [t for t in tasks if isinstance(t, cls)]
        if matching and not any(t.isSuccess() for t in matching):
            raise CrashGenError("Unexpected zero success for task type: {}, from tasks: {}"
                .format(cls, tasks))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` is present."""
        if self.hasTask(tasks, cls):
            raise CrashGenError(
                "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` succeeded."""
        if self.hasSuccess(tasks, cls):
            raise CrashGenError(
                "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """True if at least one task of type ``cls`` succeeded."""
        return any(isinstance(t, cls) and t.isSuccess() for t in tasks)

    def hasTask(self, tasks, cls):
        """True if at least one task of type ``cls`` is present."""
        return any(isinstance(t, cls) for t in tasks)

S
Shuduo Sang 已提交
761

762 763 764 765
class StateInvalid(AnyState):
    """STATE_INVALID: no operation is permitted."""

    def getInfo(self):
        # state value, then six "can ..." flags (DB create/drop, fixed table
        # create/drop, data insert/read), all disabled
        return [self.STATE_INVALID] + [False] * 6

S
Shuduo Sang 已提交
773

774 775 776 777
class StateEmpty(AnyState):
    """STATE_EMPTY: nothing there, not even a DB; only create_db is allowed."""

    def getInfo(self):
        createDb, dropDb = True, False
        createTable, dropTable = False, False
        addData, readData = False, False
        return [self.STATE_EMPTY, createDb, dropDb,
                createTable, dropTable, addData, readData]

    def verifyTasksToState(self, tasks, newState):
        # At EMPTY, if creating a DB succeeded and no drop_db task ran,
        # then at most one create_db may have succeeded. TODO: compare numbers
        if not self.hasSuccess(tasks, TaskCreateDb):
            return
        if self.hasTask(tasks, TaskDropDb):
            return
        self.assertAtMostOneSuccess(tasks, TaskCreateDb)

790 791 792 793 794 795 796 797 798 799 800

class StateDbOnly(AnyState):
    """STATE_DB_ONLY: we have a DB, but nothing else."""

    def getInfo(self):
        createDb, dropDb = False, True
        createTable, dropTable = True, False
        addData, readData = False, False
        return [self.STATE_DB_ONLY, createDb, dropDb,
                createTable, dropTable, addData, readData]

    def verifyTasksToState(self, tasks, newState):
        # TODO: restore the check below; the problem exists, although unlikely in the real world:
        # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
        #     self.assertIfExistThenSuccess(tasks, TaskDropDb)

        # Only when no create_db task ran can we require at most one successful drop_db
        if self.hasTask(tasks, TaskCreateDb):
            return
        self.assertAtMostOneSuccess(tasks, TaskDropDb)
809

S
Shuduo Sang 已提交
810

811
class StateSuperTableOnly(AnyState):
    """STATE_TABLE_ONLY: we have the fixed super table, but it is totally empty."""

    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # can create/drop DB
            False, True,  # can create/drop fixed super table
            True, True,   # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        # No consistency checks are currently enforced from this state.
        # (An earlier call here, hasSuccess(tasks, TaskCreateSuperTable), discarded
        # its result and therefore checked nothing.)
        # TODO: need to revamp! Candidate checks include: at most one successful
        # TaskDropSuperTable; no drop-table task once data was added. At least
        # one of these was noted as not holding in massively parallel runs.
        pass
838

S
Shuduo Sang 已提交
839

840 841 842 843 844 845 846 847 848 849
class StateHasData(AnyState):
    """STATE_HAS_DATA: we have some data in the fixed super table."""

    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,  # can create/drop DB
            False, True,  # can create/drop fixed super table
            True, True,   # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        if newState.equals(AnyState.STATE_EMPTY):
            # The DB is gone: unless a create_db task ran as well, at most
            # one drop_db can have succeeded.
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
        elif newState.equals(AnyState.STATE_DB_ONLY):
            # The DB survived but its tables are gone: without a create_db
            # task, no drop_db task should have run.
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertNoTask(tasks, TaskDropDb)
            # TODO: at most one successful TaskDropSuperTable? (dicy)
        else:  # should be STATE_HAS_DATA (STATE_TABLE_ONLY also lands here)
            if not self.hasTask(tasks, TaskCreateDb):
                # only if we didn't create one: we shouldn't have dropped it
                self.assertNoTask(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateSuperTable):
                # if we didn't create the table, no task should have dropped it
                self.assertNoTask(tasks, TaskDropSuperTable)
S
Steven Li 已提交
876

S
Shuduo Sang 已提交
877

878
class StateMechine:
    """Tracks the coarse-grained state of one Database and drives task selection.

    The current state (StateEmpty, StateDbOnly, StateSuperTableOnly or
    StateHasData) is derived from the live database by _findCurrentState().
    It decides which StateTransitionTask subclasses may run, and it is
    re-derived after each step so that transition() can sanity-check the
    finished tasks against the observed change.
    """

    # Weight for task types that do not change the state (getEndState() is None)
    _NO_STATE_CHANGE_WEIGHT = 10

    def __init__(self, db: Database):
        self._db = db
        # Transition target weights, indexed by state value:
        # STATE_EMPTY, STATE_DB_ONLY, STATE_TABLE_ONLY, STATE_HAS_DATA
        self._stateWeights = [1, 2, 10, 40]

    def init(self, dbc: DbConn):
        """Late initialization: determine the starting state (``dbc`` is not retained)."""
        self._curState = self._findCurrentState(dbc)
        Logging.debug("Found Starting State: {}".format(self._curState))

    # TODO: seems no longer used, remove?
    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        """True when the current state implies the DB exists.

        canDropDb() is not used here: it also returns False when
        gConfig.max_dbs > 0, which is a policy, not a statement about the DB.
        """
        return self._curState.getValIndex() not in (
            AnyState.STATE_INVALID, AnyState.STATE_EMPTY)

    def getTaskTypes(self):
        """Task classes that can run from the current state, directly or one step later.

        May be slow, use cautiously.
        Raises RuntimeError if no task type qualifies.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]

        # Now add the INDIRECT ones: tasks that can begin from the end state of a
        # directly runnable task. Each class is listed once so it is not
        # over-weighted in pickTaskType().
        taskTypes = firstTaskTypes.copy()
        for task1 in firstTaskTypes:
            endState = task1.getEndState()
            if endState is None:  # does not change the state, nothing to add
                continue
            for tc in allTaskClasses:
                if tc.canBeginFrom(endState) and tc not in taskTypes:
                    taskTypes.append(tc)

        if not taskTypes:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        Logging.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self, dbc: DbConn):
        """Query the database through ``dbc`` and return a fresh state object.

        Side effect: switches ``dbc`` to this database (``dbc.use``) when it exists.
        """
        ts = time.time()  # used to debug how long the state-probing queries take
        dbName = self._db.getName()
        if not dbc.existsDatabase(dbName):
            Logging.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when opening the connection, and this is NOT the worker
        # thread, which does this on its own
        dbc.use(dbName)
        if not dbc.hasTables():
            Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        # For sure we have tables, which means we must have the super table. # TODO: are we sure?
        sTable = self._db.getFixedSuperTable()
        # NOTE(review): the original comment on this test read "no regular tables",
        # yet a True result from hasRegTables() selects SUPER_TABLE_ONLY and False
        # selects HAS_DATA. Given the method name, this looks inverted; confirm
        # against TdSuperTable.hasRegTables() before flipping. Behavior is unchanged here.
        if sTable.hasRegTables(dbc):
            Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
        return StateHasData()

    def transition(self, tasks, dbc: DbConn):
        """Re-derive the state after a step and verify the finished ``tasks`` against it."""
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            Logging.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        dbc.execute("show dnodes")

        # Generic checks, based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # Not "at most one success": creates and drops may interleave

        if self._curState.canDropDb():
            if gSvcMgr is None:  # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # Not "at most one success": drop-create-drop is possible

        # No generic checks for fixed-table create/drop or data insertion/reading:
        # the whole DB may be dropped concurrently, so success is not guaranteed.

        newState = self._findCurrentState(dbc)
        Logging.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        """Randomly pick a runnable task type, weighted by the state it leads to."""
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is None:  # e.g. a read data task, state unchanged
                weights.append(self._NO_STATE_CHANGE_WEIGHT)
            else:
                weights.append(self._stateWeights[endState.getValIndex()])
        return taskTypes[self._weighted_choice_sub(weights)]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights):
        """Return index ``i`` with probability weights[i] / sum(weights).

        Raises ValueError if the weights do not sum to a positive number.
        Previously the method fell through and returned None in that case.
        """
        # TODO: use our dice to ensure it being deterministic?
        total = sum(weights)
        if total <= 0:
            raise ValueError("Weights must sum to a positive number: {}".format(weights))
        rnd = random.random() * total
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Guard against floating-point round-off: use the last index carrying weight
        return max(i for i, w in enumerate(weights) if w > 0)
1021

1022 1023 1024 1025 1026
class Database:
    ''' We use this to represent an actual TDengine database inside a service instance,
        possibly in a cluster environment.

        For now we use it to manage state transitions in that database

        TODO: consider moving, but keep in mind it contains "StateMachine"
    '''
    _clsLock = threading.Lock()  # class-wide lock, guards the tick counters below
    # Class-level default; getNextInt() does "self._lastInt += 1", which stores a
    # per-instance copy, so each Database counts on its own starting from 102
    _lastInt = 101
    _lastTick = 0  # 0 means "not initialized yet", see getNextTick()
    _lastLaggingTick = 0  # lagging tick, for unsequenced insertions

    def __init__(self, dbNum: int, dbc: DbConn):  # TODO: remove dbc
        self._dbNum = dbNum  # we assign a number to databases, for our testing purpose
        self._stateMachine = StateMechine(self)
        self._stateMachine.init(dbc)
        self._lock = threading.RLock()

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    def getDbNum(self):
        return self._dbNum

    def getName(self):
        return "db_{}".format(self._dbNum)

    def filterTasks(self, inTasks: List[Task]):
        """Pick out those tasks that belong to this database."""
        return [task for task in inTasks if task.getDb().isSame(self)]

    def isSame(self, other):
        return self._dbNum == other._dbNum

    def exists(self, dbc: DbConn):
        return dbc.existsDatabase(self.getName())

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    def getFixedSuperTable(self) -> TdSuperTable:
        return TdSuperTable(self.getFixedSuperTableName(), self.getName())

    # We aim to create a starting time tick such that each test run can safely create
    # 100,000 records without repeating a time stamp when the test is re-run 3 minutes
    # (180 seconds) later. To do so, elapsed wall-clock time is expanded by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    @classmethod
    def setupLastTick(cls):
        t1 = datetime.datetime(2020, 6, 1)
        t2 = datetime.datetime.now()
        # whole seconds elapsed since 2020-06-01
        elSec = int(t2.timestamp() - t1.timestamp())
        # Wrap into a window of (8 * 12 * 30 days) / 500, then scale by 500: the result
        # is a (float) number of seconds below 8 * 12 * 30 days, i.e. about 7.9 years
        elSec2 = (elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500)) * 500
        t3 = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp(
            t3.timestamp() + elSec2)  # see explanation above
        Logging.debug("Setting up TICKS to start from: {}".format(t4))
        return t4

    @classmethod
    def getNextTick(cls):
        """Return the next insertion timestamp (a datetime).

        Usually this is the main tick advanced by one second. With a 1-in-20
        chance it is instead the "lagging" tick, which starts 10,000 seconds
        behind the main tick and also advances by one second, producing
        out-of-sequence insertions.
        """
        with cls._clsLock:  # prevent duplicate tick
            if cls._lastLaggingTick == 0 or cls._lastTick == 0:  # not initialized
                # 10k at 1/20 chance, should be enough to avoid overlaps
                tick = cls.setupLastTick()
                cls._lastTick = tick
                cls._lastLaggingTick = tick + datetime.timedelta(0, -10000)

            if Dice.throw(20) == 0:  # 1 in 20 chance, return lagging tick
                cls._lastLaggingTick += datetime.timedelta(0, 1)  # advance lagging tick by 1 second
                return cls._lastLaggingTick
            # regular: add one second to it
            cls._lastTick += datetime.timedelta(0, 1)
            return cls._lastTick

    def getNextInt(self):
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            self.getNextInt())

    def getNextFloat(self):
        return 0.9 + self.getNextInt()

    ALL_COLORS = ['red', 'white', 'blue', 'green', 'purple']

    def getNextColor(self):
        return random.choice(self.ALL_COLORS)

1129

1130
class TaskExecutor():
    """Per-step context handed to tasks, plus a process-wide list of data marks."""

    class BoundedList:
        """A sorted list holding at most ``size`` values.

        A value not larger than the current first (smallest) item is ignored.
        Otherwise it is inserted in order, and if that exceeds ``size`` items
        the first one is dropped.
        """

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            with self._lock:
                items = self._list
                if len(items) == 0:
                    items.append(n)
                    return
                # first position whose item is >= n, or the end if there is none
                pos = next((idx for idx, val in enumerate(items) if n <= val), len(items))
                if pos == 0:
                    # TODO: eliminate first item as gating item
                    return
                items.insert(pos, n)

                count = len(items)
                if count > self._size + 1:
                    raise RuntimeError("Corrupt Bounded List")
                if count == self._size + 1:
                    del items[0]  # drop the smallest

        def __str__(self):
            return "{!r}".format(self._list)

    _boundedList = BoundedList()  # class-level, shared by all executors

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):
        """Execute ``task`` on worker thread ``wt``."""
        task.execute(wt)

    def recordDataMark(self, n: int):
        """Record ``n`` in the shared bounded list."""
        self._boundedList.add(n)
1192

S
Shuduo Sang 已提交
1193

S
Steven Li 已提交
1194
class Task():
    ''' A generic "Task" to be executed. For now we decide that there is no
        need to embed a DB connection here, we use whatever the Worker Thread has
        instead. But a task is always associated with a DB
    '''
    taskSn = 100

    _lock = threading.Lock()  # guards _tableLocks
    _tableLocks: Dict[str, threading.Lock] = {}  # full table name -> lock, shared by all tasks

    # Error numbers that are always acceptable, regardless of configuration
    _ALWAYS_ACCEPTABLE_ERRNOS = frozenset([
        0x05,   # TSDB_CODE_RPC_NOT_READY
        0x0B,   # Unable to establish connection, more details in TD-1648
        # 0x200, # invalid SQL, TODO: re-examine with TD-934
        0x20F,  # query terminated, possibly due to vnoding being dropped, see TD-1776
        0x213,  # "Disconnected from service", result of "kill connection ???"
        0x217,  # "db not selected", client side defined error code
        # 0x218, # "Table does not exist" client side defined error code
        0x360,  # Table already exists
        0x362,
        # 0x369, # tag already exists
        0x36A, 0x36B, 0x36D,
        0x381,
        0x380,  # "db not selected"
        0x383,
        0x386,  # DB is being dropped?!
        0x503,
        0x510,  # vnode not in ready state
        0x14,   # db not ready, errno changed
        0x600,  # Invalid table ID, why?
        1000,   # REST catch-all error
    ])

    @classmethod
    def allocTaskNum(cls):
        # IMPORTANT: cannot use cls.taskSn, since "cls.taskSn += 1" would give each
        # sub class its own copy
        Task.taskSn += 1
        return Task.taskSn

    def __init__(self, execStats: ExecutionStats, db: Database):
        self._workerThread = None
        self._err: Optional[Exception] = None
        self._aborted = False
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

        self._execStats = execStats
        self._db = db  # A task is always associated with a specific DB

    def isSuccess(self):
        return self._err is None

    def isAborted(self):
        return self._aborted

    def clone(self):  # TODO: why do we need this again?
        return self.__class__(self._execStats, self._db)

    def getDb(self):
        return self._db

    def logDebug(self, msg):
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))

    def _isServiceStable(self):
        if not gSvcMgr:
            return True  # we don't run service, so let's assume it's stable
        return gSvcMgr.isStable()  # otherwise let's examine the service

    def _isErrAcceptable(self, errno, msg):
        """Decide whether a TAOS error (``errno``, ``msg``) is tolerable for this test.

        Checks are applied in order, and each one is consulted independently.
        Previously, setting gConfig.ignore_errors skipped the 0x200 and
        unstable-service checks whenever the errno was not in that list.
        """
        if errno in self._ALWAYS_ACCEPTABLE_ERRNOS:
            return True  # These are the ALWAYS-ACCEPTABLE ones

        # Extra error numbers given on the command line, comma separated (int(v, 0) syntax)
        if gConfig.ignore_errors:
            moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')]
            if errno in moreErrnos:
                return True

        if errno == 0x200:  # invalid SQL, we need to dig in a bit more
            return any(fragment in msg for fragment in (
                "invalid column name",
                "tags number not matched",  # mismatched tags after modification
                "duplicated column names",  # also alter table tag issues
            ))

        if not self._isServiceStable():  # We are managing service, and it's (re)starting/stopping
            Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
            return True

        return False  # Not an acceptable error

    def execute(self, wt: WorkerThread):
        """Run this task on worker thread ``wt`` and record its outcome and statistics.

        Exceptions raised by _executeInternal() are caught here and recorded in
        self._err. A TAOS ProgrammingError is either continued past (when
        gConfig.continue_on_exception is set), accepted (see _isErrAcceptable),
        or treated as fatal. A fatal TAOS error, or any non-TAOS exception,
        also marks the task as aborted.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug(
            "[-] executing task {}...".format(self.__class__.__name__))

        self._err = None  # TODO: type hint mess up?
        self._execStats.beginTaskType(self.__class__.__name__)  # mark beginning
        errno2 = None

        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            if (gConfig.continue_on_exception):  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                self._err = err
            elif self._isErrAcceptable(errno2, err.__str__()):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                print("_", end="", flush=True)
                self._err = err
            else:  # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
                    self.__class__.__name__,
                    errno2, err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if gConfig.debug:
                    traceback.print_exc()  # so that we see full stack
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                self._err = err
                self._aborted = True
        except Exception as e:
            Logging.info("Non-TAOS exception encountered with: {}".format(self.__class__.__name__))
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException as e:
            # NOTE(review): this also swallows KeyboardInterrupt/SystemExit, recording
            # them as an aborted task instead of propagating them
            self.logInfo("Python base exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()

        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())
        self.logDebug("[X] task execution completed, {}, status: {}".format(
            self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2)

    # TODO: refactor away, just provide the dbConn
    def execWtSql(self, wt: WorkerThread, sql):
        """Execute an SQL statement on the worker thread."""
        return wt.execSql(sql)

    def queryWtSql(self, wt: WorkerThread, sql):
        """Run an SQL query on the worker thread."""
        return wt.querySql(sql)

    def getQueryResult(self, wt: WorkerThread):
        """Fetch the result of the worker thread's last query."""
        return wt.getQueryResult()

    def lockTable(self, ftName):  # full table name
        """Acquire the per-table lock for ``ftName``, creating it on first use (blocks)."""
        with Task._lock:
            lock = Task._tableLocks.get(ftName)
            if lock is None:
                lock = Task._tableLocks[ftName] = threading.Lock()
        # Acquire outside the registry lock, so waiting here doesn't block other tables
        lock.acquire()

    def unlockTable(self, ftName):
        """Release the per-table lock for ``ftName``.

        Raises RuntimeError if no such lock exists or it is not held.
        """
        with Task._lock:
            if ftName not in Task._tableLocks:
                raise RuntimeError("Corrupt state, no such lock")
            lock = Task._tableLocks[ftName]
            if not lock.locked():
                raise RuntimeError("Corrupte state, already unlocked")
        lock.release()

1397

1398
class ExecutionStats:
    """Aggregated execution statistics for a crash_gen run.

    Tracks per-task-class execution/success counts and error numbers, the
    accumulated time during which at least one task was in progress, the
    wall-clock time between startExec()/endExec(), and a failure reason.
    """

    def __init__(self):
        # task class name -> [total execution count, successful execution count]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()  # guards counters touched by worker threads
        self._firstTaskStartTime = None
        self._execStartTime = None
        # task class name -> {errno: occurrence count}
        self._errors = {}
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        return self._failed

    def startExec(self):
        self._execStartTime = time.time()

    def endExec(self):
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        """Record one execution of task class ``klassName``.

        :param klassName: task class name used as the statistics key
        :param isSuccess: whether the execution succeeded
        :param eno: optional error number to tally for this class
        """
        # Called by worker threads at the end of each task, so the
        # read-modify-write of the shared counters must be serialized.
        with self._lock:
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

    def printStats(self):
        """Log a human-readable summary of all collected statistics."""
        Logging.info(
            "----------------------------------------------------------------------")
        Logging.info(
            "| Crash_Gen test {}, with the following stats:".format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        Logging.info("| Task Execution Times (success/total):")
        execTimesAny = 0  # an integer count of executed tasks
        for k, n in self._execTimes.items():
            execTimesAny += n[0]
            errStr = None
            if k in self._errors:
                errStr = ", ".join(
                    "0x{:X}:{}".format(eno, cnt) for (eno, cnt) in self._errors[k].items())
            Logging.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))

        Logging.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        Logging.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        Logging.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        # Guard the average: no task may have executed at all (e.g. early abort)
        avgTaskTime = self._accRunTime / execTimesAny if execTimesAny > 0 else 0.0
        Logging.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(avgTaskTime))
        Logging.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        Logging.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        Logging.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
        Logging.info("| Longest native query time: {:.3f} seconds, started: {}".
            format(MyTDSql.longestQueryTime,
                time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) )
        Logging.info("| Longest native query: {}".format(MyTDSql.longestQuery))
        Logging.info(
            "----------------------------------------------------------------------")
1497 1498 1499


class StateTransitionTask(Task):
    # Table/record counts; the LARGE_* variants are picked by subclasses
    # when gConfig.larger_data is set.
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    # Offset added to every regular table index; chosen lazily, once.
    _baseTableNumber = None

    _endState = None # TODO: no longer used?

    @classmethod
    def getInfo(cls):  # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        """Return the state this task leaves the DB in; subclasses must override."""
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Whether this task may start from ``state``; subclasses must override."""
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        """Return the name of the i-th regular table, e.g. "reg_table_7".

        The base offset is fixed on first call: a Dice.throw(999) value when
        gConfig.dynamic_db_table_names is set, otherwise 0.
        """
        owner = StateTransitionTask
        if owner._baseTableNumber is None: # Set it one time
            randomized = gConfig.dynamic_db_table_names
            owner._baseTableNumber = Dice.throw(999) if randomized else 0
        tableNumber = owner._baseTableNumber + i
        return "reg_table_{}".format(tableNumber)

    def execute(self, wt: WorkerThread):
        """Delegate to Task.execute()."""
        super().execute(wt)
S
Shuduo Sang 已提交
1539 1540


1541
class TaskCreateDb(StateTransitionTask):
    """Creates the test database; ends in the "DB only" state."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Issue CREATE DATABASE, with a fixed REPLICA clause when max_replicas != 1."""
        replicas = gConfig.max_replicas # fixed, always (not randomized)
        repStr = "" if replicas == 1 else "replica {}".format(replicas)
        createSql = "create database {} {}".format(self._db.getName(), repStr)
        self.execWtSql(wt, createSql)
1560

1561
class TaskDropDb(StateTransitionTask):
    """Drops the test database; ends in the empty state."""

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dropSql = "drop database {}".format(self._db.getName())
        self.execWtSql(wt, dropSql)
        droppedAt = time.time()
        Logging.debug("[OPS] database dropped at {}".format(droppedAt))
1573

1574
class TaskCreateSuperTable(StateTransitionTask):
    """(Re)creates the fixed super table; ends in the "super table only" state."""

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbc = wt.getDbConn()
        if not self._db.exists(dbc):
            Logging.debug("Skipping task, no DB yet")
            return

        sTable = self._db.getFixedSuperTable() # type: TdSuperTable
        columns = {'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}
        tags = {'b':'BINARY(200)', 'f':'FLOAT'}
        # Any existing super table is dropped first. Regular tables are not
        # created here: INSERT (via TdSuperTable.ensureTable) handles that.
        sTable.create(dbc, columns, tags, dropIfExists=True)
1598

S
Steven Li 已提交
1599

1600
class TdSuperTable:
    """Client-side handle for a super table, identified by table name + DB name.

    Holds no connection of its own: every method that talks to the server
    takes a DB connection (``dbc``) from the caller.
    """

    def __init__(self, stName, dbName):
        self._stName = stName
        self._dbName = dbName

    def getName(self):
        return self._stName

    def drop(self, dbc, skipCheck = False):
        """Drop this super table if it exists.

        :raises CrashGenError: if it does not exist and ``skipCheck`` is False
        """
        dbName = self._dbName
        if self.exists(dbc) : # if myself exists
            fullTableName = dbName + '.' + self._stName
            dbc.execute("DROP TABLE {}".format(fullTableName))
        else:
            if not skipCheck:
                raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))

    def exists(self, dbc):
        """Switch ``dbc`` to this table's DB (USE) and check the super table exists."""
        dbc.execute("USE " + self._dbName)
        return dbc.existsSuperTable(self._stName)

    # TODO: odd semantic, create() method is usually static?
    def create(self, dbc, cols: dict, tags: dict,
        dropIfExists = False
        ):
        '''Creating a super table

        :param cols: column name -> SQL type
        :param tags: tag name -> SQL type; None creates a single "dummy int" tag
        :param dropIfExists: drop an existing table of the same name first
        :raises CrashGenError: if the table exists and ``dropIfExists`` is False
        '''
        dbName = self._dbName
        dbc.execute("USE " + dbName)
        fullTableName = dbName + '.' + self._stName
        if dbc.existsSuperTable(self._stName):
            if dropIfExists:
                dbc.execute("DROP TABLE {}".format(fullTableName))
            else: # error
                raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))

        # Now let's create
        sql = "CREATE TABLE {} ({})".format(
            fullTableName,
            ",".join(['%s %s'%(k,v) for (k,v) in cols.items()]))
        if tags is None :
            sql += " TAGS (dummy int) "
        else:
            sql += " TAGS ({})".format(
                ",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
            )
        dbc.execute(sql)

    def getRegTables(self, dbc: DbConn):
        """Return the names of all regular tables under this super table."""
        dbName = self._dbName
        try:
            dbc.query("select TBNAME from {}.{}".format(dbName, self._stName))  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
            raise

        qr = dbc.getQueryResult()
        return [v[0] for v in qr]

    def hasRegTables(self, dbc: DbConn):
        return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0

    def ensureTable(self, task: Task, dbc: DbConn, regTableName: str):
        """Create regular table ``regTableName`` under this super table if absent.

        When ``task`` is given, the table's lock (``task.lockTable``) is held
        around the CREATE TABLE statement (see TD-1471), and it is released
        on every exit path.
        """
        dbName = self._dbName
        sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
        if dbc.query(sql) >= 1 : # reg table exists already
            return

        # acquire a lock first, so as to be able to *verify*. More details in TD-1471
        fullTableName = dbName + '.' + regTableName
        if task is not None:  # optional lock
            task.lockTable(fullTableName)
        try:
            # Everything after lockTable() lives inside the try, so that the lock
            # is released even if progress reporting or tag lookup raises.
            Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
            sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
                fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
            )
            dbc.execute(sql)
        finally:
            if task is not None:
                task.unlockTable(fullTableName) # no matter what

    def _getTagStrForSql(self, dbc) :
        """Build a comma-separated literal value list, one per tag, in tag order.

        :raises RuntimeError: on a tag type other than BINARY, FLOAT or INT
        """
        tags = self._getTags(dbc)
        tagStrs = []
        for tagName in tags:
            tagType = tags[tagName]
            if tagType == 'BINARY':
                tagStrs.append("'Beijing-Shanghai-LosAngeles'")
            elif tagType == 'FLOAT':
                tagStrs.append('9.9')
            elif tagType == 'INT':
                tagStrs.append('88')
            else:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
        return ", ".join(tagStrs)

    def _getTags(self, dbc) -> dict:
        """Return {tag name: tag type} from DESCRIBE rows whose 4th field is 'TAG'."""
        dbc.query("DESCRIBE {}.{}".format(self._dbName, self._stName))
        stCols = dbc.getQueryResult()
        return {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type

    def addTag(self, dbc, tagName, tagType):
        """Add a tag unless one with that name already exists."""
        if tagName in self._getTags(dbc): # already
            return
        sql = "alter table {}.{} add tag {} {}".format(
            self._dbName, self._stName, tagName, tagType)
        dbc.execute(sql)

    def dropTag(self, dbc, tagName):
        """Drop a tag if it exists; no-op otherwise."""
        if not tagName in self._getTags(dbc): # don't have this tag
            return
        sql = "alter table {}.{} drop tag {}".format(self._dbName, self._stName, tagName)
        dbc.execute(sql)

    def changeTag(self, dbc, oldTag, newTag):
        """Rename ``oldTag`` to ``newTag`` if the old one exists and the new one doesn't."""
        tags = self._getTags(dbc)
        if not oldTag in tags: # don't have this tag
            return
        if newTag in tags: # already have this tag
            return
        sql = "alter table {}.{} change tag {} {}".format(self._dbName, self._stName, oldTag, newTag)
        dbc.execute(sql)

    def generateQueries(self, dbc: DbConn) -> List[SqlQuery]:
        ''' Generate queries to test/exercise this super table '''
        ret = [] # type: List[SqlQuery]

        for rTbName in self.getRegTables(dbc):  # regular tables

            # Placeholder for future WHERE conditions; the value is unused, but
            # the Dice draw is kept so the random sequence stays unchanged.
            _filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
                None
            ])

            # Run the query against the regular table first
            doAggr = (Dice.throw(2) == 0) # 1 in 2 chance
            if not doAggr: # don't do aggregate query, just simple one
                ret.append(SqlQuery( # reg table
                    "select {} from {}.{}".format('*', self._dbName, rTbName)))
                ret.append(SqlQuery( # super table
                    "select {} from {}.{}".format('*', self._dbName, self.getName())))
            else: # Aggregate query
                aggExpr = Dice.choice([
                    'count(*)',
                    'avg(speed)',
                    # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
                    'sum(speed)',
                    'stddev(speed)',
                    # SELECTOR functions
                    'min(speed)',
                    'max(speed)',
                    'first(speed)',
                    'last(speed)',
                    'top(speed, 50)', # TODO: not supported?
                    'bottom(speed, 50)', # TODO: not supported?
                    'apercentile(speed, 10)', # TODO: TD-1316
                    'last_row(speed)',
                    # Transformation Functions
                    # 'diff(speed)', # TODO: no supported?!
                    'spread(speed)'
                    ]) # TODO: add more from 'top'

                if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
                    sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
                    if Dice.throw(3) == 0: # 1 in X chance
                        sql = sql + ' GROUP BY color'
                        Progress.emit(Progress.QUERY_GROUP_BY)
                    ret.append(SqlQuery(sql))

        return ret

1779
class TaskReadData(StateTransitionTask):
    """Runs generated read queries against the fixed super table; no state change."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _reconnectIfNeeded(self, wt):
        """With a 1-in-20 chance, close and reopen the worker's DB connection.

        Simulates a broken client connection. A ConnectionError on reopen is
        tolerated only when a managed service exists and is not running
        (e.g. mid-restart); otherwise it is re-raised.
        """
        # 1 in 20 chance, simulate a broken connection
        if random.randrange(20) != 0: # TODO: break connection in all situations
            return

        Progress.emit(Progress.SERVICE_RECONNECT_START)
        try:
            wt.getDbConn().close()
            wt.getDbConn().open()
        except ConnectionError as err: # may fail
            if not gSvcMgr:
                Logging.error("Failed to reconnect in client-only mode")
                raise # Not OK if we are running in client-only mode
            if gSvcMgr.isRunning(): # may have race conditon, but low prob
                Logging.error("Failed to reconnect when managed server is running")
                raise # Not OK if we are running normally
            Progress.emit(Progress.SERVICE_RECONNECT_FAILURE)
        else:
            # Only a reconnect that actually succeeded is reported as a success.
            Progress.emit(Progress.SERVICE_RECONNECT_SUCCESS)
        # The above might have taken a lot of time, service might be running
        # by now, causing error below to be incorrectly handled due to timing issue
        # TODO: fix server restart status race condtion

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Possibly reconnect, then execute every query generated for the super table.

        :raises taos.error.ProgrammingError: re-raised (after a debug log) on read failure
        """
        self._reconnectIfNeeded(wt)

        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()

        for q in sTable.generateQueries(dbc):  # regular tables
            try:
                sql = q.getSql()
                dbc.execute(sql)
            except taos.error.ProgrammingError as err:
                errno2 = Helper.convertErrno(err.errno)
                Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
                raise
S
Shuduo Sang 已提交
1835

1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848
class SqlQuery:
    """A single SQL statement to be executed, e.g. one produced by TdSuperTable."""

    @classmethod
    def buildRandom(cls, db: Database):
        '''Build a random query against a certain database'''
        # NOTE(review): unfinished stub. It reads the DB name but builds no
        # query and returns None.
        _dbName = db.getName()
        return None

    def __init__(self, sql: Optional[str] = None):
        self._sql = sql  # raw SQL text; may be None

    def getSql(self):
        """Return the SQL text given at construction (possibly None)."""
        text = self._sql
        return text
    
1849
class TaskDropSuperTable(StateTransitionTask):
    """Drops the fixed super table, sometimes dropping its regular tables first."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
        if Dice.throw(2) == 0:
            numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
            tblSeq = list(range(2 + numTables))
            random.shuffle(tblSeq)
            tickOutput = False  # if we have spitted out a "d" character for "drop regular table"
            isSuccess = True
            for i in tblSeq:
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                try:
                    self.execWtSql(wt, "drop table {}.{}".
                        format(self._db.getName(), regTableName))  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as err:
                    # correcting for strange error number scheme
                    errno2 = Helper.convertErrno(err.errno)
                    if (errno2 in [0x362]):  # mnode invalid table name
                        isSuccess = False
                        Logging.debug("[DB] Acceptable error when dropping a table")
                    else:
                        # Best-effort: move on to the next table, but record the
                        # error instead of swallowing it silently.
                        Logging.debug("[DB] Unexpected error when dropping table {}: errno=0x{:X}, msg: {}".format(
                            regTableName, errno2, err))
                    continue  # try to delete next regular table

                if (not tickOutput):
                    tickOutput = True  # Print only one time
                    print("d" if isSuccess else "f", end="", flush=True)

        # Drop the super table itself
        tblName = self._db.getFixedSuperTableName()
        self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
1890

S
Shuduo Sang 已提交
1891

1892 1893 1894
class TaskAlterTags(StateTransitionTask):
    """Randomly adds, drops or renames a tag on the fixed super table."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()  # if we can drop it, we can alter tags

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()
        roll = Dice.throw(4)
        # Rolls 0-2 map to specific alterations; any other roll renames the tag
        handlers = {
            0: lambda: sTable.addTag(dbc, "extraTag", "int"),
            1: lambda: sTable.dropTag(dbc, "extraTag"),
            2: lambda: sTable.dropTag(dbc, "newTag"),
        }
        fallback = lambda: sTable.changeTag(dbc, "extraTag", "newTag")
        handlers.get(roll, fallback)()

class TaskRestartService(StateTransitionTask):
    """Occasionally restarts the managed service (only with auto_start_service)."""

    _isRunning = False  # class-wide: True while some instance is mid-execution
    _classLock = threading.Lock()  # guards _isRunning

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        if gConfig.auto_start_service:
            return state.canDropFixedSuperTable()  # Basicallly when we have the super table
        return False # don't run this otherwise

    CHANCE_TO_RESTART_SERVICE = 200
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not gConfig.auto_start_service: # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        klass = TaskRestartService
        with klass._classLock:
            if klass._isRunning:
                Logging.info("Skipping restart task, another running already")
                return
            # Set on the class, not the instance, so that other task
            # instances see that a restart task is already in flight.
            klass._isRunning = True

        try:
            if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
                dbc = wt.getDbConn()
                dbc.execute("show databases") # simple delay, align timing with other workers
                gSvcMgr.restart()
        finally:
            # Clear the flag even if the restart raised, or no later task could run.
            with klass._classLock:
                klass._isRunning = False
S
Shuduo Sang 已提交
1951

1952
class TaskAddData(StateTransitionTask):
    """Inserts records into the regular tables of the fixed super table."""

    # Track which table is being actively worked on
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off tests
    fAddLogReady = None # type: TextIOWrapper
    fAddLogDone  = None # type: TextIOWrapper

    @classmethod
    def prepToRecordOps(cls):
        """Open the operation-recording files once, if gConfig.record_ops is set."""
        if gConfig.record_ops:
            if (cls.fAddLogReady is None):
                Logging.info(
                    "Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")
            if (cls.fAddLogDone is None):
                Logging.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
        """Insert all records for one table with a single execute() call."""
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        fullTableName = db.getName() + '.' + regTableName

        sql = "insert into {} values ".format(fullTableName)
        for j in range(numRecords):  # number of records per table
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor)
        dbc.execute(sql)

    def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
        """Insert records one at a time, optionally verifying each by reading it back.

        With gConfig.verify_data, each insert + read-back runs under the table
        lock. An empty/multiple read-back result raises ProgrammingError; a
        missing table (0x218/0x362) is tolerated. Each written value is passed
        to ``te.recordDataMark()``.
        """
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS

        for j in range(numRecords):  # number of records per table
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            if gConfig.record_ops:
                self.prepToRecordOps()
                self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                self.fAddLogReady.flush()
                os.fsync(self.fAddLogReady)

            # TODO: too ugly trying to lock the table reliably, refactor...
            fullTableName = db.getName() + '.' + regTableName
            if gConfig.verify_data:
                self.lockTable(fullTableName)

            try:
                sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {})
                    fullTableName,
                    nextTick, nextInt, nextColor)
                dbc.execute(sql)
            except BaseException: # Any exception at all: release the lock, then propagate
                if gConfig.verify_data:
                    self.unlockTable(fullTableName)
                raise

            # Now read it back and verify, we might encounter an error if table is dropped
            if gConfig.verify_data: # only if command line asks for it
                try:
                    readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
                        format(db.getName(), regTableName, nextTick))
                    if readBack != nextInt :
                        raise taos.error.ProgrammingError(
                            "Failed to read back same data, wrote: {}, read: {}"
                            .format(nextInt, readBack), 0x999)
                except taos.error.ProgrammingError as err:
                    errno = Helper.convertErrno(err.errno)
                    if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT]  : # not a single result
                        raise taos.error.ProgrammingError(
                            "Failed to read back same data for tick: {}, wrote: {}, read: {}"
                            .format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"),
                            errno)
                    elif errno in [0x218, 0x362]: # table doesn't exist
                        pass # tolerated: the table may have been dropped meanwhile
                    else:
                        # Re-throw otherwise
                        raise
                finally:
                    self.unlockTable(fullTableName) # Unlock the table no matter what

            # Successfully wrote the data into the DB, let's record it somehow
            te.recordDataMark(nextInt)

            if gConfig.record_ops:
                self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
                self.fAddLogDone.flush()
                os.fsync(self.fAddLogDone)

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Ensure each regular table exists, then add records to it, in random order."""
        # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
        db = self._db
        dbc = wt.getDbConn()
        numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
        tblSeq = list(range(numTables))
        random.shuffle(tblSeq) # now we have random sequence

        for i in tblSeq:
            markedActive = False
            if (i in self.activeTable):  # wow already active
                print("x", end="", flush=True) # concurrent insertion
            else:
                self.activeTable.add(i)  # marking it active
                markedActive = True

            try:
                sTable = db.getFixedSuperTable()
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                sTable.ensureTable(self, wt.getDbConn(), regTableName)  # Ensure the table exists

                # NOTE(review): Dice.throw(1) presumably always returns 0, so the
                # batch path below looks intentionally disabled. TODO confirm.
                if Dice.throw(1) == 0:
                    self._addData(db, dbc, regTableName, te)
                else:
                    self._addDataInBatch(db, dbc, regTableName, te)
            finally:
                # Clear only our own marker, and do it even when an insert raised;
                # otherwise the table would be reported as "active" forever.
                if markedActive:
                    self.activeTable.discard(i)  # not raising an error, unlike remove
2083 2084


2085 2086 2087 2088 2089 2090 2091 2092 2093
class ThreadStacks: # stack info for all threads
    """Snapshot of the Python call stacks of all live threads.

    Stacks are captured at construction time and keyed by each thread's
    ``native_id``; ``print`` dumps them to stdout.
    """

    def __init__(self):
        self._allStacks = {}
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            frame = allFrames.get(th.ident)
            if frame is None:
                # The thread started after the frame snapshot above was taken, so
                # there is no frame to record for it (indexing would raise KeyError).
                continue
            self._allStacks[th.native_id] = traceback.extract_stack(frame)

    def print(self, filteredEndName = None, filterInternal = False):
        """Print each captured stack, outermost frame first.

        :param filteredEndName: if given, skip threads whose innermost frame is a
            function with this name.
        :param filterInternal: if True, skip threads whose innermost frame is one
            of a few known waiting/internal functions (waits, the service-output
            reader, and the ``__init__`` that captured the stacks).
        """
        internalNames = ['wait', 'invoke_excepthook',
                         '_wait', # The Barrier exception
                         'svcOutputReader', # the svcMgr thread
                         '__init__'] # the thread that extracted the stack
        for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
            lastFrame = stack[-1]
            if filteredEndName and lastFrame.name == filteredEndName:
                continue # innermost frame matches the name we filter out
            if filterInternal and lastFrame.name in internalNames:
                continue # ignore
            # Now print
            print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid))
            for stackFrame, frame in enumerate(stack): # was using: reversed(stack)
                print("[{sf}] File {filename}, line {lineno}, in {name}".format(
                    sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info ----->\n")
S
Shuduo Sang 已提交
2115

2116 2117
class ClientManager:
    """Runs one crash_gen client session.

    Builds the DB manager, thread pool and ThreadCoordinator, runs the
    coordinator, then reports results. Also handles the SIGINT/SIGTERM and
    SIGUSR1 signals forwarded to it by MainExec.
    """

    def __init__(self):
        Logging.info("Starting service manager")
        # signal.signal(signal.SIGTERM, self.sigIntHandler)
        # signal.signal(signal.SIGINT, self.sigIntHandler)

        self._status = Status.STATUS_RUNNING
        self.tc = None  # ThreadCoordinator; created in run() and reset to None at its end
        self.inSigHandler = False  # True while the SIGUSR1 menu is being shown

    def sigIntHandler(self, signalNumber, frame):
        """On the first SIGINT/SIGTERM, ask the coordinator to stop. On a repeat, force an exit."""
        if self._status != Status.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            # return  # do nothing if it's already not running
            sys.exit(-1)
        self._status = Status.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        # self.tc is None before run() creates the coordinator and after run()
        # releases it; a signal arriving in either window must not crash here.
        if self.tc is not None:
            self.tc.requestToStop()

    def _doMenu(self):
        """Prompt until the user enters a valid menu choice, and return it as a string."""
        choice = ""
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the if range below
            # print("Enter Choice: ", end="", flush=True)
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        """On SIGUSR1, show an interactive menu: resume, terminate (not implemented), or dump thread stacks."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Re-arm even if the menu failed (e.g. EOF on input, or an invalid
            # choice); otherwise every later SIGUSR1 would be ignored.
            self.inSigHandler = False

    # TODO: need to revise how we verify data durability
    # def _printLastNumbers(self):  # to verify data durability
    #     dbManager = DbManager()
    #     dbc = dbManager.getDbConn()
    #     if dbc.query("show databases") <= 1:  # no database (we have a default called "log")
    #         return
    #     dbc.execute("use db")
    #     if dbc.query("show tables") == 0:  # no tables
    #         return

    #     sTbName = dbManager.getFixedSuperTableName()

    #     # get all regular tables
    #     # TODO: analyze result set later
    #     dbc.query("select TBNAME from db.{}".format(sTbName))
    #     rTables = dbc.getQueryResult()

    #     bList = TaskExecutor.BoundedList()
    #     for rTbName in rTables:  # regular tables
    #         dbc.query("select speed from db.{}".format(rTbName[0]))
    #         numbers = dbc.getQueryResult()
    #         for row in numbers:
    #             # print("<{}>".format(n), end="", flush=True)
    #             bList.add(row[0])

    #     print("Top numbers in DB right now: {}".format(bList))
    #     print("TDengine client execution is about to start in 2 seconds...")
    #     time.sleep(2.0)
    #     dbManager = None  # release?

    def run(self, svcMgr):
        """Run one client session and return a process exit code.

        :param svcMgr: a ServiceManager whose TDengine services are stopped after
            the client work finishes, or a falsy value to skip that.
        :return: 1 if the ThreadCoordinator reports failure, otherwise 0.

        Side effect: sets the module-level ``gConfig`` to None before returning.
        """
        # self._printLastNumbers()
        global gConfig

        # Prepare Tde Instance
        global gContainer
        tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"

        dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget())  # Regular function
        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        Logging.info("Starting client instance: {}".format(tInst))
        self.tc.run()
        # print("exec stats: {}".format(self.tc.getExecStats()))
        # print("TC failed = {}".format(self.tc.isFailed()))
        if svcMgr: # gConfig.auto_start_service:
            svcMgr.stopTaosServices()
            svcMgr = None

        # Release the module-level config (gConfig is the only global declared here)
        gConfig = None

        # Drop local references so gc.collect() below can reclaim these objects
        # once self.tc is also released.
        thPool = None
        dbManager.cleanUp() # destructor wouldn't run in time
        dbManager = None

        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        ret = 1 if self.tc.isFailed() else 0
        self.tc.cleanup()
        # Release variables here
        self.tc = None

        gc.collect() # force garbage collection
        return ret

    def conclude(self):
        """Print the ThreadCoordinator's execution statistics."""
        # self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
        self.tc.printStats()
2257

2258
class MainExec:
    """Program entry object.

    Parses the command line into the global ``gConfig``, installs signal
    handlers, and runs either the TDengine service (``-e``) or the crash
    generating client.
    """

    def __init__(self):
        self._clientMgr = None
        self._svcMgr = None # type: ServiceManager

        # Forward termination/interrupt signals to whichever manager is active
        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT,  self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, or to the service manager when running alone."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr: # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to the service manager and the client manager, when present."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Optionally start TDengine services, then run the client.

        :return: the ClientManager's exit code, or None if the REST connection failed.
        """
        global gSvcMgr
        if gConfig.auto_start_service:
            gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
            gSvcMgr.startTaosServices() # we start, don't run

        self._clientMgr = ClientManager()
        ret = None
        try:
            ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
        except requests.exceptions.ConnectionError as err:
            # Python exceptions have no getMessage(); formatting the exception
            # itself yields its message (the old call raised AttributeError here).
            Logging.warning("Failed to open REST connection to DB: {}".format(err))
            # don't raise
        return ret

    def runService(self):
        """Run the TDengine service(s) in the foreground until they reach an end state."""
        global gSvcMgr
        gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert

        gSvcMgr.run() # run to some end state
        gSvcMgr = self._svcMgr = None

    @staticmethod
    def _buildArgParser() -> argparse.ArgumentParser:
        """Build the command-line parser for all crash_gen options."""
        # Super cool Python argument library:
        # https://docs.python.org/3/library/argparse.html
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=textwrap.dedent('''\
                TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
                ---------------------------------------------------------------------
                1. You build TDengine in the top level ./build directory, as described in official docs
                2. You run the server there before this script: ./build/bin/taosd -c test/cfg

                '''))

        parser.add_argument(
            '-a',
            '--auto-start-service',
            action='store_true',
            help='Automatically start/stop the TDengine service (default: false)')
        parser.add_argument(
            '-b',
            '--max-dbs',
            action='store',
            default=0,
            type=int,
            help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)')
        parser.add_argument(
            '-c',
            '--connector-type',
            action='store',
            default='native',
            type=str,
            help='Connector type to use: native, rest, or mixed (default: native)')
        parser.add_argument(
            '-d',
            '--debug',
            action='store_true',
            help='Turn on DEBUG mode for more logging (default: false)')
        parser.add_argument(
            '-e',
            '--run-tdengine',
            action='store_true',
            help='Run TDengine service in foreground (default: false)')
        parser.add_argument(
            '-g',
            '--ignore-errors',
            action='store',
            default=None,
            type=str,
            help='Ignore error codes, comma separated, 0x supported (default: None)')
        parser.add_argument(
            '-i',
            '--max-replicas',
            action='store',
            default=1,
            type=int,
            help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
        parser.add_argument(
            '-l',
            '--larger-data',
            action='store_true',
            help='Write larger amount of data during write operations (default: false)')
        parser.add_argument(
            '-n',
            '--dynamic-db-table-names',
            action='store_true',
            help='Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)')
        parser.add_argument(
            '-o',
            '--num-dnodes',
            action='store',
            default=1,
            type=int,
            help='Number of Dnodes to initialize, used with -e option. (default: 1)')
        parser.add_argument(
            '-p',
            '--per-thread-db-connection',
            action='store_true',
            help='Use a separate db connection per thread instead of a single shared one (default: false)')
        parser.add_argument(
            '-r',
            '--record-ops',
            action='store_true',
            help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
        parser.add_argument(
            '-s',
            '--max-steps',
            action='store',
            default=1000,
            type=int,
            help='Maximum number of steps to run (default: 1000)')
        parser.add_argument(
            '-t',
            '--num-threads',
            action='store',
            default=5,
            type=int,
            help='Number of threads to run (default: 5)')
        parser.add_argument(
            '-v',
            '--verify-data',
            action='store_true',
            help='Verify data written in a number of places by reading back (default: false)')
        parser.add_argument(
            '-x',
            '--continue-on-exception',
            action='store_true',
            help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
        return parser

    def init(self): # TODO: refactor
        """Create the global container, parse the command line into gConfig, and initialize logging and dice."""
        global gContainer
        gContainer = Container() # micky-mouse DI

        global gSvcMgr # TODO: refactor away
        gSvcMgr = None

        global gConfig
        gConfig = self._buildArgParser().parse_args()

        Logging.clsInit(gConfig)

        Dice.seed(0)  # initial seeding of dice

    def run(self):
        """Run the service (``-e``) or the client.

        :return: 0 on service success or -1 on a service connection failure;
            otherwise the client's return value.
        """
        if gConfig.run_tdengine:  # run server
            try:
                self.runService()
                return 0 # success
            except ConnectionError:
                Logging.error("Failed to make DB connection, please check DB instance manually")
            return -1 # failure
        else:
            return self.runClient()
S
Steven Li 已提交
2432

2433

2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454
class Container():
    """Tiny dependency-injection container restricted to a fixed set of property names.

    Reading or writing a name outside ``_propertyList`` raises CrashGenError.
    Reading an allowed name that has not been assigned yet raises
    AttributeError, so ``hasattr``/``getattr(obj, name, default)`` behave normally.
    """
    _propertyList = {'defTdeInstance'}  # the only property names allowed

    def __init__(self):
        self._cargo = {} # No cargo at the beginning

    def _verifyValidProperty(self, name):
        if name not in self._propertyList:
            raise CrashGenError("Invalid container property: {}".format(name))

    # Called for an attribute, when other mechanisms fail (compare to  __getattribute__)
    def __getattr__(self, name):
        self._verifyValidProperty(name)
        try:
            return self._cargo[name] # just a simple lookup
        except KeyError:
            # __getattr__ must signal a missing attribute with AttributeError;
            # a KeyError would escape hasattr()/getattr(..., default).
            raise AttributeError("Container property not set yet: {}".format(name)) from None

    def __setattr__(self, name, value):
        if name == '_cargo' : # reserved vars
            super().__setattr__(name, value)
            return
        self._verifyValidProperty(name)
        self._cargo[name] = value
S
Shuduo Sang 已提交
2455