“5d0a2de595c2754429bf2232ae659ca10367b184”上不存在“develop/api_doc/v2/fluid/executor.html”
crash_gen_main.py 101.2 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2
###################################################################
3
#           Copyright (c) 2016-2021 by TAOS Technologies, Inc.
S
Steven Li 已提交
4 5 6 7 8 9 10 11 12 13
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
17

18
from typing import Any, Set, Tuple
S
Shuduo Sang 已提交
19 20
from typing import Dict
from typing import List
21
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
22

S
Shuduo Sang 已提交
23 24
import textwrap
import time
25
import datetime
S
Shuduo Sang 已提交
26 27 28
import random
import threading
import argparse
29

S
Steven Li 已提交
30
import sys
31
import os
32
import io
33
import signal
34
import traceback
35
import requests
36
# from guppy import hpy
37
import gc
38 39 40
import taos

from .shared.types import TdColumns, TdTags
41

42
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
43 44 45
# from crash_gen import ServiceManager, Config, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \
#     CrashGenError, Progress, MyTDSql, \
#     TdeInstance
46

47 48 49 50 51 52
from .service_manager import ServiceManager, TdeInstance

from .shared.config import Config
from .shared.db import DbConn, DbManager, DbConnNative, MyTDSql
from .shared.misc import Dice, Logging, Helper, Status, CrashGenError, Progress
from .shared.types import TdDataType
53

54
# Config.init()
55

56 57 58 59
# Refuse to run on Python 2: this module relies on Python 3 features throughout.
if sys.version_info.major < 3:
    raise Exception("Must be using Python 3")

# Global variables -- deliberately kept to a small number.
# They are only declared (annotated) here; they are assigned elsewhere before use.
# TODO: refactor this hack, use dependency injection instead of module globals.
gSvcMgr: Optional[ServiceManager]
gContainer: Container
S
Steven Li 已提交
68

69 70
# def runThread(wt: WorkerThread):
#     wt.run()
71

72

S
Steven Li 已提交
73
class WorkerThread:
    """A worker thread that executes one task per step, in lock-step with its peers.

    Every step, the worker first waits at the coordinator's shared barrier
    (``tc.crossStepBarrier()``), then at its own per-thread "gate"
    (``_stepGate``), which only the main thread may open via ``tapStepGate()``.
    Once through the gate it fetches one task from the coordinator, executes it,
    and reports it back with ``tc.saveExecutedTask()``.
    """

    # Error numbers (after Helper.convertErrno) tolerated while (re-)opening the
    # per-thread connection at the start of a step: invalid database, database
    # dropping, unable to establish connection, database not ready.
    _IGNORABLE_OPEN_ERRNOS = (0x383, 0x386, 0x00B, 0x014)

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
        """
            Note: this runs in the main thread context. The underlying thread is
            created here but only started by start().

            :param pool: the owning ThreadPool
            :param tid: this worker's id, used in log messages
            :param tc: the ThreadCoordinator driving all workers
            :raises RuntimeError: if per-thread connections are enabled and the
                configured connector type is unknown
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=self.run)
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own (opened later, in run())
        if Config.getConfig().per_thread_db_connection:  # type: ignore
            tInst = gContainer.defTdeInstance
            connectorType = Config.getConfig().connector_type
            if connectorType == 'native':
                self._dbConn = DbConn.createNative(tInst.getDbTarget())
            elif connectorType == 'rest':
                self._dbConn = DbConn.createRest(tInst.getDbTarget())
            elif connectorType == 'mixed':
                if Dice.throw(2) == 0:  # 1/2 chance
                    self._dbConn = DbConn.createNative(tInst.getDbTarget())
                else:
                    self._dbConn = DbConn.createRest(tInst.getDbTarget())
            else:
                raise RuntimeError("Unexpected connector type: {}".format(connectorType))

    def logDebug(self, msg):
        Logging.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        Logging.info("    TRD[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's TaskExecutor for the current step."""
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread entry point: open the own connection, run the step loop, clean up.

        Cleanup runs in a ``finally`` so the per-thread connection is closed even
        if the task loop raises.
        """
        # initialization after thread starts, in the thread context
        Logging.info("Starting to run thread: {}".format(self._tid))

        if Config.getConfig().per_thread_db_connection:  # type: ignore
            Logging.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up
            if Config.getConfig().per_thread_db_connection:  # type: ignore
                if self._dbConn.isOpen:  # sometimes it is not open
                    self._dbConn.close()
                else:
                    Logging.warning("Cleaning up worker thread, dbConn already closed")

    def _doTaskLoop(self):
        """Execute one task per step until the barrier breaks or the coordinator stops."""
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                Logging.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            Logging.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()  # then per-thread gate, after being tapped
            Logging.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not tc.isRunning():
                print("_wts", end="")
                Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # The per-thread connection might have been closed (e.g. during a server
            # auto-restart); re-open it before fetching the next task.
            try:
                if Config.getConfig().per_thread_db_connection:  # most likely TRUE
                    if not self._dbConn.isOpen:
                        self._dbConn.open()
            except taos.error.ProgrammingError as err:
                errno = Helper.convertErrno(err.errno)
                if errno not in self._IGNORABLE_OPEN_ERRNOS:
                    print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
                    raise
                # Otherwise tolerated: carry on and let the task run.

            # Fetch a task from the Thread Coordinator
            Logging.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            Logging.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
                self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            Logging.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block this worker until the main thread taps its gate, then re-arm the gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        Logging.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Open this worker's gate (main thread only); no-op apart from a marker if dead."""
        self.verifyThreadMain()  # only allowed for main thread

        if self._thread.is_alive():
            Logging.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        return self.getDbConn().getQueryResult()

    def getDbConn(self) -> DbConn:
        """Return this worker's own connection, or the shared one from the DbManager."""
        if Config.getConfig().per_thread_db_connection:
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()
249

250
# The coordinator of all worker threads, mostly running in main thread
S
Shuduo Sang 已提交
251 252


253
class ThreadCoordinator:
    """Coordinates all worker threads step by step; mostly runs in the main thread.

    Each step: the main thread and all workers meet at a shared barrier; while
    the workers are quiet the main thread transitions each database's state
    machine using the tasks executed in the previous step; then it creates a new
    TaskExecutor and taps every worker's gate so each runs one task.
    """
    WORKER_THREAD_TIMEOUT = 120  # Normal: 120

    def __init__(self, pool: ThreadPool, dbManager: DbManager):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager  # type: Optional[DbManager] # may be freed
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # one barrier for all worker threads, plus the main thread
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()
        self._runStatus = Status.STATUS_RUNNING
        self._initDbs()
        self._stepStartTime = None  # Track how long it takes to execute each step

    def getTaskExecutor(self):
        """Return the current step's TaskExecutor; raises CrashGenError if there is none."""
        if self._te is None:
            raise CrashGenError("Unexpected empty TE")
        return self._te

    def getDbManager(self) -> DbManager:
        """Return the DbManager; raises ChildProcessError if it has been freed."""
        if self._dbManager is None:
            raise ChildProcessError("Unexpected empty _dbManager")
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        self._runStatus = Status.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """Return True if the main loop must not start another step."""
        maxSteps = Config.getConfig().max_steps  # type: ignore
        if self._curStep >= (maxSteps - 1):  # maxStep==10, last curStep should be 9
            return True
        if self._runStatus != Status.STATUS_RUNNING:
            return True
        return bool(transitionFailed or hasAbortedTask or workerTimeout)

    def _hasAbortedTask(self):  # from execution of previous step
        """Return True if any task executed in the previous step was aborted."""
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step and tap every worker's gate.

        A fresh TaskExecutor is created unless the transition failed; in that case
        ``self._te`` stays None, so workers see ``isRunning()`` as False and stop.
        """
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        Logging.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        Logging.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
            self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute its own task
        self.tapAllThreads()  # release all worker thread from their "gates"

    def _syncAtBarrier(self):
        """Cross the shared barrier (with time-out), leaving workers at their gates."""
        # Now main thread (that's us) is ready to enter a step:
        # let other threads go past the pool barrier, but wait at the thread gate
        Logging.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        Logging.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """End the current step by transitioning every database's state machine.

        Returns True if the transition failed (the TaskExecutor is then cleared),
        False otherwise. A broken DB connection and our own CrashGenError are
        handled here; any other taos ProgrammingError is re-raised to the caller.
        """
        transitionFailed = False
        try:
            for db in self._dbs:  # type: Database
                sm = db.getStateMachine()
                Logging.debug("[STT] starting transitions for DB: {}".format(db.getName()))
                # at end of step, transition the DB state
                tasksForDb = db.filterTasks(self._executedTasks)
                sm.transition(tasksForDb, self.getDbManager().getDbConn())
                Logging.debug("[STT] transition ended for DB: {}".format(db.getName()))
            # Note: connections cannot be shared across threads with the TD Python
            # library, so the main thread never operates on worker-owned connections.
        except taos.error.ProgrammingError as err:
            if err.msg == 'network unavailable':  # broken DB connection
                Logging.info("DB connection broken, execution failed")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # Handled here (not re-raised): the caller still needs to tap all
                # threads at the end, which signals them to stop.
            elif isinstance(err, CrashGenError):  # our own transition failure
                Logging.info("State transition error")
                # TODO: saw an error here once, let's print out stack info for err?
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("State transition error: {}".format(err))
            else:
                raise

        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        Logging.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Main loop: start the workers and drive them step by step, then join them."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not Config.getConfig().debug:  # print this only if we are not in debug mode
                Progress.emit(Progress.STEP_BOUNDARY)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
                Progress.emit(Progress.END_THREAD_STEP)
                if self._stepStartTime:
                    stepExecTime = time.time() - self._stepStartTime
                    Progress.emitStr('{:.3f}s/{}'.format(stepExecTime, DbConnNative.totalRequests))
                    DbConnNative.resetTotalRequests()  # reset to zero
            except threading.BrokenBarrierError:
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                Logging.error("\n")
                Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
                    ThreadCoordinator.WORKER_THREAD_TIMEOUT))
                Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                # For deadlock debugging, one may instead loop forever here and
                # attach gdb to the process.
                break

            # At this point, all threads should have passed the overall "barrier" and
            # be waiting before their per-thread "gate". We use this period to do
            # house keeping work, while all worker threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                Logging.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = Helper.convertErrno(err.errno)  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                Logging.info(errMsg)
                traceback.print_exc()
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            Progress.emit(Progress.BEGIN_THREAD_STEP)
            self._stepStartTime = time.time()
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            Logging.debug("Abnormal ending of main thread")
        elif workerTimeout:
            Logging.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            Logging.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        Logging.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        Logging.debug("\r\n\n--> Main thread ready to finish up...")
        Logging.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        Logging.info(". . . All worker threads finished")  # No CR/LF before
        self._execStats.endExec()

    def cleanup(self):  # free resources
        """Drop references to the pool, executor, DB manager and stats."""
        self._pool.cleanup()

        self._pool = None
        self._te = None
        self._dbManager = None
        self._executedTasks = []
        self._lock = None
        self._stepBarrier = None
        self._execStats = None
        self._runStatus = None

    def printStats(self):
        self._execStats.printStats()

    def isFailed(self):
        return self._execStats.isFailed()

    def getExecStats(self):
        return self._execStats

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's step gate, in an order shuffled with Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        Logging.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        """True while a TaskExecutor exists for the current step."""
        return self._te is not None

    def _initDbs(self):
        ''' Initialize multiple databases, invoked at __init__() time '''
        self._dbs = []  # type: List[Database]
        dbc = self.getDbManager().getDbConn()
        if Config.getConfig().max_dbs == 0:
            self._dbs.append(Database(0, dbc))
        else:
            # Don't use Dice/random for the base number, as they are deterministic
            if Config.getConfig().dynamic_db_table_names:
                baseDbNumber = int(datetime.datetime.now().timestamp() * 333) % 888
            else:
                baseDbNumber = 0
            for i in range(Config.getConfig().max_dbs):
                self._dbs.append(Database(baseDbNumber + i, dbc))

    def pickDatabase(self):
        """Pick one of the databases (always the first if max_dbs == 0)."""
        idxDb = 0
        if Config.getConfig().max_dbs != 0:
            idxDb = Dice.throw(Config.getConfig().max_dbs)  # 0 to N-1
        db = self._dbs[idxDb]  # type: Database
        return db

    def fetchTask(self) -> Task:
        ''' The thread coordinator (that's us) is responsible for fetching a task
            to be executed next.

            :raises RuntimeError: if called when no step is running
        '''
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")

        # pick a task type for current state
        db = self.pickDatabase()
        taskType = db.getStateMachine().pickTaskType()  # dynamic name of class
        return taskType(self._execStats, db)  # create a task from it

    def resetExecutedTasks(self):
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record a task executed in this step (called from worker threads)."""
        with self._lock:
            self._executedTasks.append(task)
552

553
class ThreadPool:
    """Owns the WorkerThread objects that a ThreadCoordinator drives in lock-step."""

    def __init__(self, numThreads, maxSteps):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal bookkeeping
        self.curStep = 0
        self.threadList = []  # type: List[WorkerThread]

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to *tc*, record each, then start it.

        A worker is appended to ``threadList`` before being started; each is
        expected to block right away, before step 0.
        """
        for workerId in range(self.numThreads):
            worker = WorkerThread(self, workerId, tc)
            self.threadList.append(worker)
            worker.start()

    def joinAll(self):
        """Wait for every recorded worker thread to terminate."""
        for worker in self.threadList:
            Logging.debug("Joining thread...")
            worker._thread.join()

    def cleanup(self):
        """Forget all worker threads (they are not stopped or joined here)."""
        self.threadList = []  # maybe clean up each?
575

576 577
# A queue of contiguous POSITIVE integers, used by DbManager to generate consecutive numbers
# for new table names
S
Shuduo Sang 已提交
578 579


S
Steven Li 已提交
580 581
class LinearQueue():
    """A queue of contiguous positive integers, with "in use" tracking.

    The queue holds the integers ``firstIndex..lastIndex`` (inclusive; empty when
    ``firstIndex > lastIndex``). ``push()`` appends the next integer and marks it
    in use; ``pop()`` removes the head only if it is not in use. Methods that
    touch shared state hold a re-entrant lock, since they call each other.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        return "[{}..{}], in use: {}".format(
            self.firstIndex, self.lastIndex, self.inUse)

    def push(self):
        """Add a new (largest) element to the tail, mark it in use, and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head element.

        Returns False if the queue is empty or the head element is still in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but returns 0 when the queue is empty."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark *i* as in use; raises RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError(
                    "Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark *i* as no longer in use; raises KeyError if it was not in use."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick an element not in use, mark it in use, and return it.

        Returns None if the queue is empty, or if no free element was found
        within ``10 * size()`` random attempts.
        """
        with self._lock:
            # Check emptiness while holding the lock: checking it before acquiring
            # the lock let a concurrent pop() empty the queue in between, handing
            # Dice.throwRange an empty range.
            if self.isEmpty():
                return None
            for _ in range(self.size() * 10):  # give up after 10x attempts
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret
            return None

S
Shuduo Sang 已提交
656

657
class AnyState:
    """Base class for the database states tracked by the state machine.

    Each concrete state describes itself through getInfo(): a list indexed by
    the STATE_VAL_IDX / CAN_* constants below, holding the state value first
    and then the capability flags. The assert* helpers check the tasks of one
    step for consistency and raise CrashGenError on a violation.
    """
    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, not even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    # Looked up with (state value + 1), so STATE_INVALID (-1) maps to entry 0
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Positions inside the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    # Whether we can "drop the DB" -- strictly speaking only "under normal
    # circumstances", as we may override it with the -b option
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        stateVal = self._info[self.STATE_VAL_IDX]
        return self._stateNames[stateVal + 1]  # +1 accommodates STATE_INVALID

    # Each sub state describes its own "info", so we can answer questions
    # such as canDropDb()
    def getInfo(self) -> List[Any]:
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare this state's value with an int state value or another AnyState.

        Raises:
            RuntimeError: for any other operand type.
        """
        if isinstance(other, int):
            otherVal = other
        elif isinstance(other, AnyState):
            otherVal = other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(
                    type(other)))
        return self.getValIndex() == otherVal

    def verifyTasksToState(self, tasks, newState):
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        # When the user asks to run up to a number of DBs (or uses a shadow
        # DB), we no longer perform drop_db operations
        cfg = Config.getConfig()
        if cfg.max_dbs > 0 or cfg.use_shadow_db:
            return False
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        # Writes are duplicated to the shadow DB, so dropping the super table
        # is disabled in that mode
        if Config.getConfig().use_shadow_db:
            return False
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise CrashGenError once a second successful task of type ``cls`` is seen."""
        successCount = 0
        for task in (t for t in tasks if isinstance(t, cls)):
            if not task.isSuccess():
                continue
            successCount += 1
            if successCount >= 2:
                raise CrashGenError(
                    "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise CrashGenError if tasks of type ``cls`` exist but none of them succeeded."""
        matching = [t for t in tasks if isinstance(t, cls)]
        successCount = sum(1 for t in matching if t.isSuccess())
        if matching and successCount <= 0:
            raise CrashGenError("Unexpected zero success for task type: {}, from tasks: {}"
                .format(cls, tasks))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` is present, successful or not."""
        if self.hasTask(tasks, cls):
            raise CrashGenError(
                "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` succeeded."""
        if self.hasSuccess(tasks, cls):
            raise CrashGenError(
                "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """True if at least one task of type ``cls`` succeeded."""
        return any(isinstance(t, cls) and t.isSuccess() for t in tasks)

    def hasTask(self, tasks, cls):
        """True if at least one task of type ``cls`` is present."""
        return any(isinstance(t, cls) for t in tasks)

S
Shuduo Sang 已提交
782

783 784 785 786
class StateInvalid(AnyState):
    """Sentinel state in which every capability flag is False."""

    def getInfo(self):
        # create/drop DB, create/drop fixed super table, insert/read data
        noCapabilities = [False] * 6
        return [self.STATE_INVALID] + noCapabilities

    # def verifyTasksToState(self, tasks, newState):

S
Shuduo Sang 已提交
794

795 796 797 798
class StateEmpty(AnyState):
    """No database yet; the only permitted operation is creating one."""

    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False,   # can create / cannot drop the DB
            False, False,  # cannot create / drop the fixed super table
            False, False,  # cannot insert / read data via the fixed super table
        ]

    def verifyTasksToState(self, tasks, newState):
        # Only checked when some TaskCreateDb succeeded and no TaskDropDb ran
        # in the same step: then at most one creation may have succeeded.
        createdDb = self.hasSuccess(tasks, TaskCreateDb)
        if createdDb and not self.hasTask(tasks, TaskDropDb):
            self.assertAtMostOneSuccess(tasks, TaskCreateDb)  # TODO: compare numbers

811 812 813 814 815 816 817 818 819 820 821

class StateDbOnly(AnyState):
    """A database exists, but it holds nothing else yet."""

    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,   # cannot create / can drop the DB
            True, False,   # can create / cannot drop the fixed super table
            False, False,  # cannot insert / read data
        ]

    def verifyTasksToState(self, tasks, newState):
        # TODO: restore the check below; the problem exists, although it is
        # unlikely in real-world use:
        # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
        #     self.assertIfExistThenSuccess(tasks, TaskDropDb)
        if self.hasTask(tasks, TaskCreateDb):
            return  # a re-creation in this step makes drop counts ambiguous
        self.assertAtMostOneSuccess(tasks, TaskDropDb)
830

S
Shuduo Sang 已提交
831

832
class StateSuperTableOnly(AnyState):
    """The fixed super table exists, but is still totally empty."""

    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # cannot create / can drop the DB
            False, True,  # cannot create / can drop the fixed super table
            True, True,   # can insert / read data
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check ``tasks`` against the transition to ``newState``.

        No check is enforced at the moment; the method deliberately does
        nothing (it used to call hasSuccess() and discard the result, which
        verified nothing). TODO: need to revamp!! Candidate checks, none of
        them currently enabled (the second is known not to hold in massively
        parallel runs):
          - a successful TaskDropSuperTable implies at most one such success,
            and that the table was re-created by TaskCreateSuperTable;
          - a successful data insertion implies no super-table drop task;
          - a successful read implies neither a drop nor an insertion task.
        """
        return
859

S
Shuduo Sang 已提交
860

861 862 863 864 865 866 867 868 869 870
class StateHasData(AnyState):
    """The fixed super table exists and we have some data in it."""

    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,  # cannot create / can drop the DB
            False, True,  # cannot create / can drop the fixed super table
            True, True,   # can insert / read data
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` can explain the transition to ``newState``.

        Raises CrashGenError (via the assert* helpers) on an inconsistency.
        """
        if newState.equals(AnyState.STATE_EMPTY):
            # The DB is gone. Unless it was also (re)created in this step, at
            # most one drop may have succeeded.
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
        elif newState.equals(AnyState.STATE_DB_ONLY):
            # The DB remains but its tables are gone. Without a create_db task
            # in this step, there must not be any drop_db task either.
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertNoTask(tasks, TaskDropDb)
            # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
        else:  # should be STATE_HAS_DATA; STATE_TABLE_ONLY also lands here for now
            if not self.hasTask(tasks, TaskCreateDb):
                # we didn't create a DB, so we shouldn't have dropped it
                self.assertNoTask(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateSuperTable):
                # we didn't create the table, so no task should have dropped it
                self.assertNoTask(tasks, TaskDropSuperTable)
            # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
S
Steven Li 已提交
897

S
Shuduo Sang 已提交
898

899
class StateMechine:
    """Tracks the state of one Database and picks the task types to run next.

    The current state is found by querying the database. After each step,
    the executed tasks are checked against the observed state transition.
    """

    def __init__(self, db: Database):
        self._db = db
        # Transition target probabilities, indexed with the value of
        # STATE_EMPTY, STATE_DB_ONLY, etc.
        self._stateWeights = [1, 2, 10, 40]

    def init(self, dbc: DbConn):  # late initialization, don't save the dbConn
        """Determine the starting state using ``dbc``.

        Raises:
            taos.error.ProgrammingError: re-raised, after logging, when the
                current state cannot be determined.
        """
        try:
            self._curState = self._findCurrentState(dbc)  # starting state
        except taos.error.ProgrammingError as err:
            Logging.error("Failed to initialized state machine, cannot find current state: {}".format(err))
            traceback.print_stack()
            raise  # re-throw

    # TODO: seems no longer used, remove?
    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        return self._curState.canDropDb()  # ha, can drop DB means it has one

    # May be slow, use cautiously...
    def getTaskTypes(self):  # those that can run (directly/indirectly) from the current state
        """Return the task classes runnable from the current state.

        The result holds the tasks that can begin directly from the current
        state, followed by "indirect" tasks that can begin from the end state
        of one of those. Each class appears at most once.

        Raises:
            RuntimeError: if no task type applies to the current state.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]

        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:  # each task type gathered so far
            endState = task1.getEndState()  # figure the end state
            if endState is None:  # does not change end state
                continue  # no use, do nothing
            for tc in allTaskClasses:  # what task can further begin from there?
                # Check against the accumulated list, not only the direct
                # tasks: a task reachable from several end states must be
                # listed once, otherwise pickTaskType() weighs it repeatedly.
                if tc.canBeginFrom(endState) and tc not in taskTypes:
                    taskTypes.append(tc)  # gather it

        if not taskTypes:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        Logging.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self, dbc: DbConn):
        """Query the database through ``dbc`` to classify its current state.

        Returns:
            StateEmpty, StateDbOnly, StateSuperTableOnly or StateHasData.
        """
        ts = time.time()  # used to debug how fast/slow the queries below are
        dbName = self._db.getName()
        if not dbc.existsDatabase(dbName):  # no database?!
            Logging.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when opening the connection, and this is NOT the
        # worker thread, which does this on its own
        dbc.use(dbName)
        if not dbc.hasTables():  # no tables
            Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        # For sure we have tables, which means we must have the super table. # TODO: are we sure?
        sTable = self._db.getFixedSuperTable()
        # The branches were inverted before: a super table WITH regular tables
        # was classified as SUPER_TABLE_ONLY. NOTE(review): assumes
        # hasRegTables() is True when the super table has regular tables/data.
        if sTable.hasRegTables(dbc):  # has actual tables
            Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()
        # no regular tables
        Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
        return StateSuperTableOnly()

    # We transition the system to a new state by examining the current state itself
    def transition(self, tasks, dbc: DbConn):
        """Check the step's ``tasks`` against the observed transition, then adopt the new state."""
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            Logging.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        dbc.execute("show dnodes")

        # Generic checks, based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # No at-most-one-success check: creates and drops may interleave

        if self._curState.canDropDb():
            if gSvcMgr is None:  # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # No at-most-one-success check: drop-create-drop is possible

        # No generic checks for creating/dropping the fixed super table (the
        # whole DB may be dropped), nor for adding or reading data.

        newState = self._findCurrentState(dbc)
        Logging.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        """Pick one runnable task type, weighted by the state it leads to."""
        # all the task types we can choose from at current state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # read data task, default to 10: TODO: change to a constant
                weights.append(10)
        i = self._weighted_choice_sub(weights)
        # Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
        return taskTypes[i]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights) -> int:
        """Return an index into ``weights``, chosen with probability proportional to its weight.

        Raises:
            CrashGenError: if no index is chosen (e.g. empty or all-zero weights).
        """
        # TODO: use our dice to ensure it being deterministic?
        rnd = random.random() * sum(weights)
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        raise CrashGenError("Unexpected no choice")
1049

1050 1051 1052 1053 1054
class Database:
    '''An actual TDengine database inside a service instance, possibly in a
    cluster environment.

    For now it is used to manage the state transitions of that database (it
    owns a StateMechine), and to hand out test values: timestamps, integers,
    floats, binary strings and colors.

    TODO: consider moving, but keep in mind it contains "StateMachine"
    '''
    _clsLock = threading.Lock()  # class-wide lock, guards the tick counters below
    _lastInt = 101  # next one is initial integer
    _lastTick = None  # Optional[datetime]
    _lastLaggingTick = None  # Optional[datetime], lagging tick for out-of-sequence (oos) insertions

    def __init__(self, dbNum: int, dbc: DbConn):  # TODO: remove dbc
        self._dbNum = dbNum  # number assigned to this database, for testing purposes
        self._stateMachine = StateMechine(self)
        self._stateMachine.init(dbc)
        self._lock = threading.RLock()

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    def getDbNum(self):
        return self._dbNum

    def getName(self):
        return "db_{}".format(self._dbNum)

    def filterTasks(self, inTasks: List[Task]):  # Pick out those belonging to us
        return [task for task in inTasks if task.getDb().isSame(self)]

    def isSame(self, other):
        return self._dbNum == other._dbNum

    def exists(self, dbc: DbConn):
        return dbc.existsDatabase(self.getName())

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    def getFixedSuperTable(self) -> TdSuperTable:
        return TdSuperTable(self.getFixedSuperTableName(), self.getName())

    # Produce a starting time tick such that every run can safely create
    # 100,000 records without repeating a timestamp used by a run made 3
    # minutes (180 seconds) earlier: elapsed wall-clock time is stretched by
    # a factor of 500 and wrapped within roughly 8 years (8 * 12 * 30 days),
    # counted from 2012-01-01.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    @classmethod
    def setupLastTick(cls):
        origin = datetime.datetime(2020, 6, 1)
        now = datetime.datetime.now()
        # maybe a very large number, takes 69 years to exceed Python int range
        elapsed = int(now.timestamp() - origin.timestamp())
        window = 8 * 12 * 30 * 24 * 60 * 60 / 500
        offset = (elapsed % window) * 500  # seconds within the wrapped window
        # print("elSec = {}".format(elapsed))
        base = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp(base.timestamp() + offset)
        Logging.debug("Setting up TICKS to start from: {}".format(t4))
        return t4

    @classmethod
    def getNextTick(cls):
        '''
            Fetch a timestamp tick, with some random factor, may not be unique.
        '''
        with cls._clsLock:  # prevent duplicate tick
            if cls._lastLaggingTick is None or cls._lastTick is None:  # not initialized
                start = cls.setupLastTick()
                cls._lastTick = start
                # lagging behind 2 minutes, should catch up fast
                cls._lastLaggingTick = start + datetime.timedelta(0, -60*2)

            oneSecond = datetime.timedelta(0, 1)
            # When asked to mix in out-of-sequence data, 1 in 20 ticks comes
            # from the lagging sequence
            useLagging = Config.isSet('mix_oos_data') and Dice.throw(20) == 0
            if useLagging:
                cls._lastLaggingTick += oneSecond
                return cls._lastLaggingTick
            cls._lastTick += oneSecond  # regular: one second later
            return cls._lastTick

    def getNextInt(self):
        with self._lock:
            nextVal = self._lastInt + 1
            self._lastInt = nextVal
            return nextVal

    def getNextBinary(self):
        suffix = self.getNextInt()
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(suffix)

    def getNextFloat(self):
        # print("Float obtained: {}".format(...))
        return self.getNextInt() + 0.9

    ALL_COLORS = ['red', 'white', 'blue', 'green', 'purple']

    def getNextColor(self):
        return random.choice(self.ALL_COLORS)

1160

1161
class TaskExecutor():
    """Per-step context handed to tasks.

    Knows the current step number and records data marks into a BoundedList
    shared by all executors.
    """

    class BoundedList:
        """Thread-safe ascending list holding at most ``size`` values.

        The first value added seeds the list. After that, a value is inserted
        in sorted position only if it is greater than the current smallest
        entry; if the list then exceeds ``size``, the smallest entry is
        dropped.
        """

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            with self._lock:
                if len(self._list) == 0:  # empty: the first value seeds the list
                    self._list.append(n)
                    return

                # Index of the first entry >= n (end of list when n is largest)
                insPos = next(
                    (pos for pos, item in enumerate(self._list) if n <= item),
                    len(self._list))
                if insPos == 0:  # not above the smallest entry # TODO: eliminate first item as gating item
                    return  # do nothing

                # print("Inserting at position {}, value: {}".format(insPos, n))
                self._list.insert(insPos, n)
                excess = len(self._list) - self._size
                if excess <= 0:
                    return  # still within bounds
                if excess != 1:
                    raise RuntimeError("Corrupt Bounded List")
                del self._list[0]  # drop the smallest entry

        def __str__(self):
            return repr(self._list)

    _boundedList = BoundedList()  # shared by all executors

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        # print("[{}]".format(n), end="", flush=True)
        self._boundedList.add(n)

1218
    # def logInfo(self, msg):
1219
    #     Logging.info("    T[{}.x]: ".format(self._curStep) + msg)
1220

1221
    # def logDebug(self, msg):
1222
    #     Logging.debug("    T[{}.x]: ".format(self._curStep) + msg)
1223

S
Shuduo Sang 已提交
1224

S
Steven Li 已提交
1225
class Task():
1226 1227 1228 1229
    ''' A generic "Task" to be executed. For now we decide that there is no
        need to embed a DB connection here, we use whatever the Worker Thread has
        instead. But a task is always associated with a DB
    '''
1230
    taskSn = 100
1231 1232
    _lock = threading.Lock()
    _tableLocks: Dict[str, threading.Lock] = {}
1233 1234 1235

    @classmethod
    def allocTaskNum(cls):
S
Shuduo Sang 已提交
1236
        Task.taskSn += 1  # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
1237
        # Logging.debug("Allocating taskSN: {}".format(Task.taskSn))
S
Steven Li 已提交
1238
        return Task.taskSn
1239

1240
    def __init__(self, execStats: ExecutionStats, db: Database):
S
Shuduo Sang 已提交
1241
        self._workerThread = None
1242
        self._err: Optional[Exception] = None
1243
        self._aborted = False
1244
        self._curStep = None
S
Shuduo Sang 已提交
1245
        self._numRows = None  # Number of rows affected
1246

S
Shuduo Sang 已提交
1247
        # Assign an incremental task serial number
1248
        self._taskNum = self.allocTaskNum()
1249
        # Logging.debug("Creating new task {}...".format(self._taskNum))
1250

1251
        self._execStats = execStats
1252
        self._db = db # A task is always associated/for a specific DB
1253

1254 1255
        

1256
    def isSuccess(self):
S
Shuduo Sang 已提交
1257
        return self._err is None
1258

1259 1260 1261
    def isAborted(self):
        return self._aborted

S
Shuduo Sang 已提交
1262
    def clone(self):  # TODO: why do we need this again?
1263
        newTask = self.__class__(self._execStats, self._db)
1264 1265
        return newTask

1266 1267 1268
    def getDb(self):
        return self._db

1269
    def logDebug(self, msg):
S
Shuduo Sang 已提交
1270 1271 1272
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))
1273 1274

    def logInfo(self, msg):
S
Shuduo Sang 已提交
1275 1276 1277
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))
1278

1279
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
S
Shuduo Sang 已提交
1280 1281 1282
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))
1283

1284 1285 1286 1287 1288
    def _isServiceStable(self):
        """Whether the managed service, if any, is currently stable.

        Returns True when no service manager is in use (client-only runs).
        """
        if gSvcMgr:
            return gSvcMgr.isStable()  # we manage the service: ask it
        return True  # we don't run the service, so assume it's stable

1289 1290 1291
    def _isErrAcceptable(self, errno, msg):
        if errno in [
                0x05,  # TSDB_CODE_RPC_NOT_READY
1292
                0x0B,  # Unable to establish connection, more details in TD-1648
1293
                # 0x200, # invalid SQL, TODO: re-examine with TD-934
1294
                0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
1295
                0x213, # "Disconnected from service", result of "kill connection ???"
1296
                0x217, # "db not selected", client side defined error code
1297 1298 1299 1300
                # 0x218, # "Table does not exist" client side defined error code
                0x360, # Table already exists
                0x362, 
                # 0x369, # tag already exists
1301 1302 1303 1304 1305 1306 1307
                0x36A, 0x36B, 0x36D,
                0x381, 
                0x380, # "db not selected"
                0x383,
                0x386,  # DB is being dropped?!
                0x503,
                0x510,  # vnode not in ready state
1308
                0x14,   # db not ready, errno changed
1309
                0x600,  # Invalid table ID, why?
1310
                0x218,  # Table does not exist
1311 1312 1313
                1000  # REST catch-all error
            ]: 
            return True # These are the ALWAYS-ACCEPTABLE ones
1314
        # This case handled below already.
1315
        # elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service:
1316
        #     return True # We may get "network unavilable" when restarting service
1317 1318
        elif Config.getConfig().ignore_errors: # something is specified on command line
            moreErrnos = [int(v, 0) for v in Config.getConfig().ignore_errors.split(',')]
1319 1320
            if errno in moreErrnos:
                return True
1321 1322 1323
        elif errno == 0x200 : # invalid SQL, we need to div in a bit more
            if msg.find("invalid column name") != -1:
                return True 
1324 1325 1326 1327
            elif msg.find("tags number not matched") != -1: # mismatched tags after modification
                return True
            elif msg.find("duplicated column names") != -1: # also alter table tag issues
                return True
1328
        elif not self._isServiceStable(): # We are managing service, and ...
1329
            Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
S
Steven Li 已提交
1330
            return True
1331 1332 1333 1334
        
        return False # Not an acceptable error


1335 1336
    def execute(self, wt: WorkerThread):
        """Run this task on worker thread ``wt`` and record the outcome.

        Delegates the real work to ``self._executeInternal(te, wt)``. A TAOS
        ``ProgrammingError`` is either tolerated (``continue_on_exception`` is
        configured, or ``_isErrAcceptable`` accepts it) or treated as
        unexpected, in which case the task is marked aborted. Any other
        exception also marks the task aborted. No exception propagates out of
        this method; the outcome is reported through ``self._err``,
        ``self._aborted`` and ``self._execStats``.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        klassName = self.__class__.__name__
        self.logDebug("[-] executing task {}...".format(klassName))

        self._err = None # TODO: type hint mess up?
        self._execStats.beginTaskType(klassName)  # mark beginning
        errno2 = None  # converted TAOS errno; only set when a ProgrammingError is caught

        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            if Config.getConfig().continue_on_exception:  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                self._err = err
            elif self._isErrAcceptable(errno2, err.__str__()):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                Progress.emit(Progress.ACCEPTABLE_ERROR)
                self._err = err
            else: # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
                    klassName,
                    errno2, err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if Config.getConfig().debug:
                    traceback.print_exc()
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                self._err = err
                self._aborted = True
        except Exception as e:
            Logging.info("Non-TAOS exception encountered with: {}".format(klassName))
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException:
            # Not stored in self._err: that field holds Exception instances only
            self.logInfo("Python base exception encountered")
            self._aborted = True
            traceback.print_exc()

        # Evaluate once so that the stats, the log line and the exec count agree
        isSuccess = self.isSuccess()
        self._execStats.endTaskType(klassName, isSuccess)
        self.logDebug("[X] task execution completed, {}, status: {}".format(
            klassName, "Success" if isSuccess else "Failure"))
        self._execStats.incExecCount(klassName, isSuccess, errno2)

1401
    # TODO: refactor away, just provide the dbConn
S
Shuduo Sang 已提交
1402
    def execWtSql(self, wt: WorkerThread, sql):
        """Execute a (non-query) SQL statement through worker thread ``wt``.

        Returns whatever ``wt.execSql`` returns.
        """
        outcome = wt.execSql(sql)
        return outcome
    def queryWtSql(self, wt: WorkerThread, sql):
        """Run a query through worker thread ``wt``.

        Returns whatever ``wt.querySql`` returns.
        """
        outcome = wt.querySql(sql)
        return outcome
    def getQueryResult(self, wt: WorkerThread):
        """Fetch the result of the last query run on worker thread ``wt``.

        Returns whatever ``wt.getQueryResult`` returns.
        """
        rows = wt.getQueryResult()
        return rows
    def lockTable(self, ftName): # full table name
        """Block until the per-table lock for ``ftName`` is acquired.

        Locks live in the class-wide ``Task._tableLocks`` map and are created
        lazily; creation is serialized by ``Task._lock``. The blocking acquire
        happens outside ``Task._lock`` so other tables are not held up.
        """
        with Task._lock:
            if ftName not in Task._tableLocks:
                Task._tableLocks[ftName] = threading.Lock()
            tableLock = Task._tableLocks[ftName]
        tableLock.acquire()

    def unlockTable(self, ftName):
        """Release the per-table lock for ``ftName`` taken by ``lockTable``.

        Raises:
            RuntimeError: no lock was ever created for ``ftName``, or the lock
                is not currently held.
        """
        with Task._lock:
            isKnown = ftName in self._tableLocks
            if not isKnown:
                raise RuntimeError("Corrupt state, no such lock")
            tableLock = Task._tableLocks[ftName]
            isHeld = tableLock.locked()
            if not isHeld:
                raise RuntimeError("Corrupte state, already unlocked")
        tableLock.release()

1430

1431
class ExecutionStats:
    """Statistics about task executions during one crash_gen run.

    Tracks the following:
    - per task class, the total and successful execution counts;
    - per task class, a histogram of error numbers;
    - the accumulated "busy" time, i.e. wall-clock time during which at least
      one task was in progress;
    - the overall elapsed time;
    - an optional failure reason.

    ``printStats()`` logs a summary of all of the above.
    """

    def __init__(self):
        # task class name -> [total executions, successful executions]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        # Guards counters that are updated from several worker threads
        self._lock = threading.Lock()
        self._firstTaskStartTime = 0.0
        self._execStartTime = 0.0
        # task class name -> {errno: number of occurrences}
        self._errors: Dict[str, Dict[int, int]] = {}

        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        return self._failed

    def startExec(self):
        self._execStartTime = time.time()

    def endExec(self):
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        """Count one execution of task class ``klassName``.

        Args:
            klassName: task class name.
            isSuccess: whether the execution succeeded.
            eno: error number to record in the histogram, or None.
        """
        with self._lock:  # called concurrently from worker threads
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = 0.0

    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

    def printStats(self):
        """Log a human-readable summary of the collected statistics."""
        Logging.info(
            "----------------------------------------------------------------------")
        Logging.info(
            "| Crash_Gen test {}, with the following stats:". format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        Logging.info("| Task Execution Times (success/total):")
        execTimesAny = 0  # total number of task executions, successful or not
        for k, counts in self._execTimes.items():
            execTimesAny += counts[0]
            errStr = None
            if k in self._errors:
                errStr = ", ".join(
                    "0x{:X}:{}".format(eno, cnt) for (eno, cnt) in self._errors[k].items())
            Logging.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(k, counts[1], counts[0], errStr))

        Logging.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        Logging.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        Logging.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        # Guard against division by zero when no task was executed at all
        avgTaskTime = self._accRunTime / execTimesAny if execTimesAny else 0.0
        Logging.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(avgTaskTime))
        Logging.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        Logging.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        Logging.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
        Logging.info("| Longest native query time: {:.3f} seconds, started: {}".
            format(MyTDSql.longestQueryTime, 
                time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) )
        Logging.info("| Longest native query: {}".format(MyTDSql.longestQuery))
        Logging.info(
            "----------------------------------------------------------------------")


class StateTransitionTask(Task):
    """Base class for tasks that may move the database state machine.

    Subclasses must override ``getEndState()`` and ``canBeginFrom(state)``.
    The NUMBER_OF_* constants size the generated workload.
    """
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    # Offset added to regular-table indexes; chosen lazily on first use and
    # shared by every subclass (stored on StateTransitionTask itself)
    _baseTableNumber = None

    _endState = None # TODO: possibly no longer used

    @classmethod
    def getInfo(cls):  # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        """Return the name of regular table number ``i``: ``reg_table_<base + i>``."""
        base = StateTransitionTask._baseTableNumber
        if base is None:  # decide the offset exactly once
            if Config.getConfig().dynamic_db_table_names:
                base = Dice.throw(999)
            else:
                base = 0
            StateTransitionTask._baseTableNumber = base
        return "reg_table_{}".format(base + i)

    def execute(self, wt: WorkerThread):
        super().execute(wt)


1574
class TaskCreateDb(StateTransitionTask):
    """Create the task's database.

    For ``db_0``, also create the shadow database ``db_s`` when
    ``use_shadow_db`` is configured.
    """

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        cfg = Config.getConfig()
        replicaCount = cfg.num_replicas  # fixed, always
        repStr = "" if replicaCount == 1 else "replica {}".format(replicaCount)
        # Updates are enabled only when "verify data" is active
        updatePostfix = "update 1" if cfg.verify_data else ""
        createTemplate = "create database {} {} {} "

        dbName = self._db.getName()
        self.execWtSql(wt, createTemplate.format(dbName, repStr, updatePostfix))
        if dbName == "db_0" and cfg.use_shadow_db:
            self.execWtSql(wt, createTemplate.format("db_s", repStr, updatePostfix))

1597
class TaskDropDb(StateTransitionTask):
    """Drop the task's database; afterwards the state is StateEmpty."""

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dropSql = "drop database {}".format(self._db.getName())
        self.execWtSql(wt, dropSql)
        Logging.debug("[OPS] database dropped at {}".format(time.time()))

1610
class TaskCreateSuperTable(StateTransitionTask):
    """(Re)create the database's fixed super table.

    Any existing super table is dropped first. Regular tables are not created
    here; INSERT creates them automatically.
    """

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbExists = self._db.exists(wt.getDbConn())
        if not dbExists:
            Logging.debug("Skipping task, no DB yet")
            return

        sTable = self._db.getFixedSuperTable() # type: TdSuperTable
        columnDefs = {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}
        tagDefs = {'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT}
        sTable.create(wt.getDbConn(), columnDefs, tagDefs, dropIfExists=True)

1637
class TdSuperTable:
    """Client-side handle on the super table ``<dbName>.<stName>``.

    Every method takes a DB connection ``dbc`` and issues SQL through its
    ``execute``/``query``/``getQueryResult``/``existsSuperTable`` methods.
    """

    # Literal value used for each tag type when creating regular tables
    _TAG_VALUE_LITERALS = {
        'BINARY': "'Beijing-Shanghai-LosAngeles'",
        'FLOAT': '9.9',
        'INT': '88',
    }

    def __init__(self, stName, dbName):
        self._stName = stName
        self._dbName = dbName

    def getName(self):
        return self._stName

    def drop(self, dbc, skipCheck = False):
        """Drop this super table.

        Raises:
            CrashGenError: the table does not exist and ``skipCheck`` is False.
        """
        dbName = self._dbName
        if self.exists(dbc) : # if myself exists
            fullTableName = dbName + '.' + self._stName
            dbc.execute("DROP TABLE {}".format(fullTableName))
        elif not skipCheck:
            raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))

    def exists(self, dbc):
        """Return whether the super table exists. Side effect: issues ``USE <db>`` on ``dbc``."""
        dbc.execute("USE " + self._dbName)
        return dbc.existsSuperTable(self._stName)

    # TODO: odd semantic, create() method is usually static?
    def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False):
        '''Create the super table with columns ``cols`` and tags ``tags``.

        If no tags are given, a placeholder tag ``dummy int`` is used.

        Raises:
            CrashGenError: the table already exists and ``dropIfExists`` is False.
        '''
        dbName = self._dbName
        dbc.execute("USE " + dbName)
        fullTableName = dbName + '.' + self._stName
        if dbc.existsSuperTable(self._stName):
            if dropIfExists:
                dbc.execute("DROP TABLE {}".format(fullTableName))
            else: # error
                raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))

        # Now let's create
        sql = "CREATE TABLE {} ({})".format(
            fullTableName,
            ",".join(['%s %s'%(k,v.value) for (k,v) in cols.items()]))
        if tags :
            sql += " TAGS ({})".format(
                ",".join(['%s %s'%(k,v.value) for (k,v) in tags.items()])
            )
        else:
            sql += " TAGS (dummy int) "
        dbc.execute(sql)

    def getRegTables(self, dbc: DbConn):
        """Return the names of the regular tables under this super table."""
        dbName = self._dbName
        try:
            dbc.query("select TBNAME from {}.{}".format(dbName, self._stName))  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
            raise

        qr = dbc.getQueryResult()
        return [v[0] for v in qr]

    def hasRegTables(self, dbc: DbConn):
        return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0

    def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
        """Create regular table ``regTableName`` under this super table unless it already exists.

        When ``task`` is given, the per-table lock is held while creating.
        """
        dbName = self._dbName
        sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
        if dbc.query(sql) >= 1 : # reg table exists already
            return

        # acquire a lock first, so as to be able to *verify*. More details in TD-1471
        fullTableName = dbName + '.' + regTableName
        if task is not None:  # TODO: what happens if we don't lock the table
            task.lockTable(fullTableName)
        # Everything after acquiring the lock sits inside try/finally, so that
        # the lock is released even if Progress.emit() raises.
        try:
            Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
            sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
                fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
            )
            dbc.execute(sql)
        finally:
            if task is not None:
                task.unlockTable(fullTableName) # no matter what

    def _getTagStrForSql(self, dbc) :
        """Return comma-separated literal tag values, in the super table's tag order.

        Raises:
            RuntimeError: a tag type has no known literal.
        """
        tagStrs = []
        for tagName, tagType in self._getTags(dbc).items():
            literal = self._TAG_VALUE_LITERALS.get(tagType)
            if literal is None:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
            tagStrs.append(literal)
        return ", ".join(tagStrs)

    def _getTags(self, dbc) -> dict:
        """Return {tag name: tag type} from ``DESCRIBE``, keeping rows whose 4th column is 'TAG'."""
        dbc.query("DESCRIBE {}.{}".format(self._dbName, self._stName))
        stCols = dbc.getQueryResult()
        return {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type

    def addTag(self, dbc, tagName, tagType):
        """Add tag ``tagName`` of type ``tagType``; a no-op if it already exists."""
        if tagName in self._getTags(dbc): # already
            return
        sql = "alter table {}.{} add tag {} {}".format(
            self._dbName, self._stName, tagName, tagType)
        dbc.execute(sql)

    def dropTag(self, dbc, tagName):
        """Drop tag ``tagName``; a no-op if it does not exist."""
        if tagName not in self._getTags(dbc): # don't have this tag
            return
        sql = "alter table {}.{} drop tag {}".format(self._dbName, self._stName, tagName)
        dbc.execute(sql)

    def changeTag(self, dbc, oldTag, newTag):
        """Rename tag ``oldTag`` to ``newTag``.

        A no-op if ``oldTag`` is missing or ``newTag`` already exists.
        """
        tags = self._getTags(dbc)
        if oldTag not in tags: # don't have this tag
            return
        if newTag in tags: # already have this tag
            return
        sql = "alter table {}.{} change tag {} {}".format(self._dbName, self._stName, oldTag, newTag)
        dbc.execute(sql)

    def generateQueries(self, dbc: DbConn) -> List[SqlQuery]:
        ''' Generate queries to test/exercise this super table '''
        ret = [] # type: List[SqlQuery]

        for rTbName in self.getRegTables(dbc):  # regular tables

            # Result currently unused, but the call is kept so that the
            # sequence of random draws (and hence seeded runs) stays unchanged.
            filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
                None
            ])

            # Run the query against the regular table first
            doAggr = (Dice.throw(2) == 0) # 1 in 2 chance
            if not doAggr: # don't do aggregate query, just simple one
                ret.append(SqlQuery( # reg table
                    "select {} from {}.{}".format('*', self._dbName, rTbName)))
                ret.append(SqlQuery( # super table
                    "select {} from {}.{}".format('*', self._dbName, self.getName())))
            else: # Aggregate query
                aggExpr = Dice.choice([
                    'count(*)',
                    'avg(speed)',
                    # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
                    'sum(speed)',
                    'stddev(speed)',
                    # SELECTOR functions
                    'min(speed)',
                    'max(speed)',
                    'first(speed)',
                    'last(speed)',
                    'top(speed, 50)', # TODO: not supported?
                    'bottom(speed, 50)', # TODO: not supported?
                    'apercentile(speed, 10)', # TODO: TD-1316
                    # 'last_row(speed)', # TODO: commented out per TD-3231, we should re-create
                    # Transformation Functions
                    # 'diff(speed)', # TODO: no supported?!
                    'spread(speed)'
                    ]) # TODO: add more from 'top'

                sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
                if Dice.throw(3) == 0: # 1 in X chance
                    sql = sql + ' GROUP BY color'
                    Progress.emit(Progress.QUERY_GROUP_BY)
                ret.append(SqlQuery(sql))

        return ret

1814
class TaskReadData(StateTransitionTask):
    """Run a batch of generated queries against the fixed super table and its regular tables."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _reconnectIfNeeded(self, wt):
        """With a 1-in-20 chance, close and reopen the worker's DB connection.

        This simulates a broken connection. A ConnectionError on reopen is
        re-raised in client-only mode (no ``gSvcMgr``) or when the managed
        service reports it is running. Otherwise it is tolerated, presumably
        because the service is restarting, and recorded as
        SERVICE_RECONNECT_FAILURE.
        """
        if random.randrange(20) != 0: # TODO: break connection in all situations
            return

        Progress.emit(Progress.SERVICE_RECONNECT_START)
        try:
            wt.getDbConn().close()
            wt.getDbConn().open()
        except ConnectionError: # may fail
            if not gSvcMgr:
                Logging.error("Failed to reconnect in client-only mode")
                raise # Not OK if we are running in client-only mode
            if gSvcMgr.isRunning(): # may have race conditon, but low prob
                Logging.error("Failed to reconnect when managed server is running")
                raise # Not OK if we are running normally
            Progress.emit(Progress.SERVICE_RECONNECT_FAILURE)
        else:
            # Only report success when the reconnect actually succeeded
            Progress.emit(Progress.SERVICE_RECONNECT_SUCCESS)
        # NOTE: the above might have taken a while; the service may have come
        # back by now. TODO: fix server restart status race condition

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        self._reconnectIfNeeded(wt)

        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()

        for q in sTable.generateQueries(dbc):
            try:
                sql = q.getSql()
                dbc.execute(sql)
            except taos.error.ProgrammingError as err:
                errno2 = Helper.convertErrno(err.errno)
                Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
                raise

1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883
class SqlQuery:
    """A single SQL statement to be executed against the database."""

    @classmethod
    def buildRandom(cls, db: Database):
        '''Build a random query against a certain database.

        NOTE(review): not implemented yet; always returns None.
        '''
        # TODO: generate a query targeting db.getName()
        return None

    def __init__(self, sql: Optional[str] = None):
        self._sql = sql

    def getSql(self):
        """Return the SQL text, or None if none was given."""
        return self._sql
    
1884
class TaskDropSuperTable(StateTransitionTask):
    """Drop the fixed super table, sometimes dropping its regular tables first."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbName = self._db.getName()
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
        if Dice.throw(2) == 0:
            numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES
            tblSeq = list(range(2 + numTables))
            random.shuffle(tblSeq)
            tickOutput = False  # if we have spitted out a "d" character for "drop regular table"
            isSuccess = True
            for i in tblSeq:
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                try:
                    self.execWtSql(wt, "drop table {}.{}".
                        format(dbName, regTableName))  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as err:
                    # correcting for strange error number scheme
                    errno2 = Helper.convertErrno(err.errno)
                    if errno2 == 0x362:  # mnode invalid table name
                        isSuccess = False
                        Logging.debug("[DB] Acceptable error when dropping a table")
                    else:
                        # Still best-effort (we move on), but no longer silently
                        Logging.debug("[DB] Ignoring error when dropping table {}: errno=0x{:X}, msg: {}".format(
                            regTableName, errno2, err))
                    continue  # try to delete next regular table

                if not tickOutput:
                    tickOutput = True  # Print only one time
                    print("d" if isSuccess else "f", end="", flush=True)

        # Drop the super table itself
        tblName = self._db.getFixedSuperTableName()
        self.execWtSql(wt, "drop table {}.{}".format(dbName, tblName))
1925

S
Shuduo Sang 已提交
1926

1927 1928 1929
class TaskAlterTags(StateTransitionTask):
    """Randomly add, drop or rename a tag on the fixed super table."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()  # if we can drop it, we can alter tags

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()
        roll = Dice.throw(4)
        # Outcomes 0..2 map to specific actions; anything else renames the tag
        actions = {
            0: lambda: sTable.addTag(dbc, "extraTag", "int"),
            1: lambda: sTable.dropTag(dbc, "extraTag"),
            2: lambda: sTable.dropTag(dbc, "newTag"),
        }
        fallback = lambda: sTable.changeTag(dbc, "extraTag", "newTag")
        actions.get(roll, fallback)()

class TaskRestartService(StateTransitionTask):
    """Occasionally restart the managed TDengine service (``auto_start_service`` mode only).

    At most one instance runs the restart logic at a time. This is tracked by
    the class-level ``_isRunning`` flag, which is guarded by ``_classLock``.
    """
    _isRunning = False
    _classLock = threading.Lock()

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        if Config.getConfig().auto_start_service:
            return state.canDropFixedSuperTable()  # Basicallly when we have the super table
        return False # don't run this otherwise

    CHANCE_TO_RESTART_SERVICE = 200
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        # The flag must be set on the class: assigning via self would only
        # create an instance attribute that other task instances never see.
        with TaskRestartService._classLock:
            if TaskRestartService._isRunning:
                Logging.info("Skipping restart task, another running already")
                return
            TaskRestartService._isRunning = True

        try:
            if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
                dbc = wt.getDbConn()
                dbc.execute("show databases") # simple delay, align timing with other workers
                gSvcMgr.restart()
        finally:
            # Reset even if the restart fails, so later tasks are not blocked forever
            with TaskRestartService._classLock:
                TaskRestartService._isRunning = False

1987
class TaskAddData(StateTransitionTask):
    """Insert rows into the regular tables of the database's fixed super table.

    Tables are visited in random order. Rows are written either one INSERT per row
    (``_addData``, which also supports read-back verification and operation logging)
    or as one multi-row INSERT (``_addDataInBatch``).
    """

    # Track which table is being actively worked on (class-level: shared by all instances)
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off tests
    fAddLogReady = None # type: Optional[io.TextIOWrapper]
    fAddLogDone  = None # type: Optional[io.TextIOWrapper]

    @classmethod
    def prepToRecordOps(cls):
        """Lazily open the 'ready' / 'done' operation log files when record_ops (-r) is on."""
        if Config.getConfig().record_ops:
            if cls.fAddLogReady is None:
                Logging.info(
                    "Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")

            if cls.fAddLogDone is None:
                Logging.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
        """Write all records for one regular table using a single multi-row INSERT.

        The statement has the form ``INSERT INTO db.tb VALUES (...) (...) ...``: the
        value tuples are separated by whitespace. A ``;`` between tuples would be a
        statement terminator, not a row separator.
        """
        numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
        fullTableName = db.getName() + '.' + regTableName

        rows = []
        for _ in range(numRecords):  # number of records per table
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            rows.append("('{}', {}, '{}')".format(nextTick, nextInt, nextColor))

        sql = "INSERT INTO {} VALUES {}".format(fullTableName, " ".join(rows))
        dbc.execute(sql)

    def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
        """Insert records into one regular table, one INSERT statement per row.

        Depending on command-line options, this also does the following:
          * record_ops (-r): logs each pending and completed write to fsync'ed files.
          * verify_data (-v): locks the table around the write, reads the value back
            and compares it.
          * not use_shadow_db: occasionally re-inserts at the same timestamp (an
            "update") with a new value.

        Raises:
            CrashGenError: record_ops is on but the log files are unexpectedly missing.
            taos.error.ProgrammingError: read-back mismatch, or an unexpected DB error.
        """
        cfg = Config.getConfig()
        numRecords = self.LARGE_NUMBER_OF_RECORDS if cfg.larger_data else self.SMALL_NUMBER_OF_RECORDS
        fullTableName = db.getName() + '.' + regTableName  # loop-invariant

        for _ in range(numRecords):  # number of records per table
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            if cfg.record_ops:
                self.prepToRecordOps()
                if self.fAddLogReady is None:
                    raise CrashGenError("Unexpected empty fAddLogReady")
                self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                self.fAddLogReady.flush()
                os.fsync(self.fAddLogReady.fileno())

            # TODO: too ugly trying to lock the table reliably, refactor...
            if cfg.verify_data:
                self.lockTable(fullTableName)

            try:
                sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
                    fullTableName, nextTick, nextInt, nextColor)
                dbc.execute(sql)

                # Quick hack, attach an update statement here. TODO: create an "update" task
                if (not cfg.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shadow DB
                    nextInt = db.getNextInt()
                    nextColor = db.getNextColor()
                    sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
                        fullTableName, nextTick, nextInt, nextColor)
                    dbc.execute(sql)
            except BaseException: # Any exception at all: release the lock, then propagate
                if cfg.verify_data:
                    self.unlockTable(fullTableName)
                raise

            # Now read it back and verify, we might encounter an error if table is dropped
            if cfg.verify_data: # only if command line asks for it
                try:
                    readBack = dbc.queryScalar("SELECT speed from {} WHERE ts='{}'".
                        format(fullTableName, nextTick))
                    if readBack != nextInt:
                        raise taos.error.ProgrammingError(
                            "Failed to read back same data, wrote: {}, read: {}"
                            .format(nextInt, readBack), 0x999)
                except taos.error.ProgrammingError as err:
                    errno = Helper.convertErrno(err.errno)
                    if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT]: # not a single result
                        raise taos.error.ProgrammingError(
                            "Failed to read back same data for tick: {}, wrote: {}, read: {}"
                            .format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"),
                            errno)
                    elif errno in [0x218, 0x362]: # table doesn't exist
                        pass # tolerated: the table may have been dropped concurrently
                    else:
                        # Re-throw otherwise
                        raise
                finally:
                    self.unlockTable(fullTableName) # Unlock the table no matter what

            # Successfully wrote the data into the DB, let's record it somehow
            te.recordDataMark(nextInt)

            if cfg.record_ops:
                if self.fAddLogDone is None:
                    raise CrashGenError("Unexpected empty fAddLogDone")
                self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
                self.fAddLogDone.flush()
                os.fsync(self.fAddLogDone.fileno())

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Ensure each regular table exists, then add data to it, visiting tables in random order."""
        # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
        db = self._db
        dbc = wt.getDbConn()
        numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES

        tblSeq = list(range(numTables))
        random.shuffle(tblSeq) # now we have random sequence

        for i in tblSeq:
            markedByUs = False
            if i in self.activeTable:  # wow already active
                # print("x", end="", flush=True) # concurrent insertion
                Progress.emit(Progress.CONCURRENT_INSERTION)
            else:
                self.activeTable.add(i)  # marking it active
                markedByUs = True

            try:
                sTable = db.getFixedSuperTable()
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                sTable.ensureRegTable(self, wt.getDbConn(), regTableName)  # Ensure the table exists

                # NOTE(review): following this file's `Dice.throw(N) == 0  # 1 in N chance`
                # convention, Dice.throw(1) is always 0, so the batch path below is
                # effectively disabled (an older comment claimed "1 in 2 chance").
                # Use Dice.throw(2) to actually alternate.
                if Dice.throw(1) == 0:
                    self._addData(db, dbc, regTableName, te)
                else:
                    self._addDataInBatch(db, dbc, regTableName, te)
            finally:
                # Only clear the marker if we set it: leave another thread's marker alone,
                # and do not leak ours when the insert raises.
                if markedByUs:
                    self.activeTable.discard(i)  # not raising an error, unlike remove


class ThreadStacks: # stack info for all threads
    """Snapshot of the call stacks of all live threads, taken when the object is built.

    Stacks are keyed by each thread's native (OS-level) thread id.
    """
    def __init__(self):
        self._allStacks = {}
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            if th.ident is None:
                continue # thread object not started yet
            frame = allFrames.get(th.ident)
            if frame is None:
                # Thread started after the frame snapshot above was taken; nothing to show
                continue
            stack = traceback.extract_stack(frame)
            self._allStacks[th.native_id] = stack

    def print(self, filteredEndName = None, filterInternal = False):
        """Print every captured stack, oldest frame first.

        Args:
            filteredEndName: if given, skip threads whose innermost frame has this name.
            filterInternal: if True, skip threads whose innermost frame is one of the
                known idle or bookkeeping functions listed below.
        """
        for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
            lastFrame = stack[-1]
            if filteredEndName: # we need to filter out stacks that match this name
                if lastFrame.name == filteredEndName: # end matched: skip this thread
                    continue
            if filterInternal:
                if lastFrame.name in ['wait', 'invoke_excepthook',
                    '_wait', # The Barrier exception
                    'svcOutputReader', # the svcMgr thread
                    '__init__']: # the thread that extracted the stack
                    continue # ignore
            # Now print
            print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid))
            for stackFrame, frame in enumerate(stack): # was using: reversed(stack)
                print("[{sf}] File {filename}, line {lineno}, in {name}".format(
                    sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info ----->\n")


class ClientManager:
    """Drives the crash-gen client.

    It builds the DB manager, thread pool and ThreadCoordinator, runs them to
    completion, then tears everything down. It also handles the SIGINT/SIGUSR1
    callbacks that MainExec delegates to it.
    """
    def __init__(self):
        Logging.info("Starting service manager")
        # signal.signal(signal.SIGTERM, self.sigIntHandler)
        # signal.signal(signal.SIGINT, self.sigIntHandler)

        self._status = Status.STATUS_RUNNING
        self.tc = None          # ThreadCoordinator, created in run()

        self.inSigHandler = False   # True while the SIGUSR1 menu is being shown

    def sigIntHandler(self, signalNumber, frame):
        """First SIGINT: ask the coordinator to stop. A repeated SIGINT: exit immediately."""
        if self._status != Status.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            # return  # do nothing if it's already not running
            sys.exit(-1)
        self._status = Status.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        self.tc.requestToStop()

    def _doMenu(self):
        """Prompt until the user enters a valid menu choice; return it as a string."""
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the valid-choice list below
            choice = ""
            while choice == "":  # keep reading until we get non-empty input
                choice = input("Enter Choice: ")
            if choice in ["1", "2", "3"]:
                return choice
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """Show an interactive menu on SIGUSR1 (resume / terminate / dump thread stacks)."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # do nothing if it's already not running
        self.inSigHandler = True

        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Reset even if the menu raised (e.g. EOFError from input()), otherwise
            # every later SIGUSR1 would be ignored forever.
            self.inSigHandler = False

    # TODO: need to revise how we verify data durability
    # def _printLastNumbers(self):  # to verify data durability
    #     dbManager = DbManager()
    #     dbc = dbManager.getDbConn()
    #     if dbc.query("show databases") <= 1:  # no database (we have a default called "log")
    #         return
    #     dbc.execute("use db")
    #     if dbc.query("show tables") == 0:  # no tables
    #         return

    #     sTbName = dbManager.getFixedSuperTableName()

    #     # get all regular tables
    #     # TODO: analyze result set later
    #     dbc.query("select TBNAME from db.{}".format(sTbName))
    #     rTables = dbc.getQueryResult()

    #     bList = TaskExecutor.BoundedList()
    #     for rTbName in rTables:  # regular tables
    #         dbc.query("select speed from db.{}".format(rTbName[0]))
    #         numbers = dbc.getQueryResult()
    #         for row in numbers:
    #             # print("<{}>".format(n), end="", flush=True)
    #             bList.add(row[0])

    #     print("Top numbers in DB right now: {}".format(bList))
    #     print("TDengine client execution is about to start in 2 seconds...")
    #     time.sleep(2.0)
    #     dbManager = None  # release?

    def run(self, svcMgr):
        """Run the client workload to completion and return a process exit code.

        Args:
            svcMgr: the service manager MainExec started (in -a mode), or None. If
                given, its TDengine services are stopped once the client finishes.

        Returns:
            1 if the ThreadCoordinator reported failure, 0 otherwise.
        """
        # gSvcMgr must be declared global: without it, the "release" below only
        # rebinds a local name and the module-level reference is never dropped.
        global gContainer, gSvcMgr

        # Prepare Tde Instance
        tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"

        cfg = Config.getConfig()
        dbManager = DbManager(cfg.connector_type, tInst.getDbTarget())  # Regular function
        thPool = ThreadPool(cfg.num_threads, cfg.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        Logging.info("Starting client instance: {}".format(tInst))
        self.tc.run()
        # print("exec stats: {}".format(self.tc.getExecStats()))
        # print("TC failed = {}".format(self.tc.isFailed()))
        if svcMgr: # gConfig.auto_start_service:
            svcMgr.stopTaosServices()

        # Release global variables
        Config.clearConfig()
        gSvcMgr = None

        thPool = None
        dbManager.cleanUp() # destructor wouldn't run in time
        dbManager = None

        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        ret = 1 if self.tc.isFailed() else 0
        self.tc.cleanup()
        # Release variables here
        self.tc = None

        gc.collect() # force garbage collection
        return ret

    def conclude(self):
        """Print the coordinator's execution statistics."""
        # self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
        self.tc.printStats()

class MainExec:
    """Program entry object.

    It parses the command line, installs signal handlers, and runs either the
    TDengine service (-e) or the crash-gen client.
    """

    def __init__(self):
        self._clientMgr = None
        self._svcMgr = None # type: Optional[ServiceManager]

        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT,  self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, or to the service manager when running alone."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr: # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to the service manager and the client manager, if present."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Run the client (starting a single-dnode service first in -a mode); return its exit code.

        Returns None if a REST connection error aborted the run.
        """
        global gSvcMgr
        if Config.getConfig().auto_start_service:
            gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
            gSvcMgr.startTaosServices() # we start, don't run

        self._clientMgr = ClientManager()
        ret = None
        try:
            ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
        except requests.exceptions.ConnectionError as err:
            Logging.warning("Failed to open REST connection to DB: {}".format(err))
            # don't raise
        return ret

    def runService(self):
        """Run the TDengine service (num_dnodes dnodes) in the foreground until it ends."""
        global gSvcMgr
        gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert

        gSvcMgr.run() # run to some end state
        gSvcMgr = self._svcMgr = None

    def _buildCmdLineParser(self):
        """Build the argparse parser for all crash-gen options.

        Help texts of non-boolean options use argparse's ``%(default)s`` so the
        displayed default always matches the real one.
        """
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=textwrap.dedent('''\
                TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
                ---------------------------------------------------------------------
                1. You build TDengine in the top level ./build directory, as described in offical docs
                2. You run the server there before this script: ./build/bin/taosd -c test/cfg

                '''))

        parser.add_argument(
            '-a',
            '--auto-start-service',
            action='store_true',
            help='Automatically start/stop the TDengine service (default: false)')
        parser.add_argument(
            '-b',
            '--max-dbs',
            action='store',
            default=0,
            type=int,
            help='Maximum number of DBs to keep, set to disable dropping DB. (default: %(default)s)')
        parser.add_argument(
            '-c',
            '--connector-type',
            action='store',
            default='native',
            type=str,
            help='Connector type to use: native, rest, or mixed (default: %(default)s)')
        parser.add_argument(
            '-d',
            '--debug',
            action='store_true',
            help='Turn on DEBUG mode for more logging (default: false)')
        parser.add_argument(
            '-e',
            '--run-tdengine',
            action='store_true',
            help='Run TDengine service in foreground (default: false)')
        parser.add_argument(
            '-g',
            '--ignore-errors',
            action='store',
            default=None,
            type=str,
            help='Ignore error codes, comma separated, 0x supported (default: %(default)s)')
        parser.add_argument(
            '-i',
            '--num-replicas',
            action='store',
            default=1,
            type=int,
            help='Number (fixed) of replicas to use, when testing against clusters. (default: %(default)s)')
        parser.add_argument(
            '-k',
            '--track-memory-leaks',
            action='store_true',
            help='Use Valgrind tool to track memory leaks (default: false)')
        parser.add_argument(
            '-l',
            '--larger-data',
            action='store_true',
            help='Write larger amount of data during write operations (default: false)')
        parser.add_argument(
            '-m',
            '--mix-oos-data',
            action='store_false',
            help='Mix out-of-sequence data into the test data stream (default: true)')
        parser.add_argument(
            '-n',
            '--dynamic-db-table-names',
            action='store_true',
            help='Use non-fixed names for dbs/tables, for -b, useful for multi-instance executions (default: false)')
        parser.add_argument(
            '-o',
            '--num-dnodes',
            action='store',
            default=1,
            type=int,
            help='Number of Dnodes to initialize, used with -e option. (default: %(default)s)')
        parser.add_argument(
            '-p',
            '--per-thread-db-connection',
            action='store_true',
            help='Use a separate db connection for each thread (default: false)')
        parser.add_argument(
            '-r',
            '--record-ops',
            action='store_true',
            help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
        parser.add_argument(
            '-s',
            '--max-steps',
            action='store',
            default=1000,
            type=int,
            help='Maximum number of steps to run (default: %(default)s)')
        parser.add_argument(
            '-t',
            '--num-threads',
            action='store',
            default=5,
            type=int,
            help='Number of threads to run (default: %(default)s)')
        parser.add_argument(
            '-v',
            '--verify-data',
            action='store_true',
            help='Verify data written in a number of places by reading back (default: false)')
        parser.add_argument(
            '-w',
            '--use-shadow-db',
            action='store_true',
            help='Use a shaddow database to verify data integrity (default: false)')
        parser.add_argument(
            '-x',
            '--continue-on-exception',
            action='store_true',
            help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')

        return parser

    def init(self): # TODO: refactor
        """Set up globals, parse the command line into Config, validate it, and seed the dice."""
        global gContainer
        gContainer = Container() # micky-mouse DI

        global gSvcMgr # TODO: refactor away
        gSvcMgr = None

        parser = self._buildCmdLineParser()
        Config.init(parser)

        # Sanity check for arguments
        if Config.getConfig().use_shadow_db and Config.getConfig().max_dbs > 1:
            raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")

        Logging.clsInit(Config.getConfig().debug)

        Dice.seed(0)  # initial seeding of dice

    def run(self):
        """Run the service (-e) or the client; return the process exit code."""
        if Config.getConfig().run_tdengine:  # run server
            try:
                self.runService()
                return 0 # success
            except ConnectionError:
                Logging.error("Failed to make DB connection, please check DB instance manually")
            return -1 # failure
        else:
            return self.runClient()


class Container():
    """Tiny dependency-injection holder that only accepts a fixed set of property names.

    Values live in the private ``_cargo`` dict. Setting a name outside
    ``_propertyList`` raises CrashGenError. Reading a valid name that was never set
    raises KeyError.
    """
    _propertyList = {'defTdeInstance'}

    def __init__(self):
        self._cargo = {} # No cargo at the beginning

    def _verifyValidProperty(self, name):
        if name in self._propertyList:
            return
        raise CrashGenError("Invalid container property: {}".format(name))

    # Only invoked when normal attribute lookup fails (unlike __getattribute__)
    def __getattr__(self, name):
        self._verifyValidProperty(name)
        cargo = self._cargo
        return cargo[name] # plain dict lookup

    def __setattr__(self, name, value):
        if name != '_cargo':
            self._verifyValidProperty(name)
            self._cargo[name] = value
        else:
            # '_cargo' is the one reserved name stored as a real instance attribute
            super().__setattr__(name, value)
