crash_gen_main.py 96.6 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
17

S
Shuduo Sang 已提交
18 19 20
from typing import Set
from typing import Dict
from typing import List
21
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
22

S
Shuduo Sang 已提交
23 24
import textwrap
import time
25
import datetime
S
Shuduo Sang 已提交
26
import random
27
import logging
S
Shuduo Sang 已提交
28 29 30 31
import threading
import copy
import argparse
import getopt
32

S
Steven Li 已提交
33
import sys
34
import os
35
import signal
36
import traceback
37 38 39
import resource
from guppy import hpy
import gc
40

S
Steven Li 已提交
41 42 43
from crash_gen.service_manager import ServiceManager, TdeInstance
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
44 45 46

import taos
import requests
47

48 49 50 51
# Require Python 3
if sys.version_info[0] < 3:
    raise RuntimeError("Must be using Python 3")

# Global variables, tried to keep a small number.
# They are only declared (annotated) here; values are set a bit later.

# Command-line/Environment Configurations
gConfig:    argparse.Namespace
gSvcMgr:    ServiceManager  # TODO: refactor this hack, use dep injection
gContainer: Container
class WorkerThread:
    """A worker thread that runs one task per step, in lock-step with the
    ThreadCoordinator driving it from the main thread.

    Each step the worker waits at the coordinator's shared barrier, then at its
    own per-thread "gate" until the main thread taps it. It then fetches a task
    from the coordinator, executes it and records it as executed.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
        """
        Note: this runs in the main thread context.

        :param pool: the ThreadPool this worker belongs to
        :param tid: thread id within the pool, used in log messages
        :param tc: the ThreadCoordinator that hands out tasks and drives steps
        :raises RuntimeError: if gConfig.connector_type is not native/rest/mixed
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=self.run)
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if gConfig.per_thread_db_connection:  # type: ignore
            tInst = gContainer.defTdeInstance
            if gConfig.connector_type == 'native':
                self._dbConn = DbConn.createNative(tInst.getDbTarget())
            elif gConfig.connector_type == 'rest':
                self._dbConn = DbConn.createRest(tInst.getDbTarget())
            elif gConfig.connector_type == 'mixed':
                # Pass the same DB target as the dedicated 'native'/'rest' branches above
                if Dice.throw(2) == 0:  # 1/2 chance
                    self._dbConn = DbConn.createNative(tInst.getDbTarget())
                else:
                    self._dbConn = DbConn.createRest(tInst.getDbTarget())
            else:
                raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))

    def logDebug(self, msg):
        Logging.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        Logging.info("    TRD[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's TaskExecutor for the current step (may be None)."""
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread entry point, running in this worker's own thread context.

        Opens the per-thread DB connection (if configured), runs the task loop,
        and always closes the connection afterwards, even if a task raised.
        """
        Logging.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            Logging.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up, also on an exception escaping the loop, so the connection is not leaked
            if gConfig.per_thread_db_connection:  # type: ignore
                if self._dbConn.isOpen:  # sometimes it is not open
                    self._dbConn.close()
                else:
                    Logging.warning("Cleaning up worker thread, dbConn already closed")

    def _doTaskLoop(self):
        """Run one task per step until the barrier breaks or the coordinator stops.

        Each iteration crosses the shared step barrier, then waits at this
        thread's gate. It re-opens the DB connection if it was closed, then
        fetches, executes and records one task.
        """
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                Logging.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            Logging.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()  # then per-thread gate, after being tapped
            Logging.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not tc.isRunning():
                print("_wts", end="")
                Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Make sure our own connection is usable before running a task
            try:
                if gConfig.per_thread_db_connection:  # most likely TRUE
                    if not self._dbConn.isOpen:  # might have been closed during server auto-restart
                        self._dbConn.open()
            except taos.error.ProgrammingError as err:
                errno = Helper.convertErrno(err.errno)
                # Tolerated: invalid database, dropping, Unable to establish connection,
                # Database not ready. Anything else is fatal for this thread.
                if errno not in (0x383, 0x386, 0x00B, 0x014):
                    print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
                    raise

            # Fetch a task from the Thread Coordinator
            Logging.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            Logging.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
                self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            Logging.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block in this worker's own thread until the main thread taps our gate,
        then clear the gate so it blocks again next step."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        Logging.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its step gate (main thread only).

        If the worker thread is not alive, only a "_tad" marker is printed.
        """
        self.verifyThreadMain()  # only allowed for main thread
        if self._thread.is_alive():
            Logging.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        return self.getDbConn().getQueryResult()

    def getDbConn(self) -> DbConn:
        """Return this thread's own connection, or the shared one from the DbManager."""
        if gConfig.per_thread_db_connection:
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()
241

242
# The coordinator of all worker threads, mostly running in main thread
S
Shuduo Sang 已提交
243 244


245
class ThreadCoordinator:
    """Coordinates all worker threads step by step, mostly running in the main thread.

    Per step, the main thread and all workers meet at a shared barrier. While the
    workers then wait at their per-thread gates, the main thread:
      - checks the previous step's executed tasks for aborts;
      - transitions each database's state machine;
      - creates a fresh TaskExecutor;
      - taps every worker's gate, so each worker runs one task.
    """

    WORKER_THREAD_TIMEOUT = 120  # seconds the main thread waits at the barrier. Normal: 120

    def __init__(self, pool: ThreadPool, dbManager: DbManager):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # TaskExecutor, re-created for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # One barrier for all worker threads, plus the main thread itself
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()
        self._runStatus = Status.STATUS_RUNNING
        self._initDbs()

    def getTaskExecutor(self):
        """Return the TaskExecutor of the current step, or None when not running."""
        return self._te

    def getDbManager(self) -> DbManager:
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        """Wait at the shared step barrier.

        :raises threading.BrokenBarrierError: if the barrier is broken or the wait times out
        """
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        """Make the main loop stop at its next check; recorded as a failure."""
        self._runStatus = Status.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """Return True if the main loop must stop.

        That is the case when the last step (max_steps - 1) was reached, when the
        status is no longer RUNNING, or when any of the failure flags is set.
        """
        maxSteps = gConfig.max_steps  # type: ignore
        if self._curStep >= (maxSteps - 1):  # maxStep==10, last curStep should be 9
            return True
        if self._runStatus != Status.STATUS_RUNNING:
            return True
        return bool(transitionFailed or hasAbortedTask or workerTimeout)

    def _hasAbortedTask(self):  # from execution of previous step
        """Return True if any task executed in the previous step was aborted."""
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step and tap every worker's gate.

        A new TaskExecutor is created only if the transition succeeded. Otherwise
        _te stays None, which makes isRunning() False and tells workers to stop.
        """
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        Logging.debug("--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        Logging.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
            self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute its own task
        self.tapAllThreads()  # release all worker thread from their "gates"

    def _syncAtBarrier(self):
        """Cross the shared barrier as the main thread, then reset it for reuse.

        :raises threading.BrokenBarrierError: if the workers do not all arrive
            within WORKER_THREAD_TIMEOUT seconds
        """
        # Now main thread (that's us) is ready to enter a step.
        # Let other threads go past the pool barrier, but wait at the thread gate.
        Logging.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        Logging.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """End the current step and clear the list of executed tasks.

        For each database, the tasks executed against it drive a transition of
        its state machine.

        :returns: True if the transition failed (broken DB connection or our own
                  CrashGenError), False otherwise
        :raises taos.error.ProgrammingError: any other programming error is re-raised
        """
        transitionFailed = False
        try:
            for db in self._dbs:  # type: Database
                sm = db.getStateMachine()
                Logging.debug("[STT] starting transitions for DB: {}".format(db.getName()))
                # at end of step, transition the DB state
                tasksForDb = db.filterTasks(self._executedTasks)
                sm.transition(tasksForDb, self.getDbManager().getDbConn())
                Logging.debug("[STT] transition ended for DB: {}".format(db.getName()))

            # Due to limitation (or maybe not) of the TD Python library, we cannot
            # share connections across threads. Here in the main thread we cannot
            # operate the connections created in workers, so that work is done in
            # the worker task loop instead.
        except taos.error.ProgrammingError as err:
            if err.msg == 'network unavailable':  # broken DB connection
                Logging.info("DB connection broken, execution failed")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # Don't bail out early: all threads still need to be tapped at
                # the end, and maybe signalled to stop
            elif isinstance(err, CrashGenError):  # our own transition failure
                Logging.info("State transition error")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("State transition error")
            else:
                raise

        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        Logging.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Main loop, in the main thread.

        Starts the workers and drives them step by step until _runShouldEnd().
        Then it releases the workers one last time, joins them and stops the
        stop watch.
        """
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not gConfig.debug:  # print this only if we are not in debug mode
                Progress.emit(Progress.STEP_BOUNDARY)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
                Progress.emit(Progress.END_THREAD_STEP)
            except threading.BrokenBarrierError:
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                Logging.error("\n")
                Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
                    ThreadCoordinator.WORKER_THREAD_TIMEOUT))
                Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True

                # Enable below for deadlock debugging, using gdb to attach to process
                # while True:
                #     Logging.error("Deadlock detected")
                #     time.sleep(60.0)
                break

            # At this point, all threads should be past the overall "barrier" and
            # before the per-thread "gate". We use this period to do house keeping
            # work, when all worker threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                Logging.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = Helper.convertErrno(err.errno)  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                Logging.info(errMsg)
                traceback.print_exc()
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            Progress.emit(Progress.BEGIN_THREAD_STEP)
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            Logging.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            Logging.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            Logging.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        Logging.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        Logging.debug("\r\n\n--> Main thread ready to finish up...")
        Logging.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        Logging.info(". . . All worker threads finished")  # No CR/LF before
        self._execStats.endExec()

    def cleanup(self):  # free resources
        """Release the pool's threads and drop references to all owned objects."""
        self._pool.cleanup()

        self._pool = None
        self._te = None
        self._dbManager = None
        self._executedTasks = None
        self._lock = None
        self._stepBarrier = None
        self._execStats = None
        self._runStatus = None

    def printStats(self):
        self._execStats.printStats()

    def isFailed(self):
        return self._execStats.isFailed()

    def getExecStats(self):
        return self._execStats

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's step gate, in an order shuffled with Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        Logging.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        """True while a TaskExecutor exists for the current step."""
        return self._te is not None

    def _initDbs(self):
        ''' Initialize multiple databases, invoked at __init__() time '''
        self._dbs = []  # type: List[Database]
        dbc = self.getDbManager().getDbConn()
        if gConfig.max_dbs == 0:
            self._dbs.append(Database(0, dbc))
        else:
            # Don't use Dice/random here, as they are deterministic
            baseDbNumber = 0
            if gConfig.dynamic_db_table_names:
                baseDbNumber = int(datetime.datetime.now().timestamp() * 333) % 888
            for i in range(gConfig.max_dbs):
                self._dbs.append(Database(baseDbNumber + i, dbc))

    def pickDatabase(self):
        """Pick one of the databases, via Dice when max_dbs != 0, else the only one."""
        idxDb = 0
        if gConfig.max_dbs != 0:
            idxDb = Dice.throw(gConfig.max_dbs)  # 0 to N-1
        db = self._dbs[idxDb]  # type: Database
        return db

    def fetchTask(self) -> Task:
        ''' The thread coordinator (that's us) is responsible for fetching a task
            to be executed next.

            :raises RuntimeError: if called when not running (no TaskExecutor)
        '''
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")

        # pick a task type for current state
        db = self.pickDatabase()
        taskType = db.getStateMachine().pickTaskType()  # dynamic name of class
        return taskType(self._execStats, db)  # create a task from it

    def resetExecutedTasks(self):
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record a finished task; called concurrently from worker threads."""
        with self._lock:
            self._executedTasks.append(task)
533

534
class ThreadPool:
    """Holds the WorkerThread objects of one run.

    Threads are created and started together, and joined together.
    """

    def __init__(self, numThreads, maxSteps):
        # Configuration supplied by the caller
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal bookkeeping
        self.curStep = 0
        self.threadList = []  # type: List[WorkerThread]

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create numThreads workers bound to `tc`, starting each as soon as it is recorded."""
        for tid in range(self.numThreads):
            worker = WorkerThread(self, tid, tc)
            self.threadList.append(worker)
            worker.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Wait for every worker thread to terminate, in creation order."""
        for worker in self.threadList:
            Logging.debug("Joining thread...")
            worker._thread.join()

    def cleanup(self):
        """Drop the references to the worker threads."""
        self.threadList = None  # maybe clean up each?

557 558
# A queue of contiguous POSITIVE integers, used by DbManager to generate consecutive numbers
# for new table names
S
Shuduo Sang 已提交
559 560


S
Steven Li 已提交
561 562
class LinearQueue:
    """A queue of contiguous positive integers, from firstIndex to lastIndex inclusive.

    Indexes handed out by push()/pickAndAllocate() stay in the `inUse` set until
    release() is called. pop() refuses to remove a head index that is still in
    use. Mutating operations hold a re-entrant lock, since they call each other.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0  # lastIndex < firstIndex means "empty"
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a debug string such as "[1..3], in use: {2}"."""
        template = "[{}..{}], in use: {}"
        return template.format(self.firstIndex, self.lastIndex, self.inUse)

    def push(self):
        """Append a new, largest index at the tail, mark it in use and return it."""
        with self._lock:
            self.lastIndex = self.lastIndex + 1
            newest = self.lastIndex
            self.allocate(newest)
            return newest

    def pop(self):
        """Remove and return the head index.

        Returns False (not None) when the queue is empty or the head is still in use.
        """
        with self._lock:
            # TODO: None instead of False?
            if self.isEmpty() or self.firstIndex in self.inUse:
                return False
            head = self.firstIndex
            self.firstIndex = head + 1
            return head

    def isEmpty(self):
        """True when no index lies in [firstIndex..lastIndex]."""
        return not (self.firstIndex <= self.lastIndex)

    def popIfNotEmpty(self):
        """Like pop(), but return 0 (rather than False) when the queue is empty."""
        with self._lock:
            return 0 if self.isEmpty() else self.pop()

    def allocate(self, i):
        """Mark index `i` as in use.

        :raises RuntimeError: if `i` is already in use
        """
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index `i` as no longer in use.

        :raises KeyError: if `i` is not currently in use (TODO: why does this happen?)
        """
        with self._lock:
            self.inUse.remove(i)

    def size(self):
        """Number of indexes between firstIndex and lastIndex, inclusive."""
        return self.lastIndex - self.firstIndex + 1

    def pickAndAllocate(self):
        """Randomly pick an index that is not in use, allocate it and return it.

        Returns None when the queue is empty, or after 10 x size() unsuccessful picks.
        """
        if self.isEmpty():
            return None
        with self._lock:
            attempts = 0
            while True:
                attempts += 1
                if attempts > 10 * self.size():  # 10x iteration already
                    return None
                candidate = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if candidate in self.inUse:
                    continue
                self.allocate(candidate)
                return candidate

S
Shuduo Sang 已提交
637

638
class AnyState:
    """Base class for the states a test database can be in.

    Concrete subclasses implement ``getInfo()``, returning a list indexed by
    ``STATE_VAL_IDX`` and the ``CAN_*`` constants: element 0 is the state value
    (one of the ``STATE_*`` constants), the remaining elements are flags telling
    which operations are allowed from that state. The ``assert*`` helpers raise
    ``CrashGenError`` when the tasks executed in a step are inconsistent with
    the observed state change.
    """
    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, not even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    # Display names, shifted by one so that STATE_INVALID (-1) maps to index 0
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Indexes into the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    # For below, if we can "drop the DB", but strictly speaking
    # only "under normal circumstances", as we may override it with the -b option
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # +1 offset accommodates the STATE_INVALID (-1) case
        return self._stateNames[self._info[self.STATE_VAL_IDX] + 1]

    def getInfo(self):
        """Return this state's info list; each sub state describes itself so
        that questions such as canDropDb() can be answered."""
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare by state value against a ``STATE_*`` int or another AnyState.

        Raises:
            RuntimeError: if ``other`` is neither an int nor an AnyState.
        """
        if isinstance(other, int):
            return self.getValIndex() == other
        elif isinstance(other, AnyState):
            return self.getValIndex() == other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(
                    type(other)))

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` could have moved the system from this state to ``newState``."""
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        # If user requests to run up to a number of DBs,
        # we'd then not do drop_db operations any more
        if gConfig.max_dbs > 0:
            return False
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise CrashGenError as soon as a second successful task of type ``cls`` is seen."""
        sCnt = 0
        for task in tasks:
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                sCnt += 1
                if sCnt >= 2:
                    raise CrashGenError(
                        "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise CrashGenError if tasks of type ``cls`` are present but none succeeded."""
        sCnt = 0
        exists = False
        for task in tasks:
            if not isinstance(task, cls):
                continue
            exists = True  # we have a valid instance
            if task.isSuccess():
                sCnt += 1
        if exists and sCnt <= 0:
            raise CrashGenError("Unexpected zero success for task type: {}, from tasks: {}"
                                .format(cls, tasks))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` is present."""
        for task in tasks:
            if isinstance(task, cls):
                raise CrashGenError(
                    "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` succeeded."""
        for task in tasks:
            if isinstance(task, cls):
                if task.isSuccess():
                    raise CrashGenError(
                        "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if at least one task of type ``cls`` succeeded."""
        return any(isinstance(task, cls) and task.isSuccess() for task in tasks)

    def hasTask(self, tasks, cls):
        """Return True if at least one task of type ``cls`` is present."""
        return any(isinstance(task, cls) for task in tasks)


class StateInvalid(AnyState):
    """Placeholder state in which no operation is permitted.

    It does not override verifyTasksToState(), so calling it raises the base
    class RuntimeError.
    """
    def getInfo(self):
        return [
            self.STATE_INVALID,
            False, False,  # can create/drop Db
            False, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]


class StateEmpty(AnyState):
    """No database exists; the only permitted operation is creating one."""
    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False,  # can create/drop Db
            False, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Sanity-check the tasks that ran while the system was EMPTY."""
        # At EMPTY: if creating a DB succeeded and no drop_db task ran,
        # at most one creation may have succeeded. TODO: compare numbers
        if self.hasSuccess(tasks, TaskCreateDb):
            if not self.hasTask(tasks, TaskDropDb):
                self.assertAtMostOneSuccess(tasks, TaskCreateDb)


class StateDbOnly(AnyState):
    """A database exists but holds no tables."""
    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,  # can create/drop Db
            True, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Sanity-check the tasks that ran while the system was DB_ONLY."""
        if not self.hasTask(tasks, TaskCreateDb):
            # only if we don't create any more
            self.assertAtMostOneSuccess(tasks, TaskDropDb)

        # TODO: restore the below, the problem exists, although unlikely in real-world
        # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
        #     self.assertIfExistThenSuccess(tasks, TaskDropDb)


class StateSuperTableOnly(AnyState):
    """Corresponds to STATE_TABLE_ONLY ("we have a table, but totally empty")."""
    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed table
            True, True,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Sanity-check the tasks that ran while in this state.

        This currently asserts nothing: stricter checks (e.g. that no
        drop-table task ran once data was added) are not true in massively
        parallel cases. TODO: need to revamp!!
        """
        if self.hasSuccess(tasks, TaskDropSuperTable):  # we are able to drop the table
            # NOTE(review): the result below is discarded, so it checks nothing;
            # presumably it was meant to verify that the table was re-created.
            self.hasSuccess(tasks, TaskCreateSuperTable)


class StateHasData(AnyState):
    """Corresponds to STATE_HAS_DATA ("we have some data in the table")."""
    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed table
            True, True,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` are consistent with moving from HAS_DATA to ``newState``."""
        if newState.equals(AnyState.STATE_EMPTY):
            # NOTE(review): result discarded, so this call asserts nothing
            self.hasSuccess(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
        elif newState.equals(AnyState.STATE_DB_ONLY):  # in DB only
            if not self.hasTask(tasks, TaskCreateDb):  # without a create_db task
                # the DB still exists, so no drop_db task may have run
                self.assertNoTask(tasks, TaskDropDb)
            # NOTE(review): result discarded, so this call asserts nothing
            self.hasSuccess(tasks, TaskDropSuperTable)
            # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
        else:  # any other new state, i.e. HAS_DATA or TABLE_ONLY
            if not self.hasTask(tasks, TaskCreateDb):  # only if we didn't create one
                # we shouldn't have dropped it
                self.assertNoTask(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateSuperTable):  # if we didn't create the table
                # we should not have a task that drops it
                self.assertNoTask(tasks, TaskDropSuperTable)


class StateMechine:
    """Tracks and validates the state of one test Database across steps.

    The current state is (re)discovered by querying the server in
    ``_findCurrentState``; ``transition`` checks that the tasks executed in a
    step are consistent with the old and new states.
    """
    def __init__(self, db: Database):
        self._db = db
        # transition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
        self._stateWeights = [1, 2, 10, 40]

    def init(self, dbc: DbConn):  # late initialization, don't save the dbConn
        """Discover the starting state through ``dbc``; re-raises TAOS programming errors."""
        try:
            self._curState = self._findCurrentState(dbc)  # starting state
        except taos.error.ProgrammingError as err:
            Logging.error("Failed to initialized state machine, cannot find current state: {}".format(err))
            traceback.print_stack()
            raise  # re-throw

    # TODO: seems no longer used, remove?
    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        return self._curState.canDropDb()  # ha, can drop DB means it has one

    # May be slow, use cautiously...
    def getTaskTypes(self):  # those that can run (directly/indirectly) from the current state
        """Return the task types runnable from the current state, directly or after one transition.

        Each type appears at most once in the result.

        Raises:
            RuntimeError: if no task type can run from the current state.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks

        # Task types that can begin directly from the current state
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]

        # Now figure out the INDIRECT ones: those that can begin from the end
        # state of one of the direct ones.
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:  # each task type gathered so far
            endState = task1.getEndState()  # figure the end state
            if endState is None:  # does not change end state
                continue  # no use, do nothing
            for tc in allTaskClasses:  # what task can further begin from there?
                # Check against the accumulated list (not only firstTaskTypes) so a
                # type reachable from several end states is gathered once; duplicates
                # would inflate its weight in pickTaskType().
                if tc.canBeginFrom(endState) and tc not in taskTypes:
                    taskTypes.append(tc)  # gather it

        if not taskTypes:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        Logging.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self, dbc: DbConn):
        """Query the server through ``dbc`` and return a fresh state object for our DB.

        Side effect: when the database exists, ``dbc`` is switched to it via ``dbc.use``.
        """
        ts = time.time()  # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        dbName = self._db.getName()
        if not dbc.existsDatabase(dbName):  # no database?!
            Logging.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when opening connection, and this is NOT the worker
        # thread, which does this on their own
        dbc.use(dbName)
        if not dbc.hasTables():  # no tables
            Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        # For sure we have tables, which means we must have the super table. # TODO: are we sure?
        sTable = self._db.getFixedSuperTable()
        # NOTE(review): a True result from hasRegTables() maps to SUPER_TABLE_ONLY
        # and False to HAS_DATA, which looks inverted relative to the method name.
        # Confirm against TdSuperTable.hasRegTables before changing the mapping.
        if sTable.hasRegTables(dbc):
            Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else:
            Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    # We transition the system to a new state by examining the current state itself
    def transition(self, tasks, dbc: DbConn):
        """Re-discover the DB state after a step and validate it against ``tasks``.

        With an empty ``tasks`` list (before the 1st step) nothing is checked
        and the current state is kept.
        """
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            Logging.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        dbc.execute("show dnodes")

        # Generic Checks, first based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in
            # case of multiple creation and drops

        if self._curState.canDropDb():
            if gSvcMgr is None:  # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in
            # case of drop-create-drop

        # No generic checks for creating/dropping the fixed super table or for
        # adding/reading data: they do not hold when the whole DB may be dropped.

        newState = self._findCurrentState(dbc)
        Logging.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        """Randomly pick a runnable task type, weighted by the state it leads to."""
        # all the task types we can choose from at current state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # read data task, default to 10: TODO: change to a constant
                weights.append(10)
        i = self._weighted_choice_sub(weights)
        return taskTypes[i]

    def _weighted_choice_sub(self, weights):
        """Return an index into ``weights``, chosen with probability proportional to its weight.

        Returns None for an empty ``weights``.
        ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
        """
        # TODO: use our dice to ensure it being deterministic?
        rnd = random.random() * sum(weights)
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Only reachable through floating-point round-off or all-zero weights:
        # fall back to the last index instead of returning None.
        return len(weights) - 1 if weights else None


class Database:
    ''' We use this to represent an actual TDengine database inside a service instance,
        possibly in a cluster environment.

        For now we use it to manage state transitions in that database

        TODO: consider moving, but keep in mind it contains "StateMachine"
    '''
    _clsLock = threading.Lock()  # class wide lock, guards the shared tick counters
    # getNextInt() pre-increments, so the first value handed out is 102. After the
    # first call each instance holds its own _lastInt, shadowing this class default.
    _lastInt = 101
    _lastTick = 0
    _lastLaggingTick = 0  # lagging tick, for unsequenced insertions

    def __init__(self, dbNum: int, dbc: DbConn):  # TODO: remove dbc
        """Create database number ``dbNum`` and discover its current state through ``dbc``."""
        self._dbNum = dbNum  # we assign a number to databases, for our testing purpose
        self._stateMachine = StateMechine(self)
        self._stateMachine.init(dbc)
        self._lock = threading.RLock()  # guards the per-instance _lastInt counter

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    def getDbNum(self):
        return self._dbNum

    def getName(self):
        return "db_{}".format(self._dbNum)

    def filterTasks(self, inTasks: List[Task]):  # Pick out those belonging to us
        """Return the tasks from ``inTasks`` whose database is this one (see isSame)."""
        return [task for task in inTasks if task.getDb().isSame(self)]

    def isSame(self, other):
        return self._dbNum == other._dbNum

    def exists(self, dbc: DbConn):
        return dbc.existsDatabase(self.getName())

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    def getFixedSuperTable(self) -> TdSuperTable:
        return TdSuperTable(self.getFixedSuperTableName(), self.getName())

    @classmethod
    def setupLastTick(cls):
        """Compute the starting time tick for generated records.

        We aim to create a starting time tick such that, whenever we run our test
        here once, we can safely create 100,000 records with no repeated time
        stamp when we re-run the test in 3 minutes (180 seconds); basically the
        elapsed wall-clock time is expanded by a factor of 500.

        The result is 2012-01-01 plus an offset below 8*12*30 days (~7.9 years).
        TODO: what if it goes beyond 10 years into the future
        TODO: fix the error as result of above: "tsdb timestamp is out of range"
        """
        t1 = datetime.datetime(2020, 6, 1)
        t2 = datetime.datetime.now()
        # seconds elapsed since 2020-06-01 (Python ints do not overflow)
        elSec = int(t2.timestamp() - t1.timestamp())
        # scale by 500 and wrap into the 8*12*30-day window
        elSec2 = (elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500)) * \
            500
        t3 = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp(
            t3.timestamp() + elSec2)  # see explanation above
        Logging.debug("Setting up TICKS to start from: {}".format(t4))
        return t4

    @classmethod
    def getNextTick(cls):
        """Return the next record timestamp; the counters are shared by all instances.

        With a 1 in 20 chance a "lagging" tick is returned, which started 10,000
        seconds behind the regular one, to exercise unsequenced insertions;
        otherwise the regular tick is advanced by one second.
        """
        with cls._clsLock:  # prevent duplicate tick
            if cls._lastLaggingTick == 0 or cls._lastTick == 0:  # not initialized
                tick = cls.setupLastTick()
                cls._lastTick = tick
                # 10k seconds behind at 1/20 chance, should be enough to avoid overlaps
                cls._lastLaggingTick = tick + datetime.timedelta(0, -10000)

            if Dice.throw(20) == 0:  # 1 in 20 chance, return lagging tick
                cls._lastLaggingTick += datetime.timedelta(0, 1)  # advance the lagging tick by 1 second
                return cls._lastLaggingTick
            else:  # regular
                cls._lastTick += datetime.timedelta(0, 1)  # add one second to it
                return cls._lastTick

    def getNextInt(self):
        """Return this instance's next integer (pre-incremented)."""
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            self.getNextInt())

    def getNextFloat(self):
        return 0.9 + self.getNextInt()

    ALL_COLORS = ['red', 'white', 'blue', 'green', 'purple']

    def getNextColor(self):
        return random.choice(self.ALL_COLORS)


class TaskExecutor():
    """Per-step helper handed to tasks; also keeps a class-wide list of the largest data marks."""

    class BoundedList:
        """Thread-safe ascending list keeping at most ``size`` of the largest values added."""

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            """Insert ``n`` in sorted position, evicting the smallest item when over capacity.

            While the list is full, a value not larger than its smallest item is
            dropped. (Previously such values were dropped even when the list was
            not full, so the very first value acted as a permanent gate.)
            """
            with self._lock:
                # first position whose item is >= n, i.e. left of any equal items
                insPos = next(
                    (i for i, v in enumerate(self._list) if n <= v), len(self._list))
                if insPos == 0 and len(self._list) >= self._size:
                    return  # full, and n would be evicted right away
                self._list.insert(insPos, n)
                if len(self._list) > self._size:
                    del self._list[0]  # remove the smallest item

        def __str__(self):
            return repr(self._list)

    _boundedList = BoundedList()

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        """Record ``n`` in the class-wide bounded list of data marks."""
        self._boundedList.add(n)


class Task():
1199 1200 1201 1202
    ''' A generic "Task" to be executed. For now we decide that there is no
        need to embed a DB connection here, we use whatever the Worker Thread has
        instead. But a task is always associated with a DB
    '''
1203
    taskSn = 100
1204 1205
    _lock = threading.Lock()
    _tableLocks: Dict[str, threading.Lock] = {}
1206 1207 1208

    @classmethod
    def allocTaskNum(cls):
S
Shuduo Sang 已提交
1209
        Task.taskSn += 1  # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
1210
        # Logging.debug("Allocating taskSN: {}".format(Task.taskSn))
S
Steven Li 已提交
1211
        return Task.taskSn
1212

1213
    def __init__(self, execStats: ExecutionStats, db: Database):
S
Shuduo Sang 已提交
1214
        self._workerThread = None
1215
        self._err: Optional[Exception] = None
1216
        self._aborted = False
1217
        self._curStep = None
S
Shuduo Sang 已提交
1218
        self._numRows = None  # Number of rows affected
1219

S
Shuduo Sang 已提交
1220
        # Assign an incremental task serial number
1221
        self._taskNum = self.allocTaskNum()
1222
        # Logging.debug("Creating new task {}...".format(self._taskNum))
1223

1224
        self._execStats = execStats
1225
        self._db = db # A task is always associated/for a specific DB
1226

1227 1228
        

1229
    def isSuccess(self):
S
Shuduo Sang 已提交
1230
        return self._err is None
1231

1232 1233 1234
    def isAborted(self):
        return self._aborted

S
Shuduo Sang 已提交
1235
    def clone(self):  # TODO: why do we need this again?
1236
        newTask = self.__class__(self._execStats, self._db)
1237 1238
        return newTask

1239 1240 1241
    def getDb(self):
        return self._db

1242
    def logDebug(self, msg):
S
Shuduo Sang 已提交
1243 1244 1245
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))
1246 1247

    def logInfo(self, msg):
S
Shuduo Sang 已提交
1248 1249 1250
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))
1251

1252
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
S
Shuduo Sang 已提交
1253 1254 1255
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))
1256

1257 1258 1259 1260 1261
    def _isServiceStable(self):
        if not gSvcMgr:
            return True  # we don't run service, so let's assume it's stable
        return gSvcMgr.isStable() # otherwise let's examine the service

1262 1263 1264
    def _isErrAcceptable(self, errno, msg):
        if errno in [
                0x05,  # TSDB_CODE_RPC_NOT_READY
1265
                0x0B,  # Unable to establish connection, more details in TD-1648
1266
                # 0x200, # invalid SQL, TODO: re-examine with TD-934
1267
                0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
1268
                0x213, # "Disconnected from service", result of "kill connection ???"
1269
                0x217, # "db not selected", client side defined error code
1270 1271 1272 1273
                # 0x218, # "Table does not exist" client side defined error code
                0x360, # Table already exists
                0x362, 
                # 0x369, # tag already exists
1274 1275 1276 1277 1278 1279 1280
                0x36A, 0x36B, 0x36D,
                0x381, 
                0x380, # "db not selected"
                0x383,
                0x386,  # DB is being dropped?!
                0x503,
                0x510,  # vnode not in ready state
1281
                0x14,   # db not ready, errno changed
1282
                0x600,  # Invalid table ID, why?
1283
                0x218,  # Table does not exist
1284 1285 1286
                1000  # REST catch-all error
            ]: 
            return True # These are the ALWAYS-ACCEPTABLE ones
1287 1288 1289 1290 1291 1292 1293
        # This case handled below already.
        # elif (errno in [ 0x0B ]) and gConfig.auto_start_service:
        #     return True # We may get "network unavilable" when restarting service
        elif gConfig.ignore_errors: # something is specified on command line
            moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')]
            if errno in moreErrnos:
                return True
1294 1295 1296
        elif errno == 0x200 : # invalid SQL, we need to div in a bit more
            if msg.find("invalid column name") != -1:
                return True 
1297 1298 1299 1300
            elif msg.find("tags number not matched") != -1: # mismatched tags after modification
                return True
            elif msg.find("duplicated column names") != -1: # also alter table tag issues
                return True
1301
        elif not self._isServiceStable(): # We are managing service, and ...
1302
            Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
S
Steven Li 已提交
1303
            return True
1304 1305 1306 1307
        
        return False # Not an acceptable error


1308 1309
    def execute(self, wt: WorkerThread):
        """Run this task on worker thread ``wt`` and record the outcome.

        Calls ``self._executeInternal(te, wt)``. Exceptions it raises are
        caught and recorded rather than propagated:

        * ``taos.error.ProgrammingError``: stored in ``self._err``. The task is
          additionally marked aborted (``self._aborted = True``) unless
          ``gConfig.continue_on_exception`` is set or ``self._isErrAcceptable``
          accepts the converted errno.
        * ``Exception``: stored in ``self._err``, task marked aborted, and the
          traceback printed.
        * Any other ``BaseException`` (e.g. ``KeyboardInterrupt``): same
          handling as ``Exception``.

        Per-task-type statistics are updated through ``self._execStats``
        whichever path is taken.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug(
            "[-] executing task {}...".format(self.__class__.__name__))

        self._err = None  # TODO: type hint mess up?
        self._execStats.beginTaskType(self.__class__.__name__)  # mark beginning
        errno2 = None  # converted TAOS error number, if a TAOS error occurred

        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            if gConfig.continue_on_exception:  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                self._err = err
            elif self._isErrAcceptable(errno2, err.__str__()):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                print("_", end="", flush=True)
                self._err = err
            else:  # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
                    self.__class__.__name__,
                    errno2, err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if gConfig.debug:
                    traceback.print_exc()  # so that we see the full stack
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                self._err = err
                self._aborted = True
        except Exception as e:
            Logging.info("Non-TAOS exception encountered with: {}".format(self.__class__.__name__))
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException as e:
            # Catches everything not handled above (e.g. KeyboardInterrupt, SystemExit).
            # NOTE: this is the final handler; a further `except BaseException` would be unreachable.
            self.logInfo("Python base exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()

        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2)

    # TODO: refactor away, just provide the dbConn
    def execWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        """Execute statement ``sql`` via worker thread ``wt``.

        Delegates to ``wt.execSql(sql)`` and returns whatever it returns.
        """
        outcome = wt.execSql(sql)
        return outcome

    def queryWtSql(self, wt: WorkerThread, sql):  # run a query on the worker thread
        """Run query ``sql`` via worker thread ``wt``.

        Delegates to ``wt.querySql(sql)`` and returns whatever it returns.
        """
        outcome = wt.querySql(sql)
        return outcome

    def getQueryResult(self, wt: WorkerThread):
        """Return the query result held by worker thread ``wt``.

        Delegates to ``wt.getQueryResult()``; presumably this is the result of
        the most recent query issued on that worker — TODO confirm.
        """
        outcome = wt.getQueryResult()
        return outcome

    def lockTable(self, ftName):  # full table name
        """Block until the per-table lock for ``ftName`` is held by the caller.

        The lock object is registered lazily in ``Task._tableLocks`` (guarded by
        ``Task._lock``) the first time a name is seen. It is acquired only after
        ``Task._lock`` is released, so waiting on one table does not block other
        threads from registering locks for other tables.
        """
        with Task._lock:
            if ftName not in Task._tableLocks:
                Task._tableLocks[ftName] = threading.Lock()
            tableLock = Task._tableLocks[ftName]
        tableLock.acquire()

    def unlockTable(self, ftName):
        """Release the per-table lock previously taken with ``lockTable(ftName)``.

        Uses ``Task._tableLocks`` explicitly, the same registry ``lockTable``
        populates, rather than resolving it through ``self``.

        Raises:
            RuntimeError: no lock was ever registered for ``ftName``, or the
                lock is not currently held.
        """
        with Task._lock:
            if ftName not in Task._tableLocks:
                raise RuntimeError("Corrupt state, no such lock")
            lock = Task._tableLocks[ftName]
            if not lock.locked():
                raise RuntimeError("Corrupt state, already unlocked")
        lock.release()


class ExecutionStats:
    """Counters describing a crash_gen run, shared by worker threads.

    Tracks, per task class name, total and successful execution counts and a
    histogram of error numbers. It also tracks the accumulated time during
    which at least one task was in progress, and the wall-clock time between
    ``startExec()`` and ``endExec()``.
    """

    def __init__(self):
        # task class name -> [total executions, successful executions]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()  # guards the counters/dicts in this object
        self._firstTaskStartTime = None
        self._execStartTime = None
        # task class name -> {errno: number of occurrences}
        self._errors: Dict[str, Dict[int, int]] = {}
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        return self._failed

    def startExec(self):
        """Record the wall-clock start of the whole run."""
        self._execStartTime = time.time()

    def endExec(self):
        """Record the wall-clock time elapsed since ``startExec()``."""
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        """Count one execution of task class ``klassName``.

        Args:
            klassName: task class name used as the statistics key.
            isSuccess: whether the execution succeeded.
            eno: error number to tally for this class, or ``None`` for no error.
        """
        # Called concurrently by worker threads, so guard the shared dicts.
        with self._lock:
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        """Mark a task as started; the first concurrent task starts the busy-time clock."""
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        """Mark a task as finished; when none remain, accumulate the busy time."""
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

    def printStats(self):
        """Log a summary of the run via ``Logging.info``."""
        # Snapshot under the lock: workers may still be updating the counters.
        with self._lock:
            execTimes = {k: list(v) for k, v in self._execTimes.items()}
            errorsByClass = {k: dict(v) for k, v in self._errors.items()}

        Logging.info(
            "----------------------------------------------------------------------")
        Logging.info(
            "| Crash_Gen test {}, with the following stats:". format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        Logging.info("| Task Execution Times (success/total):")
        execTimesAny = 0.0
        for k, n in execTimes.items():
            execTimesAny += n[0]
            errStr = None
            if k in errorsByClass:
                errStr = ", ".join(
                    "0x{:X}:{}".format(eno, cnt) for (eno, cnt) in errorsByClass[k].items())
            Logging.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))

        Logging.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        Logging.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        Logging.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        # Guard against ZeroDivisionError when no task was executed at all
        avgTaskTime = self._accRunTime / execTimesAny if execTimesAny else 0.0
        Logging.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(avgTaskTime))
        Logging.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        Logging.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        Logging.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
        Logging.info("| Longest native query time: {:.3f} seconds, started: {}".
            format(MyTDSql.longestQueryTime,
                time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))))
        Logging.info("| Longest native query: {}".format(MyTDSql.longestQuery))
        Logging.info(
            "----------------------------------------------------------------------")


class StateTransitionTask(Task):
    """Base for tasks whose applicability depends on the current DB state.

    Subclasses are expected to override ``getEndState`` and ``canBeginFrom``
    (and ``getInfo``); the base versions raise ``RuntimeError``.
    """
    # Table/record counts; the LARGE_* variants are used when gConfig.larger_data is set
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    # Offset added to regular-table indices; chosen once by getRegTableName()
    _baseTableNumber = None

    _endState = None  # TODO: no longer used?

    @classmethod
    def getInfo(cls):  # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        """Return the name of regular table number ``i``, i.e. ``reg_table_<base + i>``.

        The base offset is fixed on the first call: ``Dice.throw(999)`` when
        ``gConfig.dynamic_db_table_names`` is set, otherwise 0.
        """
        if StateTransitionTask._baseTableNumber is None:  # Set it one time
            if gConfig.dynamic_db_table_names:
                StateTransitionTask._baseTableNumber = Dice.throw(999)
            else:
                StateTransitionTask._baseTableNumber = 0
        tableNumber = StateTransitionTask._baseTableNumber + i
        return "reg_table_{}".format(tableNumber)

    def execute(self, wt: WorkerThread):
        super().execute(wt)


class TaskCreateDb(StateTransitionTask):
    """Create this task's database, optionally with a replica clause."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Issue ``create database <name> [replica N]``.

        ``replica N`` is appended, with N fixed to ``gConfig.max_replicas``,
        whenever that setting is not 1.
        """
        if gConfig.max_replicas == 1:
            repStr = ""
        else:
            repStr = "replica {}".format(gConfig.max_replicas)  # fixed, always
        sql = "create database {} {}".format(self._db.getName(), repStr)
        self.execWtSql(wt, sql)


class TaskDropDb(StateTransitionTask):
    """Drop this task's database."""

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Issue ``drop database <name>`` and log the drop time at debug level."""
        sql = "drop database {}".format(self._db.getName())
        self.execWtSql(wt, sql)
        Logging.debug("[OPS] database dropped at {}".format(time.time()))


class TaskCreateSuperTable(StateTransitionTask):
    """(Re)create the database's fixed super table."""

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Create the fixed super table, dropping an existing one first.

        Does nothing except log at debug level when the database does not
        exist yet. Regular tables are not created here; per the original
        note, INSERT creates them automatically.
        """
        if not self._db.exists(wt.getDbConn()):
            Logging.debug("Skipping task, no DB yet")
            return

        sTable = self._db.getFixedSuperTable()  # type: TdSuperTable
        columnTypes = {'ts': 'TIMESTAMP', 'speed': 'INT', 'color': 'BINARY(16)'}
        tagTypes = {'b': 'BINARY(200)', 'f': 'FLOAT'}
        sTable.create(wt.getDbConn(), columnTypes, tagTypes, dropIfExists=True)


class TdSuperTable:
    """Handle for one super table ``<dbName>.<stName>``.

    The object only remembers the two names; every method takes a DB
    connection ``dbc`` and issues SQL through it.
    """

    def __init__(self, stName, dbName):
        self._stName = stName
        self._dbName = dbName

    def getName(self):
        """Return the unqualified super table name."""
        return self._stName

    def drop(self, dbc, skipCheck = False):
        """Drop this super table if it exists.

        Raises:
            CrashGenError: the table does not exist and ``skipCheck`` is False.
        """
        dbName = self._dbName
        if self.exists(dbc):  # if myself exists
            fullTableName = dbName + '.' + self._stName
            dbc.execute("DROP TABLE {}".format(fullTableName))
        elif not skipCheck:
            raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))

    def exists(self, dbc):
        """Return ``dbc.existsSuperTable(<stName>)`` after issuing ``USE <dbName>``.

        Side effect: switches the connection's current database.
        """
        dbc.execute("USE " + self._dbName)
        return dbc.existsSuperTable(self._stName)

    # TODO: odd semantic, create() method is usually static?
    def create(self, dbc, cols: dict, tags: Optional[dict],
        dropIfExists = False
        ):
        '''Create this super table.

        Args:
            dbc: DB connection used to issue the SQL.
            cols: column name -> SQL type, emitted in dict order.
            tags: tag name -> SQL type. ``None`` or an empty dict falls back to
                a single placeholder tag ``dummy int``; an empty ``TAGS ()``
                clause is assumed to be invalid SQL.
            dropIfExists: drop an existing table of the same name first.

        Raises:
            CrashGenError: the table exists and ``dropIfExists`` is False.
        '''
        dbName = self._dbName
        dbc.execute("USE " + dbName)
        fullTableName = dbName + '.' + self._stName
        if dbc.existsSuperTable(self._stName):
            if dropIfExists:
                dbc.execute("DROP TABLE {}".format(fullTableName))
            else:  # error
                raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))

        # Now let's create
        sql = "CREATE TABLE {} ({})".format(
            fullTableName,
            ",".join(['%s %s' % (k, v) for (k, v) in cols.items()]))
        if not tags:  # None or empty: use a placeholder tag
            sql += " TAGS (dummy int) "
        else:
            sql += " TAGS ({})".format(
                ",".join(['%s %s' % (k, v) for (k, v) in tags.items()])
            )
        dbc.execute(sql)

    def getRegTables(self, dbc: DbConn):
        """Return the TBNAME of every regular table under this super table.

        A ``taos.error.ProgrammingError`` from the query is logged at debug
        level and re-raised.
        """
        dbName = self._dbName
        try:
            dbc.query("select TBNAME from {}.{}".format(dbName, self._stName))  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
            raise

        qr = dbc.getQueryResult()
        return [v[0] for v in qr]  # first column (TBNAME) of each row

    def hasRegTables(self, dbc: DbConn):
        """Return whether ``SELECT *`` on the super table reports more than 0 rows."""
        return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0

    def ensureTable(self, task: Task, dbc: DbConn, regTableName: str):
        """Create regular table ``regTableName`` under this super table unless it already exists.

        When ``task`` is given, its per-table lock for the full table name is
        held around the CREATE and released even if the CREATE fails.
        """
        dbName = self._dbName
        sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
        if dbc.query(sql) >= 1:  # reg table exists already
            return

        # acquire a lock first, so as to be able to *verify*. More details in TD-1471
        fullTableName = dbName + '.' + regTableName
        if task is not None:  # optional lock
            task.lockTable(fullTableName)
        Progress.emit(Progress.CREATE_TABLE_ATTEMPT)  # ATTEMPT to create a new table
        try:
            sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
                fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
            )
            dbc.execute(sql)
        finally:
            if task is not None:
                task.unlockTable(fullTableName)  # no matter what

    def _getTagStrForSql(self, dbc):
        """Build the comma-separated tag literals for a ``tags (...)`` clause.

        Raises:
            RuntimeError: a tag's type is not BINARY, FLOAT or INT.
        """
        # Fixed literal used for each supported tag type
        tagValueForType = {
            'BINARY': "'Beijing-Shanghai-LosAngeles'",
            'FLOAT': '9.9',
            'INT': '88',
        }
        tags = self._getTags(dbc)
        tagStrs = []
        for tagName in tags:
            tagType = tags[tagName]
            if tagType not in tagValueForType:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
            tagStrs.append(tagValueForType[tagType])
        return ", ".join(tagStrs)

    def _getTags(self, dbc) -> dict:
        """Return ``{tag name: tag type}`` taken from ``DESCRIBE`` rows whose 4th column is 'TAG'."""
        dbc.query("DESCRIBE {}.{}".format(self._dbName, self._stName))
        stCols = dbc.getQueryResult()
        return {row[0]: row[1] for row in stCols if row[3] == 'TAG'}  # name:type

    def addTag(self, dbc, tagName, tagType):
        """Add tag ``tagName`` of type ``tagType`` unless it already exists."""
        if tagName in self._getTags(dbc):  # already
            return
        sql = "alter table {}.{} add tag {} {}".format(
            self._dbName, self._stName, tagName, tagType)
        dbc.execute(sql)

    def dropTag(self, dbc, tagName):
        """Drop tag ``tagName`` if it exists."""
        if tagName not in self._getTags(dbc):  # don't have this tag
            return
        sql = "alter table {}.{} drop tag {}".format(self._dbName, self._stName, tagName)
        dbc.execute(sql)

    def changeTag(self, dbc, oldTag, newTag):
        """Rename tag ``oldTag`` to ``newTag``; no-op if ``oldTag`` is missing or ``newTag`` exists."""
        tags = self._getTags(dbc)
        if oldTag not in tags:  # don't have this tag
            return
        if newTag in tags:  # already have this tag
            return
        sql = "alter table {}.{} change tag {} {}".format(self._dbName, self._stName, oldTag, newTag)
        dbc.execute(sql)

    def generateQueries(self, dbc: DbConn) -> List[SqlQuery]:
        ''' Generate queries to test/exercise this super table '''
        ret = []  # type: List[SqlQuery]

        for rTbName in self.getRegTables(dbc):  # regular tables

            # Placeholder: result unused, but kept so the random-number sequence is unchanged
            filterExpr = Dice.choice([  # TODO: add various kind of WHERE conditions
                None
            ])

            # Run the query against the regular table first
            doAggr = (Dice.throw(2) == 0)  # 1 in 2 chance
            if not doAggr:  # don't do aggregate query, just simple one
                ret.append(SqlQuery(  # reg table
                    "select {} from {}.{}".format('*', self._dbName, rTbName)))
                ret.append(SqlQuery(  # super table
                    "select {} from {}.{}".format('*', self._dbName, self.getName())))
            else:  # Aggregate query
                aggExpr = Dice.choice([
                    'count(*)',
                    'avg(speed)',
                    # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
                    'sum(speed)',
                    'stddev(speed)',
                    # SELECTOR functions
                    'min(speed)',
                    'max(speed)',
                    'first(speed)',
                    'last(speed)',
                    'top(speed, 50)',  # TODO: not supported?
                    'bottom(speed, 50)',  # TODO: not supported?
                    'apercentile(speed, 10)',  # TODO: TD-1316
                    'last_row(speed)',
                    # Transformation Functions
                    # 'diff(speed)', # TODO: no supported?!
                    'spread(speed)'
                    ])  # TODO: add more from 'top'

                if aggExpr not in ['stddev(speed)']:  # TODO: STDDEV not valid for super tables?!
                    sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
                    if Dice.throw(3) == 0:  # 1 in X chance
                        sql = sql + ' GROUP BY color'
                        Progress.emit(Progress.QUERY_GROUP_BY)
                    ret.append(SqlQuery(sql))

        return ret

class TaskReadData(StateTransitionTask):
    """Run generated read queries against the fixed super table and its regular tables."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _reconnectIfNeeded(self, wt):
        """With a 1-in-20 chance, close and reopen the worker's DB connection.

        A ``ConnectionError`` while reconnecting is re-raised when running in
        client-only mode (no ``gSvcMgr``) or when the managed service is
        running. Otherwise it is tolerated and reported as
        ``SERVICE_RECONNECT_FAILURE``. ``SERVICE_RECONNECT_SUCCESS`` is
        emitted only when the reconnect actually succeeded.
        """
        if random.randrange(20) != 0:  # TODO: break connection in all situations
            return

        Progress.emit(Progress.SERVICE_RECONNECT_START)
        try:
            wt.getDbConn().close()
            wt.getDbConn().open()
        except ConnectionError:  # may fail
            if not gSvcMgr:
                Logging.error("Failed to reconnect in client-only mode")
                raise  # Not OK if we are running in client-only mode
            if gSvcMgr.isRunning():  # may have race condition, but low prob
                Logging.error("Failed to reconnect when managed server is running")
                raise  # Not OK if we are running normally
            Progress.emit(Progress.SERVICE_RECONNECT_FAILURE)
        else:
            Progress.emit(Progress.SERVICE_RECONNECT_SUCCESS)
        # The above might have taken a lot of time, service might be running
        # by now, causing error below to be incorrectly handled due to timing issue
        # TODO: fix server restart status race condition

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Possibly reconnect, then execute every query from ``generateQueries``.

        A ``taos.error.ProgrammingError`` is logged at debug level with the
        failing SQL and re-raised.
        """
        self._reconnectIfNeeded(wt)

        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()

        for q in sTable.generateQueries(dbc):
            try:
                sql = q.getSql()
                dbc.execute(sql)
            except taos.error.ProgrammingError as err:
                errno2 = Helper.convertErrno(err.errno)
                Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
                raise


class SqlQuery:
    """A single SQL statement to be executed against the database."""

    @classmethod
    def buildRandom(cls, db: "Database"):
        '''Build a random query against a certain database.

        Not implemented yet: always returns ``None``.
        '''
        return None  # TODO: generate a random query against db.getName()

    def __init__(self, sql: Optional[str] = None):
        self._sql = sql

    def getSql(self):
        """Return the SQL text, or ``None`` if none was given."""
        return self._sql

class TaskDropSuperTable(StateTransitionTask):
    """Drop the fixed super table, half the time dropping its regular tables first."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Optionally drop regular tables in shuffled order, then drop the super table.

        While dropping regular tables, any ``taos.error.ProgrammingError`` is
        skipped. Only errno 0x362 is logged, and it flips the progress
        character printed on a later successful drop from "d" to "f".
        """
        if Dice.throw(2) == 0:  # 1/2 chance: drop the regular tables one by one
            if gConfig.larger_data:
                tableCount = self.LARGE_NUMBER_OF_TABLES
            else:
                tableCount = self.SMALL_NUMBER_OF_TABLES
            tblSeq = list(range(tableCount + 2))
            random.shuffle(tblSeq)
            tickOutput = False  # whether a "d"/"f" progress character was printed yet
            isSuccess = True
            for idx in tblSeq:
                regTableName = self.getRegTableName(idx)
                try:
                    # nRows always 0, like MySQL
                    self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), regTableName))
                except taos.error.ProgrammingError as err:
                    errno2 = Helper.convertErrno(err.errno)  # correcting for strange error number scheme
                    if errno2 == 0x362:  # mnode invalid table name
                        isSuccess = False
                        Logging.debug("[DB] Acceptable error when dropping a table")
                    # NOTE(review): other error numbers are skipped silently as well
                    continue  # try to delete next regular table

                if tickOutput:
                    continue
                tickOutput = True  # Print only one time
                print("d" if isSuccess else "f", end="", flush=True)

        # Drop the super table itself
        tblName = self._db.getFixedSuperTableName()
        self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))


class TaskAlterTags(StateTransitionTask):
    """Randomly add, drop or rename a tag on the fixed super table."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()  # if we can drop it, we can alter tags

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Pick one of four tag operations with ``Dice.throw(4)`` and apply it.

        0: add ``extraTag int``; 1: drop ``extraTag``; 2: drop ``newTag``;
        any other value: rename ``extraTag`` to ``newTag``.
        """
        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()
        roll = Dice.throw(4)
        operations = {
            0: lambda: sTable.addTag(dbc, "extraTag", "int"),
            1: lambda: sTable.dropTag(dbc, "extraTag"),
            2: lambda: sTable.dropTag(dbc, "newTag"),
        }
        fallback = lambda: sTable.changeTag(dbc, "extraTag", "newTag")
        operations.get(roll, fallback)()

class TaskRestartService(StateTransitionTask):
    """Occasionally restart the managed service (only when auto_start_service is set).

    At most one instance runs the restart logic at a time. This is
    coordinated through the CLASS-level ``_isRunning`` flag, guarded by
    ``_classLock``. Writing it via ``self`` would create a per-instance
    attribute invisible to other task instances.
    """
    _isRunning = False
    _classLock = threading.Lock()

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        if gConfig.auto_start_service:
            return state.canDropFixedSuperTable()  # Basically when we have the super table
        return False  # don't run this otherwise

    CHANCE_TO_RESTART_SERVICE = 200

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """With a 1-in-CHANCE_TO_RESTART_SERVICE chance, restart the service via ``gSvcMgr``.

        Returns immediately (printing "_a") when not in -a mode, and skips
        (with an info log) when another instance is already in this section.
        """
        if not gConfig.auto_start_service:  # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        with TaskRestartService._classLock:
            if TaskRestartService._isRunning:
                Logging.info("Skipping restart task, another running already")
                return
            TaskRestartService._isRunning = True

        try:
            if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0:  # 1 in N chance
                dbc = wt.getDbConn()
                dbc.execute("show databases")  # simple delay, align timing with other workers
                gSvcMgr.restart()
        finally:
            # Always clear the flag, otherwise a failed restart would block all future restarts
            with TaskRestartService._classLock:
                TaskRestartService._isRunning = False


class TaskAddData(StateTransitionTask):
    """Insert rows into the regular tables (of the fixed super table), in random table order."""

    # Track which table is being actively worked on (class-level: shared by all tasks/threads)
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off tests
    fAddLogReady = None # type: TextIOWrapper
    fAddLogDone  = None # type: TextIOWrapper

    @classmethod
    def prepToRecordOps(cls):
        """Lazily open the two operation-log files when -r/--record-ops is in effect."""
        if gConfig.record_ops:
            if (cls.fAddLogReady is None):
                Logging.info(
                    "Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")

            if (cls.fAddLogDone is None):
                Logging.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
        """Insert all rows for one table using a single SQL statement.

        Unlike _addData(), this path neither records operations nor verifies
        the written data.
        """
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        fullTableName = db.getName() + '.' + regTableName

        rowParts = []
        for _ in range(numRecords):  # number of records per table
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            # NOTE(review): each row tuple is followed by ';' - confirm TDengine accepts
            # this multi-row form (its docs show space-separated value tuples).
            rowParts.append("('{}', {}, '{}');".format(nextTick, nextInt, nextColor))
        sql = "insert into {} values ".format(fullTableName) + "".join(rowParts)
        dbc.execute(sql)

    def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
        """Insert rows one statement at a time.

        With -r each row is logged (fsync'ed) before and after the write. With
        -v the table is locked, the row is read back and compared, and the
        table is unlocked again whatever happens.
        """
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        fullTableName = db.getName() + '.' + regTableName  # loop-invariant

        for _ in range(numRecords):  # number of records per table
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            if gConfig.record_ops:
                self.prepToRecordOps()
                self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                self.fAddLogReady.flush()
                os.fsync(self.fAddLogReady)

            # TODO: too ugly trying to lock the table reliably, refactor...
            if gConfig.verify_data:
                self.lockTable(fullTableName)
                # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written

            try:
                sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {})
                    fullTableName,
                    nextTick, nextInt, nextColor)
                dbc.execute(sql)
            except BaseException: # Any exception at all: release the lock, then propagate
                if gConfig.verify_data:
                    self.unlockTable(fullTableName)
                raise

            # Now read it back and verify, we might encounter an error if table is dropped
            if gConfig.verify_data: # only if command line asks for it
                try:
                    readBack = dbc.queryScalar("SELECT speed from {} WHERE ts='{}'".
                        format(fullTableName, nextTick))
                    if readBack != nextInt :
                        raise taos.error.ProgrammingError(
                            "Failed to read back same data, wrote: {}, read: {}"
                            .format(nextInt, readBack), 0x999)
                except taos.error.ProgrammingError as err:
                    errno = Helper.convertErrno(err.errno)
                    if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT]  : # not a single result
                        raise taos.error.ProgrammingError(
                            "Failed to read back same data for tick: {}, wrote: {}, read: {}"
                            .format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"),
                            errno)
                    elif errno in [0x218, 0x362]: # table doesn't exist
                        pass # tolerated: table may have been dropped concurrently
                    else:
                        # Re-throw otherwise
                        raise
                finally:
                    self.unlockTable(fullTableName) # Unlock the table no matter what

            # Successfully wrote the data into the DB, let's record it somehow
            te.recordDataMark(nextInt)

            if gConfig.record_ops:
                self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
                self.fAddLogDone.flush()
                os.fsync(self.fAddLogDone)

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
        db = self._db
        dbc = wt.getDbConn()
        numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
        tblSeq = list(range(numTables))
        random.shuffle(tblSeq) # now we have random sequence

        for i in tblSeq:
            claimed = i not in self.activeTable
            if claimed:
                self.activeTable.add(i)  # marking it active
            else:  # wow already active
                print("x", end="", flush=True) # concurrent insertion, proceed anyway

            try:
                sTable = db.getFixedSuperTable()
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
                sTable.ensureTable(self, wt.getDbConn(), regTableName)  # Ensure the table exists
                # self._unlockTable(fullTableName)

                # NOTE: "Dice.throw(N) == 0" is used as a 1-in-N chance in this file, so
                # Dice.throw(1) is ALWAYS 0: the batch path is effectively disabled here.
                if Dice.throw(1) == 0:
                    self._addData(db, dbc, regTableName, te)
                else:
                    self._addDataInBatch(db, dbc, regTableName, te)
            finally:
                # Only release a mark we set ourselves (not another task's), and do it
                # even on failure so the table is not left marked active forever.
                if claimed:
                    self.activeTable.discard(i)  # not raising an error, unlike remove


class ThreadStacks: # stack info for all threads
    """Snapshot of the call stacks of all live threads, keyed by (native) thread ID."""

    def __init__(self):
        self._allStacks = {}
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            frame = allFrames.get(th.ident)
            if frame is None:
                # Thread not in the frame snapshot (e.g. it started right after
                # _current_frames() was taken, or has no ident yet): skip it.
                continue
            stack = traceback.extract_stack(frame)
            # Thread.native_id only exists on Python 3.8+; fall back to the Python ident
            thId = getattr(th, 'native_id', None) or th.ident
            self._allStacks[thId] = stack

    def print(self, filteredEndName = None, filterInternal = False):
        """Print every captured stack, outermost frame first.

        filteredEndName: skip threads whose innermost frame is a function with this name.
        filterInternal: skip threads whose innermost frame is one of the known
            uninteresting functions listed below.
        """
        for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
            lastFrame = stack[-1]
            if filteredEndName and lastFrame.name == filteredEndName: # innermost frame matches: filter out
                continue
            if filterInternal and lastFrame.name in ['wait', 'invoke_excepthook',
                    '_wait', # The Barrier exception
                    'svcOutputReader', # the svcMgr thread
                    '__init__']: # the thread that extracted the stack
                continue # ignore
            # Now print
            print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid))
            for stackFrame, frame in enumerate(stack): # was using: reversed(stack)
                print("[{sf}] File {filename}, line {lineno}, in {name}".format(
                    sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info ----->\n")


class ClientManager:
    """Runs the crash-generator client: DB manager, thread pool and thread coordinator."""

    def __init__(self):
        Logging.info("Starting service manager")
        # signal.signal(signal.SIGTERM, self.sigIntHandler)
        # signal.signal(signal.SIGINT, self.sigIntHandler)

        self._status = Status.STATUS_RUNNING
        self.tc = None  # ThreadCoordinator, created in run()

        self.inSigHandler = False  # guards against re-entering the SIGUSR1 menu

    def sigIntHandler(self, signalNumber, frame):
        """First SIGINT asks the coordinator to stop; a repeated one exits immediately."""
        if self._status != Status.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            # return  # do nothing if it's already not running
            sys.exit(-1)
        self._status = Status.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        self.tc.requestToStop()

    def _doMenu(self):
        """Prompt interactively until one of the valid menu choices is entered; return it."""
        choice = ""
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the if range below
            # print("Enter Choice: ", end="", flush=True)
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        """Show an interactive menu on SIGUSR1 (ignoring nested SIGUSR1 while it is shown)."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # do nothing if it's already not running
        self.inSigHandler = True

        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Always re-arm the handler, even if the menu raised (e.g. EOF on input())
            self.inSigHandler = False

    # TODO: need to revise how we verify data durability
    # def _printLastNumbers(self):  # to verify data durability
    #     dbManager = DbManager()
    #     dbc = dbManager.getDbConn()
    #     if dbc.query("show databases") <= 1:  # no database (we have a default called "log")
    #         return
    #     dbc.execute("use db")
    #     if dbc.query("show tables") == 0:  # no tables
    #         return

    #     sTbName = dbManager.getFixedSuperTableName()

    #     # get all regular tables
    #     # TODO: analyze result set later
    #     dbc.query("select TBNAME from db.{}".format(sTbName))
    #     rTables = dbc.getQueryResult()

    #     bList = TaskExecutor.BoundedList()
    #     for rTbName in rTables:  # regular tables
    #         dbc.query("select speed from db.{}".format(rTbName[0]))
    #         numbers = dbc.getQueryResult()
    #         for row in numbers:
    #             # print("<{}>".format(n), end="", flush=True)
    #             bList.add(row[0])

    #     print("Top numbers in DB right now: {}".format(bList))
    #     print("TDengine client execution is about to start in 2 seconds...")
    #     time.sleep(2.0)
    #     dbManager = None  # release?

    def run(self, svcMgr):
        """Run the client to completion.

        Stops the TDengine services via ``svcMgr`` (if given) once the
        coordinator finishes. Returns the process exit code: 1 if the run
        failed, otherwise 0.
        """
        # self._printLastNumbers()
        global gConfig

        # Prepare Tde Instance
        global gContainer
        tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"

        dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget())  # Regular function
        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        Logging.info("Starting client instance: {}".format(tInst))
        self.tc.run()
        # print("exec stats: {}".format(self.tc.getExecStats()))
        # print("TC failed = {}".format(self.tc.isFailed()))
        if svcMgr: # gConfig.auto_start_service:
            svcMgr.stopTaosServices()
            svcMgr = None

        # Release global variables. Only gConfig is declared global in this
        # method, so it is the only module-level variable cleared here.
        gConfig = None

        thPool = None
        dbManager.cleanUp() # destructor wouldn't run in time
        dbManager = None

        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # print("TC failed (2) = {}".format(self.tc.isFailed()))
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        ret = 1 if self.tc.isFailed() else 0
        self.tc.cleanup()
        # Release variables here
        self.tc = None

        gc.collect() # force garbage collection
        # h = hpy()
        # print("\n----- Final Python Heap -----\n")
        # print(h.heap())

        return ret

    def conclude(self):
        """Print the coordinator's execution statistics."""
        # self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
        self.tc.printStats()


class MainExec:
    """Program entry object: parses options, installs signal handlers, runs client or service."""

    def __init__(self):
        self._clientMgr = None
        self._svcMgr = None # type: ServiceManager

        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT,  self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, or to the service manager when running alone."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr: # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to the service manager and the client manager, when present."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Run the client (optionally auto-starting the service); returns its exit code, or None on REST connection failure."""
        global gSvcMgr
        if gConfig.auto_start_service:
            gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
            gSvcMgr.startTaosServices() # we start, don't run

        self._clientMgr = ClientManager()
        ret = None
        try:
            ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
        except requests.exceptions.ConnectionError as err:
            # requests exceptions have no getMessage(); calling it would raise
            # AttributeError inside this handler. Format the exception itself.
            Logging.warning("Failed to open REST connection to DB: {}".format(err))
            # don't raise
        return ret

    def runService(self):
        """Run the TDengine service(s) in the foreground until they reach an end state."""
        global gSvcMgr
        gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert

        gSvcMgr.run() # run to some end state
        gSvcMgr = self._svcMgr = None

    def init(self): # TODO: refactor
        """Create the global container, parse command-line options into gConfig and seed the dice."""
        global gContainer
        gContainer = Container() # micky-mouse DI

        global gSvcMgr # TODO: refactor away
        gSvcMgr = None

        # Super cool Python argument library:
        # https://docs.python.org/3/library/argparse.html
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=textwrap.dedent('''\
                TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
                ---------------------------------------------------------------------
                1. You build TDengine in the top level ./build directory, as described in official docs
                2. You run the server there before this script: ./build/bin/taosd -c test/cfg

                '''))

        parser.add_argument(
            '-a',
            '--auto-start-service',
            action='store_true',
            help='Automatically start/stop the TDengine service (default: false)')
        parser.add_argument(
            '-b',
            '--max-dbs',
            action='store',
            default=0,
            type=int,
            help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)')
        parser.add_argument(
            '-c',
            '--connector-type',
            action='store',
            default='native',
            type=str,
            help='Connector type to use: native, rest, or mixed (default: native)')
        parser.add_argument(
            '-d',
            '--debug',
            action='store_true',
            help='Turn on DEBUG mode for more logging (default: false)')
        parser.add_argument(
            '-e',
            '--run-tdengine',
            action='store_true',
            help='Run TDengine service in foreground (default: false)')
        parser.add_argument(
            '-g',
            '--ignore-errors',
            action='store',
            default=None,
            type=str,
            help='Ignore error codes, comma separated, 0x supported (default: None)')
        parser.add_argument(
            '-i',
            '--max-replicas',
            action='store',
            default=1,
            type=int,
            help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
        parser.add_argument(
            '-l',
            '--larger-data',
            action='store_true',
            help='Write larger amount of data during write operations (default: false)')
        parser.add_argument(
            '-n',
            '--dynamic-db-table-names',
            action='store_true',
            help='Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)')
        parser.add_argument(
            '-o',
            '--num-dnodes',
            action='store',
            default=1,
            type=int,
            help='Number of Dnodes to initialize, used with -e option. (default: 1)')
        parser.add_argument(
            '-p',
            '--per-thread-db-connection',
            action='store_true',
            help='Use a single shared db connection (default: false)')
        parser.add_argument(
            '-r',
            '--record-ops',
            action='store_true',
            help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
        parser.add_argument(
            '-s',
            '--max-steps',
            action='store',
            default=1000,
            type=int,
            help='Maximum number of steps to run (default: 1000)')
        parser.add_argument(
            '-t',
            '--num-threads',
            action='store',
            default=5,
            type=int,
            help='Number of threads to run (default: 5)')
        parser.add_argument(
            '-v',
            '--verify-data',
            action='store_true',
            help='Verify data written in a number of places by reading back (default: false)')
        parser.add_argument(
            '-x',
            '--continue-on-exception',
            action='store_true',
            help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')

        global gConfig
        gConfig = parser.parse_args()

        Logging.clsInit(gConfig)

        Dice.seed(0)  # initial seeding of dice

    def run(self):
        """Run the service (-e) or the client; returns the process exit code."""
        if gConfig.run_tdengine:  # run server
            try:
                self.runService()
                return 0 # success
            except ConnectionError:
                Logging.error("Failed to make DB connection, please check DB instance manually")
            return -1 # failure
        else:
            return self.runClient()


class Container():
    """A tiny dependency-injection holder for a fixed set of named properties.

    Only names listed in ``_propertyList`` may be read or written; any other
    name raises CrashGenError. Reading a valid property that has not been set
    yet raises AttributeError.
    """
    _propertyList = {'defTdeInstance'}

    def __init__(self):
        self._cargo = {} # No cargo at the beginning

    def _verifyValidProperty(self, name):
        if name not in self._propertyList:
            raise CrashGenError("Invalid container property: {}".format(name))

    # Called for an attribute, when other mechanisms fail (compare to  __getattribute__)
    def __getattr__(self, name):
        self._verifyValidProperty(name)
        try:
            return self._cargo[name] # just a simple lookup
        except KeyError:
            # __getattr__ must report a missing attribute with AttributeError,
            # otherwise hasattr() and getattr(obj, name, default) misbehave.
            raise AttributeError("Container property not set yet: {}".format(name)) from None

    def __setattr__(self, name, value):
        if name == '_cargo' : # reserved vars
            super().__setattr__(name, value)
            return
        self._verifyValidProperty(name)
        self._cargo[name] = value
