crash_gen_main.py 99.4 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
17

S
Shuduo Sang 已提交
18 19 20
from typing import Set
from typing import Dict
from typing import List
21
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
22

S
Shuduo Sang 已提交
23 24
import textwrap
import time
25
import datetime
S
Shuduo Sang 已提交
26
import random
27
import logging
S
Shuduo Sang 已提交
28 29 30 31
import threading
import copy
import argparse
import getopt
32

S
Steven Li 已提交
33
import sys
34
import os
35
import signal
36
import traceback
37
import resource
38
# from guppy import hpy
39
import gc
40

S
Steven Li 已提交
41 42 43
from crash_gen.service_manager import ServiceManager, TdeInstance
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
44
import crash_gen.settings 
45 46 47

import taos
import requests
48

49 50
# Run the settings module's init() once, at import time of this file.
# NOTE(review): what init() populates is not visible from here — confirm in crash_gen/settings.py
crash_gen.settings.init()

51 52 53 54
# Refuse to continue on anything older than Python 3
_pyMajor = sys.version_info[0]
if not (_pyMajor >= 3):
    raise Exception("Must be using Python 3")

S
Shuduo Sang 已提交
55
# Global variables, tried to keep a small number.
56 57 58

# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
59 60
# Both names are only declared (annotated) here; no value is bound at this point.
gConfig:    argparse.Namespace  # command-line/environment configuration, set a bit later
gSvcMgr:    ServiceManager # TODO: refactor this hack, use dep injection
61
# logger:     logging.Logger
62
# Declared only, no value bound here. NOTE(review): presumably assigned during start-up — confirm
gContainer: Container
S
Steven Li 已提交
63

64 65
# def runThread(wt: WorkerThread):
#     wt.run()
66

67

S
Steven Li 已提交
68
class WorkerThread:
    """One worker thread that executes a single task per step.

    Every worker owns a ``threading.Thread`` (target: :meth:`run`) and a
    private "step gate" (a ``threading.Event``).  In each loop iteration the
    worker first waits at the coordinator's shared barrier, then at its own
    gate until the main thread taps it, and only then fetches and executes
    a task from the ThreadCoordinator.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
        """Create the worker (not started yet).

        Note: this runs in the main thread context.

        Args:
            pool: the ThreadPool that owns this worker.
            tid: identifier of this worker, used in log messages.
            tc: the ThreadCoordinator providing the barrier and the tasks.

        Raises:
            RuntimeError: if per-thread connections are enabled and
                ``gConfig.connector_type`` is not 'native', 'rest' or 'mixed'.
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=self.run)
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if (gConfig.per_thread_db_connection):  # type: ignore
            tInst = gContainer.defTdeInstance
            if gConfig.connector_type == 'native':
                self._dbConn = DbConn.createNative(tInst.getDbTarget())
            elif gConfig.connector_type == 'rest':
                self._dbConn = DbConn.createRest(tInst.getDbTarget())
            elif gConfig.connector_type == 'mixed':
                # Pass the DB target here too, exactly like the two
                # single-connector branches above do.
                if Dice.throw(2) == 0:  # 1/2 chance
                    self._dbConn = DbConn.createNative(tInst.getDbTarget())
                else:
                    self._dbConn = DbConn.createRest(tInst.getDbTarget())
            else:
                raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))

    def logDebug(self, msg):
        """Log *msg* at debug level, prefixed with this worker's id."""
        Logging.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        """Log *msg* at info level, prefixed with this worker's id."""
        Logging.info("    TRD[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's current task executor."""
        return self._tc.getTaskExecutor()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread entry point: open the connection, loop over tasks, clean up."""
        # initialization after thread starts, in the thread context
        Logging.info("Starting to run thread: {}".format(self._tid))

        if (gConfig.per_thread_db_connection):  # type: ignore
            Logging.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up, also when the task loop dies with an exception, so
            # that the per-thread connection is not leaked
            if (gConfig.per_thread_db_connection):  # type: ignore
                if self._dbConn.isOpen:  # sometimes it is not open
                    self._dbConn.close()
                else:
                    Logging.warning("Cleaning up worker thread, dbConn already closed")

    def _doTaskLoop(self):
        """Run steps until the barrier breaks or the coordinator stops running.

        Raises:
            taos.error.ProgrammingError: when re-opening the connection fails
                with an errno other than the ignorable ones listed below.
        """
        while True:
            tc = self._tc  # Thread Coordinator, the overall master
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                Logging.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            Logging.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()   # then per-thread gate, after being tapped
            Logging.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not self._tc.isRunning():
                print("_wts", end="")
                Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Make sure our own connection is usable before running a task
            try:
                if (gConfig.per_thread_db_connection):  # most likely TRUE
                    if not self._dbConn.isOpen:  # might have been closed during server auto-restart
                        self._dbConn.open()
            except taos.error.ProgrammingError as err:
                errno = Helper.convertErrno(err.errno)
                # invalid database, dropping, Unable to establish connection, Database not ready
                if errno in [0x383, 0x386, 0x00B, 0x014]:
                    pass  # deliberately ignored
                else:
                    print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
                    raise

            # Fetch a task from the Thread Coordinator
            Logging.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            Logging.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
                    self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            Logging.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        """Raise RuntimeError unless called from this worker's own thread."""
        if (threading.get_ident() != self._thread.ident):
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        """Raise RuntimeError unless called from the main thread."""
        if (threading.get_ident() != threading.main_thread().ident):
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        """Raise RuntimeError if the underlying thread is not alive."""
        if (not self._thread.is_alive()):
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at this worker's gate until tapped, then re-arm the gate.

        Must be called by the worker thread itself while it is alive;
        otherwise a RuntimeError is raised by the verify helpers.
        """
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        Logging.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; main thread only.

        A dead worker is not tapped; "_tad" is printed instead.
        """
        self.verifyThreadMain()  # only allowed for main thread

        if self._thread.is_alive():
            Logging.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        """Execute *sql* on this worker's connection and return the result of execute()."""
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        """Run *sql* as a query on this worker's connection and return the result of query()."""
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        """Return getQueryResult() of this worker's connection."""
        return self.getDbConn().getQueryResult()

    def getDbConn(self) -> DbConn:
        """Return the per-thread connection, or the DbManager's shared one.

        The per-thread connection is used when
        ``gConfig.per_thread_db_connection`` is set.
        """
        if (gConfig.per_thread_db_connection):
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()
244

245
# The coordinator of all worker threads, mostly running in main thread
S
Shuduo Sang 已提交
246 247


248
class ThreadCoordinator:
    """Drives all worker threads step by step, mostly from the main thread.

    Per step the coordinator meets the workers at a shared barrier, performs
    the state transition of every database for the tasks executed in the
    previous step, creates a new TaskExecutor and then taps every worker's
    gate so they can fetch and run their next task.
    """

    WORKER_THREAD_TIMEOUT = 120  # Normal: 120

    def __init__(self, pool: ThreadPool, dbManager: DbManager):
        """Create the coordinator for *pool*, using *dbManager* for DB access."""
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        self._stepBarrier = threading.Barrier(
            self._pool.numThreads + 1)  # one barrier for all threads, plus the main thread
        self._execStats = ExecutionStats()
        self._runStatus = Status.STATUS_RUNNING
        self._initDbs()
        self._stepStartTime = None  # Track how long it takes to execute each step

    def getTaskExecutor(self):
        """Return the current TaskExecutor, or None when not running."""
        return self._te

    def getDbManager(self) -> DbManager:
        """Return the DbManager given at construction time."""
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        """Wait at the shared step barrier, optionally with a *timeout* in seconds.

        Raises:
            threading.BrokenBarrierError: if the barrier is broken or times out.
        """
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        """Mark the run as stopping and register a "User Interruption" failure."""
        self._runStatus = Status.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """Return True when the main loop must stop before the next step."""
        maxSteps = gConfig.max_steps  # type: ignore
        if self._curStep >= (maxSteps - 1):  # maxStep==10, last curStep should be 9
            return True
        if self._runStatus != Status.STATUS_RUNNING:
            return True
        if transitionFailed:
            return True
        if hasAbortedTask:
            return True
        if workerTimeout:
            return True
        return False

    def _hasAbortedTask(self):  # from execution of previous step
        """Return True if any task executed in the previous step was aborted."""
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step and tap all workers.

        A new TaskExecutor is only created when the transition succeeded;
        otherwise ``_te`` stays None, which tells the workers to stop.
        """
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        Logging.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        Logging.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
                self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute it's own task
        self.tapAllThreads()  # release all worker thread from their "gates"

    def _syncAtBarrier(self):
        """Cross the shared barrier with the workers, then reset it.

        Raises:
            threading.BrokenBarrierError: if the workers do not arrive within
                WORKER_THREAD_TIMEOUT seconds.
        """
        # Now main thread (that's us) is ready to enter a step
        # let other threads go past the pool barrier, but wait at the
        # thread gate
        Logging.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        Logging.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """Transition every database's state machine for the executed tasks.

        Returns:
            True if the transition failed (broken DB connection or a
            CrashGenError), False otherwise.

        Raises:
            taos.error.ProgrammingError: for any other programming error.
        """
        transitionFailed = False
        try:
            for db in self._dbs:  # type: Database
                sm = db.getStateMachine()
                Logging.debug("[STT] starting transitions for DB: {}".format(db.getName()))
                # at end of step, transiton the DB state
                tasksForDb = db.filterTasks(self._executedTasks)
                sm.transition(tasksForDb, self.getDbManager().getDbConn())
                Logging.debug("[STT] transition ended for DB: {}".format(db.getName()))

            # Due to limitation (or maybe not) of the TD Python library,
            # we cannot share connections across threads.
            # Here we are in main thread, we cannot operate the connections
            # created in workers, so "use db" is left to the task loop.

        except taos.error.ProgrammingError as err:
            # One if/elif/else chain: a handled broken connection must NOT
            # fall through to the final "raise" below.
            if (err.msg == 'network unavailable'):  # broken DB connection
                Logging.info("DB connection broken, execution failed")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # don't "continue" here, need to tap all threads at
                # end, and maybe signal them to stop
            elif isinstance(err, CrashGenError):  # our own transition failure
                Logging.info("State transition error")
                # TODO: saw an error here once, let's print out stack info for err?
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("State transition error: {}".format(err))
            else:
                raise

        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        Logging.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Start all workers and coordinate them until the run should end."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not gConfig.debug:  # print this only if we are not in debug mode
                Progress.emit(Progress.STEP_BOUNDARY)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
                Progress.emit(Progress.END_THREAD_STEP)
                if self._stepStartTime:
                    stepExecTime = time.time() - self._stepStartTime
                    Progress.emitStr('{:.3f}s/{}'.format(stepExecTime, DbConnNative.totalRequests))
                    DbConnNative.resetTotalRequests()  # reset to zero
            except threading.BrokenBarrierError:
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                Logging.error("\n")
                Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
                    ThreadCoordinator.WORKER_THREAD_TIMEOUT))
                Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                # For deadlock debugging, loop/sleep here and attach gdb to the process
                break

            # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
            # We use this period to do house keeping work, when all worker
            # threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                Logging.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = Helper.convertErrno(err.errno)  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                Logging.info(errMsg)
                traceback.print_exc()
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            Progress.emit(Progress.BEGIN_THREAD_STEP)
            self._stepStartTime = time.time()
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            Logging.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            Logging.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            Logging.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        Logging.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        Logging.debug("\r\n\n--> Main thread ready to finish up...")
        Logging.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        Logging.info(". . . All worker threads finished")  # No CR/LF before
        self._execStats.endExec()

    def cleanup(self):  # free resources
        """Clean up the pool and drop all references held by the coordinator."""
        self._pool.cleanup()

        self._pool = None
        self._te = None
        self._dbManager = None
        self._executedTasks = None
        self._lock = None
        self._stepBarrier = None
        self._execStats = None
        self._runStatus = None

    def printStats(self):
        """Print the execution statistics."""
        self._execStats.printStats()

    def isFailed(self):
        """Return whether the execution statistics recorded a failure."""
        return self._execStats.isFailed()

    def getExecStats(self):
        """Return the ExecutionStats object."""
        return self._execStats

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's gate, in an order shuffled by Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        Logging.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        """Return True while a TaskExecutor is set."""
        return self._te is not None

    def _initDbs(self):
        ''' Initialize multiple databases, invoked at __init__() time '''
        self._dbs = []  # type: List[Database]
        dbc = self.getDbManager().getDbConn()
        if gConfig.max_dbs == 0:
            self._dbs.append(Database(0, dbc))
        else:
            # Don't use Dice/random for the base number, as they are deterministic
            baseDbNumber = int(datetime.datetime.now().timestamp(
                )*333) % 888 if gConfig.dynamic_db_table_names else 0
            for i in range(gConfig.max_dbs):
                self._dbs.append(Database(baseDbNumber + i, dbc))

    def pickDatabase(self):
        """Return one of the databases, chosen by Dice when there are several."""
        idxDb = 0
        if gConfig.max_dbs != 0:
            idxDb = Dice.throw(gConfig.max_dbs)  # 0 to N-1
        db = self._dbs[idxDb]  # type: Database
        return db

    def fetchTask(self) -> Task:
        ''' The thread coordinator (that's us) is responsible for fetching a task
            to be executed next.

            Raises RuntimeError when the coordinator is not running.
        '''
        if (not self.isRunning()):  # no task
            raise RuntimeError("Cannot fetch task when not running")

        # pick a task type for current state
        db = self.pickDatabase()
        taskType = db.getStateMachine().pickTaskType()  # dynamic name of class
        return taskType(self._execStats, db)  # create a task from it

    def resetExecutedTasks(self):
        """Forget the tasks recorded for the step that just ended."""
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record *task* as executed in the current step (guarded by the lock)."""
        with self._lock:
            self._executedTasks.append(task)
543

544
class ThreadPool:
    """Owns the list of WorkerThread objects used for one run."""

    def __init__(self, numThreads, maxSteps):
        """Remember the requested sizes; no thread is created here."""
        # Internal bookkeeping first
        self.threadList = []  # type: List[WorkerThread]
        self.curStep = 0
        # Then the caller-supplied settings
        self.maxSteps = maxSteps
        self.numThreads = numThreads

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to *tc* and start each one.

        Each worker is recorded in ``threadList`` before it is started.
        """
        for workerId in range(self.numThreads):
            newWorker = WorkerThread(self, workerId, tc)
            self.threadList.append(newWorker)
            newWorker.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Join the underlying thread of every worker, in list order."""
        for member in self.threadList:
            Logging.debug("Joining thread...")
            member._thread.join()

    def cleanup(self):
        """Drop the reference to the worker list."""
        self.threadList = None  # maybe clean up each?

567 568
# A queue of contiguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
S
Shuduo Sang 已提交
569 570


S
Steven Li 已提交
571 572
class LinearQueue():
    """A window ``[firstIndex..lastIndex]`` of consecutive integers.

    New indexes are appended at the tail with :meth:`push` and removed from
    the head with :meth:`pop`.  Indexes that are currently handed out are
    tracked in ``inUse``; an index in use cannot be popped or allocated again.
    """

    def __init__(self):
        self._lock = threading.RLock()  # re-entrant: our methods call each other
        self.inUse = set()  # the indexes that are in use right now
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0  # first > last means the queue is empty

    def toText(self):
        """Return a short description of the range and the in-use set."""
        template = "[{}..{}], in use: {}"
        return template.format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        """Grow the queue by one index, mark it in use and return it."""
        with self._lock:
            self.lastIndex += 1
            newest = self.lastIndex
            self.allocate(newest)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head is still in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?

            head = self.firstIndex
            if head in self.inUse:
                return False

            self.firstIndex += 1
            return head

    def isEmpty(self):
        """Return True when there is no index in the queue."""
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like :meth:`pop`, but return 0 for an empty queue."""
        with self._lock:
            return 0 if self.isEmpty() else self.pop()

    def allocate(self, i):
        """Mark index *i* as in use.

        Raises:
            RuntimeError: if *i* is already in use.
        """
        with self._lock:
            if i in self.inUse:
                raise RuntimeError(
                    "Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index *i* as no longer in use (KeyError if it was not)."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Return the number of indexes currently in the queue."""
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Pick a random index that is not in use, allocate and return it.

        Returns None for an empty queue, or after ten times ``size()``
        unsuccessful draws.
        """
        if self.isEmpty():
            return None
        with self._lock:
            attempts = 0  # counting the iterations
            while True:
                attempts += 1
                if attempts > self.size() * 10:  # 10x iteration already
                    return None
                candidate = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if candidate in self.inUse:
                    continue
                self.allocate(candidate)
                return candidate

S
Shuduo Sang 已提交
647

648
class AnyState:
    """Base class for the states a test database can be in.

    Each concrete state supplies, through ``getInfo()``, a list holding its
    state value followed by capability flags. ``STATE_VAL_IDX`` and the
    ``CAN_*`` constants are the indexes into that list.
    """
    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, not even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    # For below, if we can "drop the DB", but strictly speaking
    # only "under normal circumstances", as we may override it with the -b option
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # +1 offset accommodates STATE_INVALID (-1) being the first name
        return self._stateNames[self._info[self.STATE_VAL_IDX] + 1]

    # Each sub state tells us the "info" about itself, so we can determine
    # things like canDropDb()
    def getInfo(self):
        """Return the info list for this state; concrete states must override."""
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare this state's value with an int or with another AnyState.

        Raises:
            RuntimeError: if ``other`` is neither an int nor an AnyState.
        """
        if isinstance(other, int):
            return self.getValIndex() == other
        if isinstance(other, AnyState):
            return self.getValIndex() == other.getValIndex()
        raise RuntimeError(
            "Unexpected comparison, type = {}".format(
                type(other)))

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` can explain the move to ``newState``; subclasses override."""
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        # Same value as getValIndex()
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        # If user requests to run up to a number of DBs,
        # we'd then not do drop_db operations any more
        if gConfig.max_dbs > 0 or gConfig.use_shadow_db:
            return False
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        # duplicate writes to shadow DB, in which case let's disable dropping s-table
        if gConfig.use_shadow_db:
            return False
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise CrashGenError once a second successful ``cls`` task is seen."""
        successCount = 0
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                # task.logDebug("Task success found")
                successCount += 1
                if successCount >= 2:
                    raise CrashGenError(
                        "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise CrashGenError if ``cls`` tasks exist but none of them succeeded."""
        matching = [task for task in tasks if isinstance(task, cls)]
        if matching and not any(task.isSuccess() for task in matching):
            raise CrashGenError("Unexpected zero success for task type: {}, from tasks: {}"
                .format(cls, tasks))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` is present."""
        if any(isinstance(task, cls) for task in tasks):
            raise CrashGenError(
                "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` succeeded."""
        if any(isinstance(task, cls) and task.isSuccess() for task in tasks):
            raise CrashGenError(
                "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if at least one task of type ``cls`` succeeded."""
        return any(isinstance(task, cls) and task.isSuccess() for task in tasks)

    def hasTask(self, tasks, cls):
        """Return True if at least one task of type ``cls`` is present."""
        return any(isinstance(task, cls) for task in tasks)

S
Shuduo Sang 已提交
773

774 775 776 777
class StateInvalid(AnyState):
    """The STATE_INVALID state: every capability flag is False."""

    def getInfo(self):
        return [
            self.STATE_INVALID,
            False, False,  # can create/drop Db
            False, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    # def verifyTasksToState(self, tasks, newState):

S
Shuduo Sang 已提交
785

786 787 788 789
class StateEmpty(AnyState):
    """The STATE_EMPTY state: only creating a DB is allowed."""

    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False,  # can create/drop Db
            False, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        # At EMPTY, if a create_db succeeded and there were no drop_db tasks,
        # we must have at most one successful creation. TODO: compare numbers
        if self.hasSuccess(tasks, TaskCreateDb) and not self.hasTask(tasks, TaskDropDb):
            self.assertAtMostOneSuccess(tasks, TaskCreateDb)

802 803 804 805 806 807 808 809 810 811 812

class StateDbOnly(AnyState):
    """The STATE_DB_ONLY state: the DB may be dropped or a fixed super table created."""

    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,  # can create/drop Db
            True, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        if not self.hasTask(tasks, TaskCreateDb):
            # only if we don't create any more
            self.assertAtMostOneSuccess(tasks, TaskDropDb)

        # TODO: restore the below, the problem exists, although unlikely in real-world
        # if (gSvcMgr!=None) and gSvcMgr.isRestarting():
        # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
        #     self.assertIfExistThenSuccess(tasks, TaskDropDb)
821

S
Shuduo Sang 已提交
822

823
class StateSuperTableOnly(AnyState):
    """The STATE_TABLE_ONLY state: DB and super table can be dropped, data added/read."""

    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed table
            True, True,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        if self.hasSuccess(tasks, TaskDropSuperTable):  # we are able to drop the table
            # self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
            # We must have had recreated it.
            # NOTE(review): the result of this call is discarded, so nothing
            # is actually verified here -- confirm whether an assertion was meant.
            self.hasSuccess(tasks, TaskCreateSuperTable)

            # self._state = self.STATE_DB_ONLY
        # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
        #     self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parallel cases
            # self._state = self.STATE_HAS_DATA
        # elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data
            # self.assertNoTask(tasks, DropFixedTableTask)
            # self.assertNoTask(tasks, AddFixedDataTask)
            # self._state = self.STATE_TABLE_ONLY # no change
        # else: # did not drop table, did not insert data, did not read successfully, that is impossible
        #     raise RuntimeError("Unexpected no-success scenarios")
        # TODO: need to revamp!!
850

S
Shuduo Sang 已提交
851

852 853 854 855 856 857 858 859 860 861
class StateHasData(AnyState):
    """The STATE_HAS_DATA state: DB and super table can be dropped, data added/read."""

    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed table
            True, True,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        if newState.equals(AnyState.STATE_EMPTY):
            # NOTE(review): result discarded, nothing is verified by this call
            self.hasSuccess(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicey
        elif newState.equals(AnyState.STATE_DB_ONLY):  # in DB only
            if not self.hasTask(tasks, TaskCreateDb):  # without a create_db task
                # no drop_db task may be present
                self.assertNoTask(tasks, TaskDropDb)
            # NOTE(review): result discarded, nothing is verified by this call
            self.hasSuccess(tasks, TaskDropSuperTable)
            # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicey
        # elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
            # self.assertNoTask(tasks, TaskDropDb)
            # self.assertNoTask(tasks, TaskDropSuperTable)
            # self.assertNoTask(tasks, TaskAddData)
            # self.hasSuccess(tasks, DeleteDataTasks)
        else:  # should be STATE_HAS_DATA
            if not self.hasTask(tasks, TaskCreateDb):  # only if we didn't create one
                # we shouldn't have dropped it
                self.assertNoTask(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateSuperTable):  # if we didn't create the table
                # we should not have a task that drops it
                self.assertNoTask(tasks, TaskDropSuperTable)
            # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
S
Steven Li 已提交
888

S
Shuduo Sang 已提交
889

890
class StateMechine:
    """Tracks the current state of one Database and validates state transitions."""

    # Weight used for task types that do not change the state (e.g. read data)
    _DEFAULT_WEIGHT = 10

    def __init__(self, db: Database):
        self._db = db
        # transition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
        self._stateWeights = [1, 2, 10, 40]

    def init(self, dbc: DbConn):  # late initialization, don't save the dbConn
        """Determine the starting state through ``dbc``; re-raises taos ProgrammingError."""
        try:
            self._curState = self._findCurrentState(dbc)  # starting state
        except taos.error.ProgrammingError as err:
            Logging.error("Failed to initialized state machine, cannot find current state: {}".format(err))
            traceback.print_stack()
            raise  # re-throw

    # TODO: seems no longer used, remove?
    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        # "can drop DB" is taken to mean it has one.
        # NOTE(review): canDropDb() also returns False when gConfig.max_dbs > 0
        # or gConfig.use_shadow_db is set, so this may under-report -- confirm.
        return self._curState.canDropDb()

    # May be slow, use cautiously...
    def getTaskTypes(self):
        """Return the task types that can run (directly/indirectly) from the current state.

        Raises:
            RuntimeError: if no task type can begin from the current state.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]

        # now we have all the tasks that can begin directly from the current
        # state, let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:  # each task type gathered so far
            endState = task1.getEndState()  # figure the end state
            if endState is None:  # does not change end state
                continue  # no use, do nothing
            # NOTE(review): an indirect type reachable from several end states
            # is appended once per end state, which raises its selection odds.
            for tc in allTaskClasses:  # what task can further begin from there?
                if tc.canBeginFrom(endState) and (tc not in firstTaskTypes):
                    taskTypes.append(tc)  # gather it

        if not taskTypes:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        Logging.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self, dbc: DbConn):
        """Query the server through ``dbc`` and return a fresh state object."""
        ts = time.time()  # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        dbName = self._db.getName()
        if not dbc.existsDatabase(dbName):  # no database?!
            Logging.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when opening connection, and this is NOT the worker
        # thread, which does this on their own
        dbc.use(dbName)
        if not dbc.hasTables():  # no tables
            Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        # For sure we have tables, which means we must have the super table. # TODO: are we sure?
        sTable = self._db.getFixedSuperTable()
        # Fixed: the test was inverted, reporting SUPER_TABLE_ONLY exactly
        # when regular tables existed.
        if not sTable.hasRegTables(dbc):  # no regular tables
            Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        # has actual tables
        Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
        return StateHasData()

    # We transition the system to a new state by examining the current state itself
    def transition(self, tasks, dbc: DbConn):
        """Verify ``tasks`` against the old and the newly detected state, then adopt the new one."""
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            Logging.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        dbc.execute("show dnodes")

        # Generic Checks, first based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in
            # case of multiple creation and drops

        if self._curState.canDropDb():
            if gSvcMgr is None:  # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in
            # case of drop-create-drop

        # if self._state.canCreateFixedTable():
            # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
            # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not
            # really, in case of create-drop-create

        # if self._state.canDropFixedTable():
            # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
            # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not
            # really in case of drop-create-drop

        # if self._state.canAddData():
        # self.assertIfExistThenSuccess(tasks, AddFixedDataTask)  # not true
        # actually

        # if self._state.canReadData():
            # Nothing for sure

        newState = self._findCurrentState(dbc)
        Logging.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        """Pick one runnable task type at random, weighted by its end state."""
        # all the task types we can choose from at current state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # e.g. read data task, which does not change the state
                weights.append(self._DEFAULT_WEIGHT)
        i = self._weighted_choice_sub(weights)
        # Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
        return taskTypes[i]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights):
        """Return a random index into ``weights``, chosen proportionally to the weights.

        Returns the last index if the loop ends without a pick (all-zero
        weights or rounding), and ``None`` only for an empty ``weights``.
        """
        # TODO: use our dice to ensure it being deterministic?
        rnd = random.random() * sum(weights)
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Never fall off the end with an implicit None for non-empty input
        return len(weights) - 1 if weights else None
1039

1040 1041 1042 1043 1044
class Database:
    ''' We use this to represent an actual TDengine database inside a service instance,
        possibly in a cluster environment.

        For now we use it to manage state transitions in that database

        TODO: consider moving, but keep in mind it contains "StateMachine"
    '''
    _clsLock = threading.Lock()  # class wide lock
    _lastInt = 101  # next one is initial integer
    _lastTick = 0
    _lastLaggingTick = 0  # lagging tick, for out-of-sequence (oos) data insertions

    ALL_COLORS = ['red', 'white', 'blue', 'green', 'purple']

    def __init__(self, dbNum: int, dbc: DbConn):  # TODO: remove dbc
        self._dbNum = dbNum  # we assign a number to databases, for our testing purpose
        # Create the lock before anything else so the object is usable as
        # soon as the state machine starts calling back into it.
        self._lock = threading.RLock()
        self._stateMachine = StateMechine(self)
        self._stateMachine.init(dbc)

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    def getDbNum(self):
        return self._dbNum

    def getName(self):
        return "db_{}".format(self._dbNum)

    def filterTasks(self, inTasks: List[Task]):
        """Return the tasks from ``inTasks`` whose database is this one."""
        return [task for task in inTasks if task.getDb().isSame(self)]

    def isSame(self, other):
        return self._dbNum == other._dbNum

    def exists(self, dbc: DbConn):
        return dbc.existsDatabase(self.getName())

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    def getFixedSuperTable(self) -> TdSuperTable:
        return TdSuperTable(self.getFixedSuperTableName(), self.getName())

    # We aim to create a starting time tick, such that, whenever we run our test here once
    # We should be able to safely create 100,000 records, which will not have any repeated time stamp
    # when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
    # by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    @classmethod
    def setupLastTick(cls):
        """Compute and return the datetime from which ticks start."""
        t1 = datetime.datetime(2020, 6, 1)
        t2 = datetime.datetime.now()
        elSec = int(t2.timestamp() - t1.timestamp())
        # Wrap the elapsed seconds so that, after the 500x expansion, the
        # offset stays within 8 * 12 * 30 days.
        elSec2 = (elSec % (8 * 12 * 30 * 24 * 60 * 60 // 500)) * 500
        # print("elSec = {}".format(elSec))
        t3 = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp(
            t3.timestamp() + elSec2)  # see explanation above
        Logging.info("Setting up TICKS to start from: {}".format(t4))
        return t4

    @classmethod
    def getNextTick(cls):
        '''
            Fetch a timestamp tick, with some random factor, may not be unique.
        '''
        with cls._clsLock:  # prevent duplicate tick
            if cls._lastLaggingTick == 0 or cls._lastTick == 0:  # not initialized
                # 10k at 1/20 chance, should be enough to avoid overlaps
                tick = cls.setupLastTick()
                cls._lastTick = tick
                # lagging behind 2 minutes, should catch up fast
                cls._lastLaggingTick = tick + datetime.timedelta(0, -60 * 2)

            # if asked to do so, and 1 in 20 chance, return lagging tick
            if gConfig.mix_oos_data and Dice.throw(20) == 0:
                # pick the next sequence from the lagging tick sequence
                cls._lastLaggingTick += datetime.timedelta(0, 1)
                return cls._lastLaggingTick
            # regular: add one second to it
            cls._lastTick += datetime.timedelta(0, 1)
            return cls._lastTick

    def getNextInt(self):
        """Return the next integer of this instance's sequence."""
        with self._lock:
            # The first increment reads the class default and stores the
            # result on the instance, so each Database counts on its own.
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            self.getNextInt())

    def getNextFloat(self):
        ret = 0.9 + self.getNextInt()
        # print("Float obtained: {}".format(ret))
        return ret

    def getNextColor(self):
        return random.choice(self.ALL_COLORS)

1150

1151
class TaskExecutor():
    """Runs tasks for one step and records data marks in a class-wide bounded list."""

    class BoundedList:
        """Lock-protected ascending list that keeps at most ``size`` of the values added."""

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            """Insert ``n`` at its sorted position, evicting the first item on overflow.

            A value not greater than the current first item is ignored, even
            while the list is not yet full.

            Raises:
                RuntimeError: if the list is found longer than ``size + 1``.
            """
            with self._lock:
                if not self._list:  # empty
                    self._list.append(n)
                    return
                # insertion point: first position whose item is >= n, else the end
                insPos = next(
                    (i for i, item in enumerate(self._list) if n <= item),
                    len(self._list))

                if insPos == 0:  # except for the 1st item, # TODO: eliminate first item as gating item
                    return  # do nothing

                # print("Inserting at position {}, value: {}".format(insPos, n))
                self._list.insert(insPos, n)

                newLen = len(self._list)
                if newLen <= self._size:
                    return  # still within bounds
                if newLen == (self._size + 1):
                    del self._list[0]  # remove the first item
                else:
                    raise RuntimeError("Corrupt Bounded List")

        def __str__(self):
            return repr(self._list)

    _boundedList = BoundedList()

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        # print("[{}]".format(n), end="", flush=True)
        self._boundedList.add(n)

    # def logInfo(self, msg):
    #     Logging.info("    T[{}.x]: ".format(self._curStep) + msg)

    # def logDebug(self, msg):
    #     Logging.debug("    T[{}.x]: ".format(self._curStep) + msg)
1213

S
Shuduo Sang 已提交
1214

S
Steven Li 已提交
1215
class Task():
1216 1217 1218 1219
    ''' A generic "Task" to be executed. For now we decide that there is no
        need to embed a DB connection here, we use whatever the Worker Thread has
        instead. But a task is always associated with a DB
    '''
1220
    taskSn = 100
1221 1222
    _lock = threading.Lock()
    _tableLocks: Dict[str, threading.Lock] = {}
1223 1224 1225

    @classmethod
    def allocTaskNum(cls):
S
Shuduo Sang 已提交
1226
        Task.taskSn += 1  # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
1227
        # Logging.debug("Allocating taskSN: {}".format(Task.taskSn))
S
Steven Li 已提交
1228
        return Task.taskSn
1229

1230
    def __init__(self, execStats: ExecutionStats, db: Database):
S
Shuduo Sang 已提交
1231
        self._workerThread = None
1232
        self._err: Optional[Exception] = None
1233
        self._aborted = False
1234
        self._curStep = None
S
Shuduo Sang 已提交
1235
        self._numRows = None  # Number of rows affected
1236

S
Shuduo Sang 已提交
1237
        # Assign an incremental task serial number
1238
        self._taskNum = self.allocTaskNum()
1239
        # Logging.debug("Creating new task {}...".format(self._taskNum))
1240

1241
        self._execStats = execStats
1242
        self._db = db # A task is always associated/for a specific DB
1243

1244 1245
        

1246
    def isSuccess(self):
S
Shuduo Sang 已提交
1247
        return self._err is None
1248

1249 1250 1251
    def isAborted(self):
        return self._aborted

S
Shuduo Sang 已提交
1252
    def clone(self):  # TODO: why do we need this again?
1253
        newTask = self.__class__(self._execStats, self._db)
1254 1255
        return newTask

1256 1257 1258
    def getDb(self):
        return self._db

1259
    def logDebug(self, msg):
S
Shuduo Sang 已提交
1260 1261 1262
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))
1263 1264

    def logInfo(self, msg):
S
Shuduo Sang 已提交
1265 1266 1267
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))
1268

1269
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
S
Shuduo Sang 已提交
1270 1271 1272
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))
1273

1274 1275 1276 1277 1278
    def _isServiceStable(self):
        """Report whether the managed service is stable.

        With no service manager (we don't run the service) it is assumed to
        be stable; otherwise the service manager is asked.
        """
        return gSvcMgr.isStable() if gSvcMgr else True

1279 1280 1281
    def _isErrAcceptable(self, errno, msg):
        if errno in [
                0x05,  # TSDB_CODE_RPC_NOT_READY
1282
                0x0B,  # Unable to establish connection, more details in TD-1648
1283
                # 0x200, # invalid SQL, TODO: re-examine with TD-934
1284
                0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
1285
                0x213, # "Disconnected from service", result of "kill connection ???"
1286
                0x217, # "db not selected", client side defined error code
1287 1288 1289 1290
                # 0x218, # "Table does not exist" client side defined error code
                0x360, # Table already exists
                0x362, 
                # 0x369, # tag already exists
1291 1292 1293 1294 1295 1296 1297
                0x36A, 0x36B, 0x36D,
                0x381, 
                0x380, # "db not selected"
                0x383,
                0x386,  # DB is being dropped?!
                0x503,
                0x510,  # vnode not in ready state
1298
                0x14,   # db not ready, errno changed
1299
                0x600,  # Invalid table ID, why?
1300
                0x218,  # Table does not exist
1301 1302 1303
                1000  # REST catch-all error
            ]: 
            return True # These are the ALWAYS-ACCEPTABLE ones
1304 1305 1306 1307 1308 1309 1310
        # This case handled below already.
        # elif (errno in [ 0x0B ]) and gConfig.auto_start_service:
        #     return True # We may get "network unavilable" when restarting service
        elif gConfig.ignore_errors: # something is specified on command line
            moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')]
            if errno in moreErrnos:
                return True
1311 1312 1313
        elif errno == 0x200 : # invalid SQL, we need to div in a bit more
            if msg.find("invalid column name") != -1:
                return True 
1314 1315 1316 1317
            elif msg.find("tags number not matched") != -1: # mismatched tags after modification
                return True
            elif msg.find("duplicated column names") != -1: # also alter table tag issues
                return True
1318
        elif not self._isServiceStable(): # We are managing service, and ...
1319
            Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
S
Steven Li 已提交
1320
            return True
1321 1322 1323 1324
        
        return False # Not an acceptable error


1325 1326
    def execute(self, wt: WorkerThread):
        """Run this task on the given worker thread and record the outcome.

        The task-specific work is delegated to ``self._executeInternal(te, wt)``.
        Any exception raised there is captured in ``self._err`` instead of
        being propagated, and the execution statistics are updated afterwards.

        TAOS ``ProgrammingError`` handling, in order of precedence:
          * ``gConfig.continue_on_exception`` set: log and carry on;
          * ``self._isErrAcceptable()`` says OK: log, emit a progress tick;
          * otherwise: print an abort banner and flag the task as aborted.

        Every other exception (including ``BaseException`` subclasses) flags
        the task as aborted and prints the stack trace.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        taskName = self.__class__.__name__
        self.logDebug(
            "[-] executing task {}...".format(taskName))

        self._err = None # TODO: type hint mess up?
        self._execStats.beginTaskType(taskName)  # mark beginning
        errno2 = None  # stays None unless a TAOS error is caught

        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            if (gConfig.continue_on_exception):  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                self._err = err
            elif self._isErrAcceptable(errno2, err.__str__()):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                Progress.emit(Progress.ACCEPTABLE_ERROR)
                self._err = err
            else: # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
                    taskName,
                    errno2, err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if gConfig.debug:
                    traceback.print_exc()  # so that we see the full stack
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                # The process is not terminated here; the task is only flagged as aborted
                self._err = err
                self._aborted = True
        except Exception as e:
            Logging.info("Non-TAOS exception encountered with: {}".format(taskName))
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException as e:
            # Last resort: also covers non-Exception errors such as KeyboardInterrupt
            self.logInfo("Python base exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()

        self._execStats.endTaskType(taskName, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            taskName, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(taskName, self.isSuccess(), errno2)
S
Steven Li 已提交
1390

1391
    # TODO: refactor away, just provide the dbConn
S
Shuduo Sang 已提交
1392
    def execWtSql(self, wt: WorkerThread, sql):
        """Execute ``sql`` through the worker thread and return what ``wt.execSql`` returns."""
        outcome = wt.execSql(sql)
        return outcome

S
Shuduo Sang 已提交
1396
    def queryWtSql(self, wt: WorkerThread, sql):
        """Run ``sql`` as a query through the worker thread and return what ``wt.querySql`` returns."""
        outcome = wt.querySql(sql)
        return outcome

S
Shuduo Sang 已提交
1399
    def getQueryResult(self, wt: WorkerThread):
        """Return whatever ``wt.getQueryResult()`` yields (no SQL is executed here)."""
        rows = wt.getQueryResult()
        return rows

1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419
    def lockTable(self, ftName): # full table name
        """Acquire the lock registered under ``ftName``, creating it on first use.

        The registry (``Task._tableLocks``) is only touched while holding
        ``Task._lock``; the table lock itself is acquired after the registry
        lock has been released.
        """
        with Task._lock:
            if ftName not in Task._tableLocks:
                Task._tableLocks[ftName] = threading.Lock()

        tableLock = Task._tableLocks[ftName]
        tableLock.acquire()

    def unlockTable(self, ftName):
        """Release the lock registered under ``ftName`` (counterpart of ``lockTable``).

        Raises:
            RuntimeError: if no lock was ever registered for ``ftName``, or if
                the registered lock is not currently held.
        """
        with Task._lock:
            # Use the class-level registry, the same one lockTable() populates
            lock = Task._tableLocks.get(ftName)
            if lock is None:
                raise RuntimeError("Corrupt state, no such lock")
            if not lock.locked():
                raise RuntimeError("Corrupt state, already unlocked")
        lock.release()

1420

1421
class ExecutionStats:
    """Bookkeeping for task executions: per-task counters, errors and timing.

    The counters are updated through ``incExecCount``, ``beginTaskType`` and
    ``endTaskType``, all of which serialize on the same internal lock.
    """

    def __init__(self):
        # per task class name: [total executions, successful executions]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
        self._execStartTime = None
        self._errors = {}  # per task class name: {errno: number of occurrences}
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}".format(
            self._failed, self._failureReason)

    def isFailed(self):
        """Return True once ``registerFailure`` has been called."""
        return self._failed

    def startExec(self):
        """Remember the wall-clock start of the whole run."""
        self._execStartTime = time.time()

    def endExec(self):
        """Compute the wall-clock duration since ``startExec``."""
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        """Count one execution of task ``klassName``.

        Args:
            klassName: task class name used as the statistics key.
            isSuccess: whether the execution counts as successful.
            eno: optional error number to tally for this task.
        """
        with self._lock:  # counters are shared between the callers of this method
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        """Mark a task as started; the first concurrent task starts the busy clock."""
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        """Mark a task as finished; the last one to finish stops the busy clock."""
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        """Flag the whole run as failed, remembering ``reason``."""
        self._failed = True
        self._failureReason = reason

    def _avgTaskTime(self, numTasks):
        """Return the accumulated busy time per task, or 0.0 when no task ran."""
        if not numTasks:
            return 0.0  # avoid ZeroDivisionError on an empty run
        return self._accRunTime / numTasks

    def printStats(self):
        """Log a summary of all collected statistics."""
        separator = "----------------------------------------------------------------------"
        Logging.info(separator)
        Logging.info(
            "| Crash_Gen test {}, with the following stats:". format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        Logging.info("| Task Execution Times (success/total):")
        execTimesAny = 0.0
        for k, n in self._execTimes.items():
            execTimesAny += n[0]
            errStr = None
            if k in self._errors:
                errStr = ", ".join(
                    "0x{:X}:{}".format(eno, cnt) for (eno, cnt) in self._errors[k].items())
            Logging.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))

        Logging.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        Logging.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        Logging.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        Logging.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(self._avgTaskTime(execTimesAny)))
        Logging.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        Logging.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        Logging.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
        Logging.info("| Longest native query time: {:.3f} seconds, started: {}".
            format(MyTDSql.longestQueryTime,
                time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) )
        Logging.info("| Longest native query: {}".format(MyTDSql.longestQuery))
        Logging.info(separator)
1520 1521 1522


class StateTransitionTask(Task):
    '''Base class for tasks that are selected according to the current state.

    Sub-classes must override getEndState() and canBeginFrom(); the versions
    here only raise RuntimeError.
    '''
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    # Offset added to every regular table index, chosen once (see getRegTableName)
    _baseTableNumber = None

    _endState = None # TODO: no longer used?

    @classmethod
    def getInfo(cls):
        '''To be supplied by each sub class; raises here.'''
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        '''To be supplied by each sub class; raises here.'''
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        '''To be supplied by each sub class; raises here.'''
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        '''Return the name of regular table number ``i``.

        The base number is stored on StateTransitionTask itself (not on
        ``cls``), so all sub classes share the same value once it is set.
        '''
        if StateTransitionTask._baseTableNumber is None:  # first call only
            if gConfig.dynamic_db_table_names:
                base = Dice.throw(999)
            else:
                base = 0
            StateTransitionTask._baseTableNumber = base
        tableNumber = StateTransitionTask._baseTableNumber + i
        return f"reg_table_{tableNumber}"

    def execute(self, wt: WorkerThread):
        '''Delegate to the parent implementation.'''
        super().execute(wt)
S
Shuduo Sang 已提交
1562 1563


1564
class TaskCreateDb(StateTransitionTask):
    '''Task that issues the "create database" statement(s).'''

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Create this task's database, plus "db_s" for "db_0" when use_shadow_db is set.'''
        createTemplate = "create database {} {} {} "

        # Replica clause only when the configured replica count differs from 1
        if gConfig.max_replicas != 1:
            replicaClause = "replica {}".format(gConfig.max_replicas)  # fixed, always the max
        else:
            replicaClause = ""

        # allow update only when "verify data" is active
        if gConfig.verify_data:
            updateClause = "update 1"
        else:
            updateClause = ""

        targetDb = self._db.getName()
        self.execWtSql(wt, createTemplate.format(targetDb, replicaClause, updateClause))
        if targetDb == "db_0" and gConfig.use_shadow_db:
            self.execWtSql(wt, createTemplate.format("db_s", replicaClause, updateClause))
1586

1587
class TaskDropDb(StateTransitionTask):
    '''Task that drops this task's database.'''

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Issue the "drop database" statement and log the time afterwards.'''
        dropSql = "drop database {}".format(self._db.getName())
        self.execWtSql(wt, dropSql)
        droppedAt = time.time()
        Logging.debug("[OPS] database dropped at {}".format(droppedAt))
1599

1600
class TaskCreateSuperTable(StateTransitionTask):
    '''Task that (re-)creates the fixed super table of this task's database.'''

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Create the super table, dropping an existing one; skipped when the DB is missing.'''
        if self._db.exists(wt.getDbConn()):
            columns = {'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}
            tags = {'b':'BINARY(200)', 'f':'FLOAT'}
            superTable = self._db.getFixedSuperTable() # type: TdSuperTable
            # No need to create the regular tables, INSERT will do that
            # automatically
            superTable.create(wt.getDbConn(), columns, tags, dropIfExists = True)
        else:
            Logging.debug("Skipping task, no DB yet")
1624

S
Steven Li 已提交
1625

1626
class TdSuperTable:
    '''Helper around one super table, identified by its name and database name.

    All operations receive the DB connection to use as an argument; SQL is
    built from the stored names and sent through that connection.
    '''

    def __init__(self, stName, dbName):
        self._stName = stName
        self._dbName = dbName

    def getName(self):
        '''Return the (unqualified) super table name.'''
        return self._stName

    def drop(self, dbc, skipCheck = False):
        '''Drop the super table; a missing table raises unless skipCheck is set.'''
        if not self.exists(dbc):
            if skipCheck:
                return
            raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))
        qualifiedName = self._dbName + '.' + self._stName
        dbc.execute("DROP TABLE {}".format(qualifiedName))

    def exists(self, dbc):
        '''Switch to the database, then ask the connection whether the super table exists.'''
        useSql = "USE " + self._dbName
        dbc.execute(useSql)
        return dbc.existsSuperTable(self._stName)

    # TODO: odd semantic, create() method is usually static?
    def create(self, dbc, cols: dict, tags: dict,
        dropIfExists = False
        ):
        '''Create the super table from column and tag definitions.

        cols and tags map names to SQL types; tags may be None, in which case a
        single dummy int tag is used. An existing table is dropped first when
        dropIfExists is set, otherwise CrashGenError is raised.
        '''
        dbc.execute("USE " + self._dbName)
        qualifiedName = self._dbName + '.' + self._stName
        if dbc.existsSuperTable(self._stName):
            if not dropIfExists:
                raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
            dbc.execute("DROP TABLE {}".format(qualifiedName))

        # Assemble the CREATE statement: column list first, then the tag clause
        columnDefs = ",".join("{} {}".format(name, sqlType) for (name, sqlType) in cols.items())
        if tags is None:
            tagClause = " TAGS (dummy int) "
        else:
            tagDefs = ",".join("{} {}".format(name, sqlType) for (name, sqlType) in tags.items())
            tagClause = " TAGS ({})".format(tagDefs)
        dbc.execute("CREATE TABLE {} ({})".format(qualifiedName, columnDefs) + tagClause)

    def getRegTables(self, dbc: DbConn):
        '''Return the first column of every row of a TBNAME query on the super table.'''
        tbnameSql = "select TBNAME from {}.{}".format(self._dbName, self._stName)
        try:
            dbc.query(tbnameSql)  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
            raise

        names = []
        for row in dbc.getQueryResult():
            names.append(row[0])
        return names

    def hasRegTables(self, dbc: DbConn):
        '''Return True when a SELECT * on the super table reports more than zero.'''
        selectSql = "SELECT * FROM {}.{}".format(self._dbName, self._stName)
        rowCount = dbc.query(selectSql)
        return rowCount > 0

    def ensureTable(self, task: Task, dbc: DbConn, regTableName: str):
        '''Create regular table regTableName under this super table if the lookup finds none.

        When task is given, its per-table lock is held around the CREATE and
        released no matter what.
        '''
        lookupSql = "select tbname from {}.{} where tbname in ('{}')".format(
            self._dbName, self._stName, regTableName)
        if dbc.query(lookupSql) >= 1: # reg table exists already
            return

        # acquire a lock first, so as to be able to *verify*. More details in TD-1471
        qualifiedName = self._dbName + '.' + regTableName
        useLock = task is not None  # the lock is optional
        if useLock:
            task.lockTable(qualifiedName)
        Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
        try:
            createSql = "CREATE TABLE {} USING {}.{} tags ({})".format(
                qualifiedName, self._dbName, self._stName, self._getTagStrForSql(dbc)
            )
            dbc.execute(createSql)
        finally:
            if useLock:
                task.unlockTable(qualifiedName) # no matter what

    def _getTagStrForSql(self, dbc) :
        '''Return a comma separated list with one fixed sample value per tag.'''
        sampleByType = {
            'BINARY': "'Beijing-Shanghai-LosAngeles'",
            'FLOAT': '9.9',
            'INT': '88',
        }
        samples = []
        for tagType in self._getTags(dbc).values():
            if tagType not in sampleByType:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
            samples.append(sampleByType[tagType])
        return ", ".join(samples)

    def _getTags(self, dbc) -> dict:
        '''Return {name: type} for the DESCRIBE rows whose 4th field is 'TAG'.'''
        dbc.query("DESCRIBE {}.{}".format(self._dbName, self._stName))
        tagTypes = {}
        for row in dbc.getQueryResult():
            if row[3] == 'TAG':
                tagTypes[row[0]] = row[1]
        return tagTypes

    def addTag(self, dbc, tagName, tagType):
        '''Add a tag unless the super table already has one of that name.'''
        if tagName not in self._getTags(dbc):
            alterSql = "alter table {}.{} add tag {} {}".format(
                self._dbName, self._stName, tagName, tagType)
            dbc.execute(alterSql)

    def dropTag(self, dbc, tagName):
        '''Drop a tag if the super table has it.'''
        if tagName in self._getTags(dbc):
            alterSql = "alter table {}.{} drop tag {}".format(self._dbName, self._stName, tagName)
            dbc.execute(alterSql)

    def changeTag(self, dbc, oldTag, newTag):
        '''Rename oldTag to newTag, only when oldTag exists and newTag does not.'''
        currentTags = self._getTags(dbc)
        if oldTag in currentTags and newTag not in currentTags:
            alterSql = "alter table {}.{} change tag {} {}".format(
                self._dbName, self._stName, oldTag, newTag)
            dbc.execute(alterSql)

    def generateQueries(self, dbc: DbConn) -> List[SqlQuery]:
        ''' Generate queries to test/exercise this super table '''
        queries = [] # type: List[SqlQuery]
        selectTemplate = "select {} from {}.{}"
        aggregateChoices = [
            'count(*)',
            'avg(speed)',
            # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
            'sum(speed)',
            'stddev(speed)',
            # SELECTOR functions
            'min(speed)',
            'max(speed)',
            'first(speed)',
            'last(speed)',
            'top(speed, 50)', # TODO: not supported?
            'bottom(speed, 50)', # TODO: not supported?
            'apercentile(speed, 10)', # TODO: TD-1316
            # 'last_row(speed)', # TODO: commented out per TD-3231, we should re-create
            # Transformation Functions
            # 'diff(speed)', # TODO: no supported?!
            'spread(speed)'
            ] # TODO: add more from 'top'

        for regTable in self.getRegTables(dbc):  # regular tables
            # Result is not used yet; the call is kept so the sequence of Dice
            # calls stays the same. TODO: add various kind of WHERE conditions
            filterExpr = Dice.choice([
                None
            ])

            if Dice.throw(2) != 0: # simple query: once on the regular table, once on the super table
                queries.append(SqlQuery(selectTemplate.format('*', self._dbName, regTable)))
                queries.append(SqlQuery(selectTemplate.format('*', self._dbName, self.getName())))
                continue

            # Aggregate query, always against the super table
            aggExpr = Dice.choice(aggregateChoices)
            sql = selectTemplate.format(aggExpr, self._dbName, self.getName())
            if Dice.throw(3) == 0: # 1 in X chance
                sql = sql + ' GROUP BY color'
                Progress.emit(Progress.QUERY_GROUP_BY)
            queries.append(SqlQuery(sql))

        return queries

1805
class TaskReadData(StateTransitionTask):
    '''Task that runs the generated read queries against the fixed super table.'''

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _reconnectIfNeeded(self, wt):
        '''With a 1 in 20 chance, close and re-open the worker's DB connection.

        A ConnectionError is tolerated only when a service manager exists and
        reports the service as not running; otherwise it is re-raised. Exactly
        one of the SUCCESS / FAILURE progress marks is emitted per attempt.
        '''
        if random.randrange(20) != 0:  # TODO: break connection in all situations
            return

        Progress.emit(Progress.SERVICE_RECONNECT_START)
        try:
            wt.getDbConn().close()
            wt.getDbConn().open()
        except ConnectionError: # may fail
            if not gSvcMgr:
                Logging.error("Failed to reconnect in client-only mode")
                raise # Not OK if we are running in client-only mode
            if gSvcMgr.isRunning(): # may have race condition, but low probability
                Logging.error("Failed to reconnect when managed server is running")
                raise # Not OK if we are running normally

            Progress.emit(Progress.SERVICE_RECONNECT_FAILURE)
        else:
            # Only reached when close()/open() raised nothing
            Progress.emit(Progress.SERVICE_RECONNECT_SUCCESS)
        # The above might have taken a lot of time, service might be running
        # by now, causing later errors to be incorrectly handled due to timing
        # TODO: fix server restart status race condition

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Possibly reconnect, then execute every generated query; TAOS errors are logged and re-raised.'''
        self._reconnectIfNeeded(wt)

        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()

        for q in sTable.generateQueries(dbc):  # regular tables
            try:
                sql = q.getSql()
                dbc.execute(sql)
            except taos.error.ProgrammingError as err:
                errno2 = Helper.convertErrno(err.errno)
                Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
                raise
S
Shuduo Sang 已提交
1861

1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874
class SqlQuery:
    '''Holder for a single SQL statement.'''

    def __init__(self, sql:str = None):
        self._sql = sql

    def getSql(self):
        '''Return the SQL text given at construction (None when omitted).'''
        return self._sql

    @classmethod
    def buildRandom(cls, db: Database):
        '''Build a random query against a certain database

        Not implemented yet: only the database name is looked up, and
        nothing is returned.
        '''
        targetDbName = db.getName()
    
1875
class TaskDropSuperTable(StateTransitionTask):
    '''Task that drops the fixed super table, sometimes dropping regular tables first.'''

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Drop the super table; with a 1 in 2 chance, first try the regular tables one by one.

        Dropping the regular tables is best effort: a TAOS error on one table
        is logged and the next table is tried.
        '''
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
        if Dice.throw(2) == 0:
            tblSeq = list(range(
                2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
            random.shuffle(tblSeq)
            tickOutput = False  # if we have spitted out a "d" character for "drop regular table"
            isSuccess = True
            for i in tblSeq:
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                try:
                    self.execWtSql(wt, "drop table {}.{}".
                        format(self._db.getName(), regTableName))  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as err:
                    # correcting for strange error number scheme
                    errno2 = Helper.convertErrno(err.errno)
                    if (errno2 in [0x362]):  # mnode invalid table name
                        isSuccess = False
                        Logging.debug("[DB] Acceptable error when dropping a table")
                    else:
                        # Still best effort, but leave a trace instead of swallowing silently
                        Logging.debug("[DB] Ignoring error when dropping table {}: errno=0x{:X}, msg: {}".format(
                            regTableName, errno2, err))
                    continue  # try to delete next regular table

                if (not tickOutput):
                    tickOutput = True  # Print only one time
                    if isSuccess:
                        print("d", end="", flush=True)
                    else:
                        print("f", end="", flush=True)

        # Drop the super table itself
        tblName = self._db.getFixedSuperTableName()
        self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
1916

S
Shuduo Sang 已提交
1917

1918 1919 1920
class TaskAlterTags(StateTransitionTask):
    '''Task that randomly adds, drops or renames a tag on the fixed super table.'''

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()  # if we can drop it, we can alter tags

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Pick one of four tag alterations according to Dice.throw(4).'''
        conn = wt.getDbConn()
        superTable = self._db.getFixedSuperTable()
        roll = Dice.throw(4)

        if roll == 0:  # add "extraTag"
            superTable.addTag(conn, "extraTag", "int")
            return
        if roll == 1:  # drop "extraTag"
            superTable.dropTag(conn, "extraTag")
            return
        if roll == 2:  # drop "newTag"
            superTable.dropTag(conn, "newTag")
            return
        # any other roll: rename "extraTag" to "newTag"
        superTable.changeTag(conn, "extraTag", "newTag")

class TaskRestartService(StateTransitionTask):
    '''Task that occasionally restarts the managed service.

    _isRunning is a class-wide flag guarded by _classLock, so that at most one
    restart task is in flight across all task instances.
    '''
    _isRunning = False
    _classLock = threading.Lock()

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        if gConfig.auto_start_service:
            return state.canDropFixedSuperTable()  # Basically when we have the super table
        return False # don't run this otherwise

    CHANCE_TO_RESTART_SERVICE = 200
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Restart the service with a 1 in CHANCE_TO_RESTART_SERVICE chance.

        Does nothing (besides printing "_a") unless auto_start_service is set,
        and returns early when another restart task is already running.
        '''
        if not gConfig.auto_start_service: # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        # Read and write the flag on the class itself: assigning through "self"
        # would only create an instance attribute that other instances never see.
        with TaskRestartService._classLock:
            if TaskRestartService._isRunning:
                Logging.info("Skipping restart task, another running already")
                return
            TaskRestartService._isRunning = True

        try:
            if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
                dbc = wt.getDbConn()
                dbc.execute("show databases") # simple delay, align timing with other workers
                gSvcMgr.restart()
        finally:
            # Always clear the flag, even when the restart raised
            with TaskRestartService._classLock:
                TaskRestartService._isRunning = False
S
Shuduo Sang 已提交
1977

1978
class TaskAddData(StateTransitionTask):
    """Task that inserts records into the regular tables of the fixed super table.

    For every regular table (visited in random order) the table is created if
    needed and then filled either record by record (``_addData``) or with one
    batched statement (``_addDataInBatch``).
    """

    # Track which table is being actively worked on
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off tests
    fAddLogReady = None # type: TextIOWrapper
    fAddLogDone  = None # type: TextIOWrapper

    @classmethod
    def prepToRecordOps(cls):
        """Open the two operation-log files on first use, if gConfig.record_ops is set."""
        if not gConfig.record_ops:
            return
        if cls.fAddLogReady is None:
            Logging.info(
                "Recording in a file operations to be performed...")
            cls.fAddLogReady = open("add_log_ready.txt", "w")
        if cls.fAddLogDone is None:
            Logging.info("Recording in a file operations completed...")
            cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        """Return the state reached once this task has run."""
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Tell whether data may be added while in the given state."""
        return state.canAddData()

    def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
        """Insert all records for one table with a single INSERT statement.

        ``te`` is accepted for signature parity with ``_addData`` and is not used.
        """
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        fullTableName = db.getName() + '.' + regTableName

        pieces = ["INSERT INTO {} VALUES ".format(fullTableName)]
        for _ in range(numRecords):  # number of records per table
            # Keep this call order, the values are drawn from the db object one by one
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            # NOTE(review): every tuple is terminated with ';' -- confirm the
            # server really accepts this as one multi-row insert.
            pieces.append("('{}', {}, '{}');".format(nextTick, nextInt, nextColor))
        dbc.execute("".join(pieces))

    def _verifyReadBack(self, dbc, fullTableName, tick, expected):
        """Read back the ``speed`` value written at ``tick`` and compare it with ``expected``.

        Raises taos.error.ProgrammingError when the value differs, or when the
        query does not yield exactly one result. Errors 0x218 / 0x362 (table
        doesn't exist) are tolerated, any other error is re-raised.
        """
        try:
            readBack = dbc.queryScalar(
                "SELECT speed from {} WHERE ts='{}'".format(fullTableName, tick))
            if readBack != expected:
                raise taos.error.ProgrammingError(
                    "Failed to read back same data, wrote: {}, read: {}"
                    .format(expected, readBack), 0x999)
        except taos.error.ProgrammingError as err:
            errno = Helper.convertErrno(err.errno)
            if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT]:  # not a single result
                raise taos.error.ProgrammingError(
                    "Failed to read back same data for tick: {}, wrote: {}, read: {}"
                    .format(tick, expected,
                            "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"),
                    errno)
            if errno in [0x218, 0x362]:  # table doesn't exist
                return  # the table might have been dropped, nothing to verify
            raise  # Re-throw otherwise

    def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
        """Insert records one at a time, optionally logging and verifying each of them.

        With gConfig.record_ops every record is logged to the always-fsynced
        "ready" file before, and to the "done" file after it is written. With
        gConfig.verify_data the table is locked around the write and the value
        is read back before the lock is released.
        """
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        fullTableName = db.getName() + '.' + regTableName

        for _ in range(numRecords):  # number of records per table
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            if gConfig.record_ops:
                self.prepToRecordOps()
                self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                self.fAddLogReady.flush()
                os.fsync(self.fAddLogReady)

            # TODO: too ugly trying to lock the table reliably, refactor...
            verify = gConfig.verify_data  # decided once, so lock and unlock always pair up
            if verify:
                self.lockTable(fullTableName)
            try:
                dbc.execute("INSERT INTO {} VALUES ('{}', {}, '{}');".format(
                    fullTableName, nextTick, nextInt, nextColor))

                # Quick hack, attach an update statement here. TODO: create an "update" task
                if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shadow DB
                    # Same tick, new values: from here on nextInt is the updated value
                    nextInt = db.getNextInt()
                    nextColor = db.getNextColor()
                    dbc.execute("INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
                        fullTableName, nextTick, nextInt, nextColor))

                # Now read it back and verify, only if command line asks for it
                if verify:
                    self._verifyReadBack(dbc, fullTableName, nextTick, nextInt)
            finally:
                if verify:
                    self.unlockTable(fullTableName) # Unlock the table no matter what

            # Successfully wrote the data into the DB, let's record it somehow
            te.recordDataMark(nextInt)

            if gConfig.record_ops:
                self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
                self.fAddLogDone.flush()
                os.fsync(self.fAddLogDone)

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Walk all regular tables in random order and add data to each of them."""
        db = self._db
        dbc = wt.getDbConn()
        numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
        tblSeq = list(range(numTables))
        random.shuffle(tblSeq) # now we have random sequence
        for i in tblSeq:
            if i in self.activeTable:  # wow already active
                Progress.emit(Progress.CONCURRENT_INSERTION)
            else:
                self.activeTable.add(i)  # marking it active

            try:
                sTable = db.getFixedSuperTable()
                regTableName = self.getRegTableName(i)
                sTable.ensureTable(self, wt.getDbConn(), regTableName)  # Ensure the table exists

                # NOTE(review): the original comment says "1 in 2 chance", but the
                # other call sites treat Dice.throw(N) == 0 as "1 in N" -- confirm
                # whether the batch branch is meant to be reachable.
                if Dice.throw(1) == 0:
                    self._addData(db, dbc, regTableName, te)
                else:
                    self._addDataInBatch(db, dbc, regTableName, te)
            finally:
                # Also on failure, so the table is not reported as active forever
                self.activeTable.discard(i)  # not raising an error, unlike remove
2122 2123


2124 2125 2126 2127 2128 2129 2130 2131 2132
class ThreadStacks: # stack info for all threads
    """Snapshot of the call stacks of all threads alive at construction time.

    The stacks are stored in ``_allStacks``, keyed by the native thread id.
    """

    # Names of innermost frames that print() skips when filterInternal is set
    _INTERNAL_FRAME_NAMES = (
        'wait', 'invoke_excepthook',
        '_wait', # The Barrier exception
        'svcOutputReader', # the svcMgr thread
        '__init__', # the thread that extracted the stack
    )

    def __init__(self):
        self._allStacks = {}
        # Take the snapshot right here: the innermost frame of the calling
        # thread is then this '__init__', which print() relies on for filtering.
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            frame = allFrames.get(th.ident)
            if frame is None:
                # Thread showed up after the frame snapshot was taken
                continue
            self._allStacks[th.native_id] = traceback.extract_stack(frame)

    def print(self, filteredEndName = None, filterInternal = False):
        """Print the captured stacks, one section per thread.

        filteredEndName: when given, skip threads whose innermost frame has this name.
        filterInternal: when true, skip threads whose innermost frame is one of
            the names in _INTERNAL_FRAME_NAMES.
        """
        for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
            lastFrame = stack[-1]
            if filteredEndName and lastFrame.name == filteredEndName:
                continue # the end matched, filter this stack out
            if filterInternal and lastFrame.name in self._INTERNAL_FRAME_NAMES:
                continue # ignore
            # Now print
            print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid))
            for stackFrame, frame in enumerate(stack): # was using: reversed(stack)
                print("[{sf}] File {filename}, line {lineno}, in {name}".format(
                    sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info ----->\n")
S
Shuduo Sang 已提交
2154

2155 2156
class ClientManager:
    """Runs the client side of the test and reacts to the signals forwarded to it.

    ``run`` builds the DB manager, thread pool and thread coordinator, executes
    the test and returns a process exit code.
    """

    def __init__(self):
        Logging.info("Starting service manager")
        # No signal handlers are installed here, see sigIntHandler / sigUsrHandler

        self._status = Status.STATUS_RUNNING
        self.tc = None  # the ThreadCoordinator, only set while run() is executing

        self.inSigHandler = False  # True while the SIGUSR1 menu is being shown

    def sigIntHandler(self, signalNumber, frame):
        """Ask the running test to stop; a repeated signal exits the process at once."""
        if self._status != Status.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            sys.exit(-1)
        self._status = Status.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        # The signal may arrive before run() created the coordinator, or after
        # run() released it; there is nothing to stop in that case.
        if self.tc is not None:
            self.tc.requestToStop()

    def _doMenu(self):
        """Show the interrupt menu until one of the valid choices is entered, and return it."""
        validChoices = ("1", "2", "3")  # keep in sync with the menu printed below
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            choice = ""
            while choice == "":  # re-prompt on empty input
                choice = input("Enter Choice: ")
            if choice in validChoices:
                return choice
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """Interrupt the main thread upon SIGUSR1 and let the user pick an action."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # the menu is already being shown
        self.inSigHandler = True

        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ThreadStacks().print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Reset even when the menu fails (e.g. EOF on stdin), otherwise
            # every later SIGUSR1 would be ignored.
            self.inSigHandler = False

    # TODO: need to revise how we verify data durability (the former
    # _printLastNumbers() draft was disabled and is no longer kept here)

    def run(self, svcMgr):
        """Execute the client test and return the exit code.

        svcMgr: service manager whose services are stopped once the test is
            over, or a false value when no service was started by us.

        Returns 1 if the thread coordinator reports failure, 0 otherwise.
        Note that the global gConfig is set to None as a side effect.
        """
        global gConfig

        # Prepare Tde Instance
        global gContainer
        tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"

        dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget())  # Regular function
        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        Logging.info("Starting client instance: {}".format(tInst))
        self.tc.run()
        if svcMgr: # gConfig.auto_start_service:
            svcMgr.stopTaosServices()

        # Release global variables
        # (the original also assigned None to gSvcMgr and logger, but without a
        # "global" declaration those were plain locals and had no effect)
        gConfig = None

        thPool = None
        dbManager.cleanUp() # destructor wouldn't run in time
        dbManager = None

        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        ret = 1 if self.tc.isFailed() else 0
        self.tc.cleanup()
        # Release variables here
        self.tc = None

        gc.collect() # force garbage collection

        return ret

    def conclude(self):
        """Print the execution statistics of the thread coordinator."""
        self.tc.printStats()
2296

2297
class MainExec:
    """Program entry object: parses the command line and runs the client or the service.

    Creating an instance installs the process signal handlers, which forward
    to the service manager and/or client manager currently in use.
    """

    def __init__(self):
        self._clientMgr = None
        self._svcMgr = None # type: ServiceManager

        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT,  self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, or else to the service manager."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr: # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to both managers, the service manager first."""
        for mgr in (self._svcMgr, self._clientMgr):
            if mgr:
                mgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Run the client test, starting the service first when in -a mode.

        Returns the client manager's exit code, or None when the REST
        connection to the DB could not be opened.
        """
        global gSvcMgr
        if gConfig.auto_start_service:
            gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
            gSvcMgr.startTaosServices() # we start, don't run

        self._clientMgr = ClientManager()
        ret = None
        try:
            ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
        except requests.exceptions.ConnectionError as err:
            # requests exceptions have no getMessage(); format the exception itself
            Logging.warning("Failed to open REST connection to DB: {}".format(err))
            # don't raise
        return ret

    def runService(self):
        """Run the TDengine service with gConfig.num_dnodes dnodes until it ends."""
        global gSvcMgr
        gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert

        gSvcMgr.run() # run to some end state
        gSvcMgr = self._svcMgr = None

    @staticmethod
    def _buildArgParser():
        """Build the command line parser with all supported options."""
        # Super cool Python argument library:
        # https://docs.python.org/3/library/argparse.html
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=textwrap.dedent('''\
                TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
                ---------------------------------------------------------------------
                1. You build TDengine in the top level ./build directory, as described in offical docs
                2. You run the server there before this script: ./build/bin/taosd -c test/cfg

                '''))

        parser.add_argument(
            '-a', '--auto-start-service', action='store_true',
            help='Automatically start/stop the TDengine service (default: false)')
        parser.add_argument(
            '-b', '--max-dbs', action='store', default=0, type=int,
            help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)')
        parser.add_argument(
            '-c', '--connector-type', action='store', default='native', type=str,
            help='Connector type to use: native, rest, or mixed (default: native)')
        parser.add_argument(
            '-d', '--debug', action='store_true',
            help='Turn on DEBUG mode for more logging (default: false)')
        parser.add_argument(
            '-e', '--run-tdengine', action='store_true',
            help='Run TDengine service in foreground (default: false)')
        parser.add_argument(
            '-g', '--ignore-errors', action='store', default=None, type=str,
            help='Ignore error codes, comma separated, 0x supported (default: None)')
        parser.add_argument(
            '-i', '--max-replicas', action='store', default=1, type=int,
            help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
        parser.add_argument(
            '-l', '--larger-data', action='store_true',
            help='Write larger amount of data during write operations (default: false)')
        # NOTE: store_false, i.e. giving -m turns the mixing OFF
        parser.add_argument(
            '-m', '--mix-oos-data', action='store_false',
            help='Mix out-of-sequence data into the test data stream (default: true)')
        parser.add_argument(
            '-n', '--dynamic-db-table-names', action='store_true',
            help='Use non-fixed names for dbs/tables, for -b, useful for multi-instance executions (default: false)')
        parser.add_argument(
            '-o', '--num-dnodes', action='store', default=1, type=int,
            help='Number of Dnodes to initialize, used with -e option. (default: 1)')
        # NOTE(review): the option name says "per thread" while the help text
        # says "single shared" -- confirm which one describes the flag
        parser.add_argument(
            '-p', '--per-thread-db-connection', action='store_true',
            help='Use a single shared db connection (default: false)')
        parser.add_argument(
            '-r', '--record-ops', action='store_true',
            help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
        parser.add_argument(
            '-s', '--max-steps', action='store', default=1000, type=int,
            help='Maximum number of steps to run (default: 1000)')
        parser.add_argument(
            '-t', '--num-threads', action='store', default=5, type=int,
            help='Number of threads to run (default: 5)')
        parser.add_argument(
            '-v', '--verify-data', action='store_true',
            help='Verify data written in a number of places by reading back (default: false)')
        parser.add_argument(
            '-w', '--use-shadow-db', action='store_true',
            help='Use a shaddow database to verify data integrity (default: false)')
        parser.add_argument(
            '-x', '--continue-on-exception', action='store_true',
            help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
        return parser

    def init(self): # TODO: refactor
        """Parse the command line and set up the module globals, logging and the dice.

        Raises CrashGenError when use-shadow-db is combined with max-dbs > 1.
        """
        global gContainer
        gContainer = Container() # micky-mouse DI

        global gSvcMgr # TODO: refactor away
        gSvcMgr = None

        global gConfig
        gConfig = self._buildArgParser().parse_args()
        crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var

        # Sanity check for arguments
        if gConfig.use_shadow_db and gConfig.max_dbs > 1:
            raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")

        Logging.clsInit(gConfig)

        Dice.seed(0)  # initial seeding of dice

    def run(self):
        """Run the service (-e mode) or the client, and return the exit code.

        In service mode returns 0 on success and -1 on a connection failure;
        otherwise returns whatever runClient() returns.
        """
        if not gConfig.run_tdengine:
            return self.runClient()

        # run server
        try:
            self.runService()
            return 0 # success
        except ConnectionError:
            Logging.error("Failed to make DB connection, please check DB instance manually")
        return -1 # failure
S
Steven Li 已提交
2486

2487

2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508
class Container():
    """Tiny property bag that only accepts the names listed in _propertyList.

    Values are kept in the ``_cargo`` dict; reading or writing any other
    attribute name raises CrashGenError.
    """
    _propertyList = {'defTdeInstance'}

    def __init__(self):
        self._cargo = {} # No cargo at the beginning

    def _verifyValidProperty(self, name):
        """Raise CrashGenError unless name is one of the allowed properties."""
        if name in self._propertyList:
            return
        raise CrashGenError("Invalid container property: {}".format(name))

    # Called for an attribute, when other mechanisms fail (compare to  __getattribute__)
    def __getattr__(self, name):
        self._verifyValidProperty(name)
        cargo = self._cargo
        return cargo[name] # just a simple lookup

    def __setattr__(self, name, value):
        if name != '_cargo':
            self._verifyValidProperty(name)
            self._cargo[name] = value
        else: # reserved vars, stored as a real attribute
            super().__setattr__(name, value)
S
Shuduo Sang 已提交
2509