crash_gen_main.py 104.6 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2
###################################################################
3
#           Copyright (c) 2016-2021 by TAOS Technologies, Inc.
S
Steven Li 已提交
4 5 6 7 8 9 10 11 12 13
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
17

18
from typing import Any, Set, Tuple
S
Shuduo Sang 已提交
19 20
from typing import Dict
from typing import List
21
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
22

S
Shuduo Sang 已提交
23 24
import textwrap
import time
25
import datetime
S
Shuduo Sang 已提交
26 27 28
import random
import threading
import argparse
29

S
Steven Li 已提交
30
import sys
31
import os
32
import io
33
import signal
34
import traceback
35
import requests
36
# from guppy import hpy
37
import gc
38 39
import taos

40

41
from .shared.types import TdColumns, TdTags
42

43
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
44 45 46
# from crash_gen import ServiceManager, Config, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \
#     CrashGenError, Progress, MyTDSql, \
#     TdeInstance
47

48 49 50 51 52 53
from .service_manager import ServiceManager, TdeInstance

from .shared.config import Config
from .shared.db import DbConn, DbManager, DbConnNative, MyTDSql
from .shared.misc import Dice, Logging, Helper, Status, CrashGenError, Progress
from .shared.types import TdDataType
54

55
# Config.init()
56

57 58 59 60
# This tool only supports Python 3; refuse to go any further on an older interpreter.
if sys.version_info.major < 3:
    raise Exception("Must be using Python 3")

# Global variables, deliberately kept to a small number.
# Both names are only declared (annotated) here; no value is bound at this point.
gSvcMgr: Optional[ServiceManager]  # TODO: refactor this hack, use dep injection
gContainer: Container
S
Steven Li 已提交
69

70 71
# def runThread(wt: WorkerThread):
#     wt.run()
72

73

S
Steven Li 已提交
74
class WorkerThread:
    """A single worker of the thread pool.

    Each worker owns a ``threading.Thread`` whose target is ``run()``. Inside
    ``_doTaskLoop()`` the worker repeatedly (1) waits at the barrier shared
    through the ThreadCoordinator, (2) waits at its own "step gate" until it is
    tapped, then (3) fetches one task from the coordinator and executes it.

    When ``per_thread_db_connection`` is configured, the worker also owns a
    private DB connection, created in ``__init__`` and opened/closed in ``run()``.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
        """
            Note: this runs in the main thread context

            pool: the ThreadPool this worker belongs to
            tid:  identifier of this worker, used in log messages
            tc:   the ThreadCoordinator that paces and feeds this worker

            Raises RuntimeError if a per-thread connection is requested but the
            configured connector type is not 'native', 'rest' or 'mixed'.
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=self.run)
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if Config.getConfig().per_thread_db_connection:  # type: ignore
            tInst = gContainer.defTdeInstance
            connectorType = Config.getConfig().connector_type
            if connectorType == 'native':
                self._dbConn = DbConn.createNative(tInst.getDbTarget())
            elif connectorType == 'rest':
                self._dbConn = DbConn.createRest(tInst.getDbTarget())
            elif connectorType == 'mixed':
                if Dice.throw(2) == 0:  # 1/2 chance
                    self._dbConn = DbConn.createNative(tInst.getDbTarget())
                else:
                    self._dbConn = DbConn.createRest(tInst.getDbTarget())
            else:
                raise RuntimeError("Unexpected connector type: {}".format(connectorType))

    def logDebug(self, msg):
        """Emit a debug message prefixed with this worker's thread id."""
        Logging.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        """Emit an info message prefixed with this worker's thread id."""
        Logging.info("    TRD[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's current task executor."""
        return self._tc.getTaskExecutor()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread entry point: open the private connection (if any), loop, clean up.

        The connection clean-up is performed in a ``finally`` clause, so the
        per-thread connection is closed even when the task loop terminates
        with an exception; the exception itself still propagates.
        """
        # initialization after thread starts, in the thread context
        Logging.info("Starting to run thread: {}".format(self._tid))

        if Config.getConfig().per_thread_db_connection:  # type: ignore
            Logging.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up
            if Config.getConfig().per_thread_db_connection:  # type: ignore
                if self._dbConn.isOpen:  # sometimes it is not open
                    self._dbConn.close()
                else:
                    Logging.warning("Cleaning up worker thread, dbConn already closed")

    def _doTaskLoop(self):
        """Run one task per step until the barrier breaks or the coordinator stops."""
        while True:
            tc = self._tc  # Thread Coordinator, the overall master
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                Logging.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            Logging.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()   # then per-thread gate, after being tapped
            Logging.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not self._tc.isRunning():
                print("_wts", end="")
                Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Before fetching a task, make sure our own connection is usable
            try:
                if Config.getConfig().per_thread_db_connection:  # most likely TRUE
                    if not self._dbConn.isOpen:  # might have been closed during server auto-restart
                        self._dbConn.open()
            except taos.error.ProgrammingError as err:
                errno = Helper.convertErrno(err.errno)
                # Tolerated errors: invalid database, dropping, unable to establish
                # connection, database not ready. Anything else is fatal.
                if errno not in [0x383, 0x386, 0x00B, 0x014]:
                    print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
                    raise

            # Fetch a task from the Thread Coordinator
            Logging.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            Logging.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
                    self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            Logging.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        """Raise RuntimeError unless the caller is this worker's own thread."""
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        """Raise RuntimeError unless the caller is the main thread."""
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        """Raise RuntimeError if this worker's thread is not alive."""
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at the per-thread gate until tapped, then re-arm the gate.

        Must be called by the worker's own (live) thread; RuntimeError otherwise.
        """
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        Logging.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release the worker waiting at its gate; main thread only.

        A worker whose thread is no longer alive is not tapped; "_tad" is
        printed instead.
        """
        self.verifyThreadMain()  # only allowed for main thread

        if self._thread.is_alive():
            Logging.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        """Execute ``sql`` on this worker's connection and return the result of execute()."""
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        """Run ``sql`` as a query on this worker's connection and return the result of query()."""
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        """Return getQueryResult() of this worker's connection."""
        return self.getDbConn().getQueryResult()

    def getDbConn(self) -> DbConn:
        """Return the private connection if configured, else the DbManager's connection."""
        if Config.getConfig().per_thread_db_connection:
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()
251

252
# The coordinator of all worker threads, mostly running in main thread
S
Shuduo Sang 已提交
253 254


255
class ThreadCoordinator:
    """Coordinates all worker threads, step by step.

    The coordinator and the workers meet at a shared ``threading.Barrier``
    (``numThreads + 1`` parties). While the workers then wait at their
    per-thread gates, the coordinator checks the tasks executed in the previous
    step, transitions the state machine of every database, creates a new
    TaskExecutor and taps all the gates to start the next step.
    """

    WORKER_THREAD_TIMEOUT = 120  # Normal: 120

    def __init__(self, pool: ThreadPool, dbManager: DbManager):
        """
            pool:      the ThreadPool whose workers are coordinated
            dbManager: source of the DB connection used for transitions and
                       for creating the Database objects
        """
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager  # type: Optional[DbManager] # may be freed
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        self._stepBarrier = threading.Barrier(
            self._pool.numThreads + 1)  # one barrier for all threads
        self._execStats = ExecutionStats()
        self._runStatus = Status.STATUS_RUNNING
        self._initDbs()
        self._stepStartTime = None  # Track how long it takes to execute each step

    def getTaskExecutor(self):
        """Return the current TaskExecutor; raise CrashGenError if there is none."""
        if self._te is None:
            raise CrashGenError("Unexpected empty TE")
        return self._te

    def getDbManager(self) -> DbManager:
        """Return the DbManager; raise ChildProcessError if it was already released."""
        if self._dbManager is None:
            raise ChildProcessError("Unexpected empty _dbManager")
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        """Wait at the shared step barrier, optionally for at most ``timeout`` seconds."""
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        """Mark the run as stopping and record the interruption as a failure."""
        self._runStatus = Status.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """Tell whether the main loop in run() should terminate."""
        maxSteps = Config.getConfig().max_steps  # type: ignore
        if self._curStep >= (maxSteps - 1):  # maxStep==10, last curStep should be 9
            return True
        if self._runStatus != Status.STATUS_RUNNING:
            return True
        if transitionFailed:
            return True
        if hasAbortedTask:
            return True
        if workerTimeout:
            return True
        return False

    def _hasAbortedTask(self):  # from execution of previous step
        """Return True if any task saved for the step reports isAborted()."""
        for task in self._executedTasks:
            if task.isAborted():
                return True
        return False

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step and tap every worker's gate.

        When ``transitionFailed`` is true no new TaskExecutor is created, which
        makes isRunning() return False and thereby tells the workers to stop.
        """
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        Logging.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        Logging.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
                self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute it's own task
        self.tapAllThreads()  # release all worker thread from their "gates"

    def _syncAtBarrier(self):
        """Cross the shared barrier (bounded by WORKER_THREAD_TIMEOUT) and reset it."""
        # Now main thread (that's us) is ready to enter a step
        # let other threads go past the pool barrier, but wait at the
        # thread gate
        Logging.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        Logging.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """Transition every database's state machine at the end of a step.

        Returns True when the transition failed, False otherwise. Two kinds of
        ``taos.error.ProgrammingError`` are absorbed and reported as a failed
        transition: a broken connection (msg == 'network unavailable') and our
        own CrashGenError. Any other ProgrammingError is re-raised to the
        caller. The list of executed tasks is cleared before returning.
        """
        transitionFailed = False
        try:
            for db in self._dbs:
                sm = db.getStateMachine()
                Logging.debug("[STT] starting transitions for DB: {}".format(db.getName()))
                # at end of step, transiton the DB state
                tasksForDb = db.filterTasks(self._executedTasks)
                sm.transition(tasksForDb, self.getDbManager().getDbConn())
                Logging.debug("[STT] transition ended for DB: {}".format(db.getName()))

            # Due to limitation (or maybe not) of the TD Python library,
            # we cannot share connections across threads.
            # Here we are in main thread, we cannot operate the connections
            # created in workers, hence the workers (re)open them in their task loop.

        except taos.error.ProgrammingError as err:
            if err.msg == 'network unavailable':  # broken DB connection
                Logging.info("DB connection broken, execution failed")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # Do not bail out here: all threads still need to be tapped at
                # the end, and maybe signalled to stop
            elif isinstance(err, CrashGenError):  # our own transition failure
                Logging.info("State transition error")
                # TODO: saw an error here once, let's print out stack info for err?
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("State transition error: {}".format(err))
            else:
                raise

        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        Logging.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Create and start the workers, drive them step by step, then join them."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not Config.getConfig().debug:  # print this only if we are not in debug mode
                Progress.emit(Progress.STEP_BOUNDARY)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
                Progress.emit(Progress.END_THREAD_STEP)
                if self._stepStartTime:
                    stepExecTime = time.time() - self._stepStartTime
                    Progress.emitStr('{:.3f}s/{}'.format(stepExecTime, DbConnNative.totalRequests))
                    DbConnNative.resetTotalRequests()  # reset to zero
            except threading.BrokenBarrierError:
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                Logging.error("\n")
                Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
                    ThreadCoordinator.WORKER_THREAD_TIMEOUT))
                Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                break

            # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
            # We use this period to do house keeping work, when all worker
            # threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                Logging.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = Helper.convertErrno(err.errno)  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                Logging.info(errMsg)
                traceback.print_exc()
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            Progress.emit(Progress.BEGIN_THREAD_STEP)
            self._stepStartTime = time.time()
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            Logging.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            Logging.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            Logging.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        Logging.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        Logging.debug("\r\n\n--> Main thread ready to finish up...")
        Logging.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        Logging.info(". . . All worker threads finished")  # No CR/LF before
        self._execStats.endExec()

    def cleanup(self):  # free resources
        """Clean up the pool and drop every reference held by the coordinator."""
        self._pool.cleanup()

        self._pool = None
        self._te = None
        self._dbManager = None
        self._executedTasks = []
        self._lock = None
        self._stepBarrier = None
        self._execStats = None
        self._runStatus = None

    def printStats(self):
        """Print the execution statistics."""
        self._execStats.printStats()

    def isFailed(self):
        """Return whether the execution statistics report a failure."""
        return self._execStats.isFailed()

    def getExecStats(self):
        """Return the execution statistics object."""
        return self._execStats

    def tapAllThreads(self):  # in a deterministic manner
        """Tap the gate of every worker, in an order shuffled with Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        Logging.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        """Return True while a TaskExecutor is in place."""
        return self._te is not None

    def _initDbs(self):
        ''' Initialize multiple databases, invoked at __init__() time '''
        self._dbs = []  # type: List[Database]
        dbc = self.getDbManager().getDbConn()
        if Config.getConfig().max_dbs == 0:
            self._dbs.append(Database(0, dbc))
        else:
            if Config.getConfig().dynamic_db_table_names:
                # Don't use Dice/random, as they are deterministic
                baseDbNumber = int(datetime.datetime.now().timestamp() * 333) % 888
            else:
                baseDbNumber = 0
            for i in range(Config.getConfig().max_dbs):
                self._dbs.append(Database(baseDbNumber + i, dbc))

    def pickDatabase(self):
        """Return one database: a Dice-picked one, or the only one when max_dbs is 0."""
        idxDb = 0
        if Config.getConfig().max_dbs != 0:
            idxDb = Dice.throw(Config.getConfig().max_dbs)  # 0 to N-1
        db = self._dbs[idxDb]  # type: Database
        return db

    def fetchTask(self) -> Task:
        ''' The thread coordinator (that's us) is responsible for fetching a task
            to be executed next.

            Raises RuntimeError when the coordinator is not running.
        '''
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")

        # pick a task type for current state
        db = self.pickDatabase()
        taskType = db.getStateMachine().pickTaskType()  # dynamic name of class
        return taskType(self._execStats, db)  # create a task from it

    def resetExecutedTasks(self):
        """Forget the tasks executed in the step that just ended."""
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record ``task`` as executed in the current step (guarded by the lock)."""
        with self._lock:
            self._executedTasks.append(task)

555
class ThreadPool:
    """Holds the worker threads of one run, together with the pool's size settings."""

    def __init__(self, numThreads, maxSteps):
        """Remember the requested number of workers and steps; no thread is created yet."""
        self.threadList = []  # type: List[WorkerThread]
        self.curStep = 0
        self.maxSteps = maxSteps
        self.numThreads = numThreads

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to ``tc``, starting each right after recording it."""
        for workerId in range(self.numThreads):
            worker = WorkerThread(self, workerId, tc)
            self.threadList.append(worker)
            worker.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Join the underlying thread of every recorded worker, in list order."""
        for worker in self.threadList:
            Logging.debug("Joining thread...")
            worker._thread.join()

    def cleanup(self):
        """Drop all references to the workers."""
        self.threadList = []  # maybe clean up each?
577

578 579
# A queue of contiguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
S
Shuduo Sang 已提交
580 581


S
Steven Li 已提交
582 583
class LinearQueue():
    """A window [firstIndex..lastIndex] of consecutive integers, plus an "in use" set.

    New indexes are appended at the tail by push(); the head is removed by
    pop(), but only while it is not marked as in use. Mutating methods are
    guarded by a re-entrant lock, because they call each other.
    """

    def __init__(self):
        """Start with an empty window: the first element ever handed out is 1."""
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0

    def toText(self):
        """Return a printable summary of the window and of the in-use set."""
        template = "[{}..{}], in use: {}"
        return template.format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        """Grow the window by one at the tail, mark the new index in use, return it."""
        with self._lock:
            self.lastIndex += 1
            newest = self.lastIndex
            self.allocate(newest)  # mark it in use immediately
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False (TODO: None?) when the queue is empty or when the head
        is currently in use.
        """
        with self._lock:
            if self.isEmpty():
                return False
            head = self.firstIndex
            if head in self.inUse:
                return False
            self.firstIndex = head + 1
            return head

    def isEmpty(self):
        """Return True when the window holds no index."""
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), except that an empty queue yields 0."""
        with self._lock:
            return 0 if self.isEmpty() else self.pop()

    def allocate(self, i):
        """Mark index ``i`` as in use; RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError(
                    "Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Clear the in-use mark of index ``i``."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Return the number of indexes in the window."""
        return 1 + self.lastIndex - self.firstIndex

    def pickAndAllocate(self):
        """Pick an index of the window that is not in use, via Dice, and allocate it.

        Returns the index, or None when the queue is empty or when no free
        index was hit within 10 x size() attempts.
        """
        if self.isEmpty():
            return None
        with self._lock:
            attempts = 0  # counting the iterations
            while True:
                attempts += 1
                if attempts > self.size() * 10:  # 10x iteration already
                    return None
                candidate = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if candidate in self.inUse:
                    continue
                self.allocate(candidate)
                return candidate

S
Shuduo Sang 已提交
658

659
class AnyState:
    """Base class for the states a test database can be in.

    Each concrete state overrides ``getInfo()`` to return a list whose first
    element is the state value (one of the ``STATE_*`` constants) followed by
    six booleans, indexed by the ``CAN_*`` constants, telling which
    operations are permitted in that state. The remaining helpers inspect a
    list of executed tasks; task objects are expected to offer
    ``isSuccess()``.
    """
    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, not even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Positions within the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    # For below, if we can "drop the DB", but strictly speaking
    # only "under normal circumstances", as we may override it with the -b option
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # +1 offset: STATE_INVALID is -1, and its name sits at index 0
        return self._stateNames[self._info[self.STATE_VAL_IDX] + 1]

    # Each sub state tells us the "info", about itself, so we can determine
    # on things like canDropDB()
    def getInfo(self) -> List[Any]:
        """Return ``[stateValue, canCreateDb, canDropDb, ...]`` for this state."""
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare this state's value with an int or with another AnyState.

        Raises:
            RuntimeError: if ``other`` is neither an int nor an AnyState.
        """
        if isinstance(other, int):
            return self.getValIndex() == other
        elif isinstance(other, AnyState):
            return self.getValIndex() == other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(
                    type(other)))

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` are consistent with moving to ``newState``."""
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        """Return the state value (one of the STATE_* constants)."""
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        """Return the state value; same result as ``getValIndex()``."""
        return self.getValIndex()

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        # If user requests to run up to a number of DBs,
        # we'd then not do drop_db operations any more
        if Config.getConfig().max_dbs > 0 or Config.getConfig().use_shadow_db:
            return False
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        # duplicate writes to shadow DB, in which case let's disable dropping s-table
        if Config.getConfig().use_shadow_db:
            return False
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise CrashGenError if two or more ``cls`` tasks succeeded."""
        successCount = 0
        for task in tasks:
            if not isinstance(task, cls):
                continue
            if task.isSuccess():
                # task.logDebug("Task success found")
                successCount += 1
                if successCount >= 2:
                    raise CrashGenError(
                        "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise CrashGenError if ``cls`` tasks exist but none of them succeeded."""
        matching = [task for task in tasks if isinstance(task, cls)]
        if matching and not any(task.isSuccess() for task in matching):
            raise CrashGenError("Unexpected zero success for task type: {}, from tasks: {}"
                                .format(cls, tasks))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` is present at all."""
        for task in tasks:
            if isinstance(task, cls):
                raise CrashGenError(
                    "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` succeeded."""
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                raise CrashGenError(
                    "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if at least one task of type ``cls`` succeeded."""
        return any(isinstance(task, cls) and task.isSuccess() for task in tasks)

    def hasTask(self, tasks, cls):
        """Return True if at least one task of type ``cls`` is present."""
        return any(isinstance(task, cls) for task in tasks)

S
Shuduo Sang 已提交
784

785 786 787 788
class StateInvalid(AnyState):
    """State with value ``STATE_INVALID``, in which no operation is permitted."""

    def getInfo(self):
        return [
            self.STATE_INVALID,
            False, False,  # can create/drop Db
            False, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    # verifyTasksToState() is not overridden here, so the base class
    # implementation (which raises RuntimeError) applies.

S
Shuduo Sang 已提交
796

797 798 799 800
class StateEmpty(AnyState):
    """State with value ``STATE_EMPTY``: nothing there, not even a DB.

    Only creating a database is permitted from here.
    """

    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False,  # can create/drop Db
            False, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check the executed tasks against having started from EMPTY.

        ``newState`` is accepted for interface compatibility but not used.
        """
        # at EMPTY, if there's success in creating DB
        if self.hasSuccess(tasks, TaskCreateDb):
            if not self.hasTask(tasks, TaskDropDb):  # and no drop_db tasks
                # we must have at most one. TODO: compare numbers
                self.assertAtMostOneSuccess(tasks, TaskCreateDb)

813 814 815 816 817 818 819 820 821 822 823

class StateDbOnly(AnyState):
    """State with value ``STATE_DB_ONLY``: we have a DB, but nothing else."""

    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,  # can create/drop Db
            True, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check the executed tasks against having started from DB_ONLY.

        ``newState`` is accepted for interface compatibility but not used.
        """
        if not self.hasTask(tasks, TaskCreateDb):
            # only if we don't create any more
            self.assertAtMostOneSuccess(tasks, TaskDropDb)

        # TODO: restore the below, the problem exists, although unlikely in real-world
        # if (gSvcMgr!=None) and gSvcMgr.isRestarting():
        # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
        #     self.assertIfExistThenSuccess(tasks, TaskDropDb)
832

S
Shuduo Sang 已提交
833

834
class StateSuperTableOnly(AnyState):
    """State with value ``STATE_TABLE_ONLY``: we have a table, but totally empty."""

    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed table
            True, True,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check the executed tasks against having started from TABLE_ONLY.

        ``newState`` is accepted for interface compatibility but not used.
        """
        if self.hasSuccess(tasks, TaskDropSuperTable):  # we are able to drop the table
            # self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
            # we must have had recreated it
            # NOTE(review): the result below is discarded, so nothing is
            # actually enforced here -- confirm whether an assertion was meant.
            self.hasSuccess(tasks, TaskCreateSuperTable)

            # self._state = self.STATE_DB_ONLY
        # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
        #     self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parallel cases
            # self._state = self.STATE_HAS_DATA
        # elif ( self.hasSuccess(tasks, ReadFixedDataTask) ): # no success in prev cases, but was able to read data
            # self.assertNoTask(tasks, DropFixedTableTask)
            # self.assertNoTask(tasks, AddFixedDataTask)
            # self._state = self.STATE_TABLE_ONLY # no change
        # else: # did not drop table, did not insert data, did not read successfully, that is impossible
        #     raise RuntimeError("Unexpected no-success scenarios")
        # TODO: need to revamp!!
861

S
Shuduo Sang 已提交
862

863 864 865 866 867 868 869 870 871 872
class StateHasData(AnyState):
    """State with value ``STATE_HAS_DATA``: we have some data in the table."""

    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed table
            True, True,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` can explain moving from HAS_DATA to ``newState``.

        Raises:
            CrashGenError: (via the assert* helpers) when the tasks are
                inconsistent with the observed new state.
        """
        if newState.equals(AnyState.STATE_EMPTY):
            # NOTE(review): result discarded, nothing is enforced by this call
            self.hasSuccess(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
        elif newState.equals(AnyState.STATE_DB_ONLY):  # in DB only
            if not self.hasTask(tasks, TaskCreateDb):  # without a create_db task
                # we must not have any drop_db task
                self.assertNoTask(tasks, TaskDropDb)
            # NOTE(review): result discarded, nothing is enforced by this call
            self.hasSuccess(tasks, TaskDropSuperTable)
            # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
        # elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
            # self.assertNoTask(tasks, TaskDropDb)
            # self.assertNoTask(tasks, TaskDropSuperTable)
            # self.assertNoTask(tasks, TaskAddData)
            # self.hasSuccess(tasks, DeleteDataTasks)
        else:  # should be STATE_HAS_DATA
            if not self.hasTask(tasks, TaskCreateDb):  # only if we didn't create one
                # we shouldn't have dropped it
                self.assertNoTask(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateSuperTable):  # if we didn't create the table
                # we should not have a task that drops it
                self.assertNoTask(tasks, TaskDropSuperTable)
            # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
S
Steven Li 已提交
899

S
Shuduo Sang 已提交
900

901
class StateMechine:
    """Tracks the current state of one database and validates transitions.

    The state is (re)discovered by querying the database through a
    connection passed in by the caller; the connection is never stored.
    """

    def __init__(self, db: Database):
        self._db = db
        # transition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
        self._stateWeights = [1, 2, 10, 40]

    def init(self, dbc: DbConn):  # late initialization, don't save the dbConn
        """Determine the starting state by inspecting the database via ``dbc``.

        Raises:
            taos.error.ProgrammingError: re-raised after logging when the
                current state cannot be determined.
        """
        try:
            self._curState = self._findCurrentState(dbc)  # starting state
        except taos.error.ProgrammingError as err:
            Logging.error("Failed to initialized state machine, cannot find current state: {}".format(err))
            traceback.print_stack()
            raise  # re-throw

    # TODO: seems no longer used, remove?
    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        return self._curState.canDropDb()  # ha, can drop DB means it has one

    # May be slow, use cautiously...
    def getTaskTypes(self):  # those that can run (directly/indirectly) from the current state
        """Return the task classes runnable from the current state.

        Includes classes that can begin directly from the current state, plus
        those that can begin from the end state of any such class.

        Raises:
            RuntimeError: if no task class qualifies.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses
                          if tc.canBeginFrom(self._curState)]
        # now we have all the tasks that can begin directly from the current
        # state, let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:  # each task type gathered so far
            endState = task1.getEndState()  # figure the end state
            if endState is None:  # does not change end state
                continue  # no use, do nothing
            for tc in allTaskClasses:  # what task can further begin from there?
                # NOTE: an indirect class reachable through several first-level
                # tasks is appended once per path; pickTaskType() then weighs
                # it once per occurrence.
                if tc.canBeginFrom(endState) and (tc not in firstTaskTypes):
                    taskTypes.append(tc)  # gather it

        if not taskTypes:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        Logging.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self, dbc: DbConn):
        """Query the database through ``dbc`` and return the matching state object."""
        ts = time.time()  # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        dbName = self._db.getName()
        if not dbc.existsDatabase(dbName):  # dbc.hasDatabases():  # no database?!
            Logging.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when opening connection, and this is NOT the worker
        # thread, which does this on their own
        dbc.use(dbName)
        if not dbc.hasTables():  # no tables
            Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        # For sure we have tables, which means we must have the super table. # TODO: are we sure?
        sTable = self._db.getFixedSuperTable()
        # The original test was inverted: it reported SUPER_TABLE_ONLY when
        # hasRegTables() was true, contradicting both branch comments.
        if not sTable.hasRegTables(dbc):  # no regular tables
            Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else:  # has actual tables
            Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    # We transition the system to a new state by examining the current state itself
    def transition(self, tasks, dbc: DbConn):
        """Validate ``tasks`` against the old and newly observed state, then adopt the new state.

        Does nothing (apart from logging) when ``tasks`` is empty.
        """
        global gSvcMgr

        if len(tasks) == 0:  # before 1st step, or otherwise empty
            Logging.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        dbc.execute("show dnodes")

        # Generic Checks, first based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in
            # case of multiple creation and drops

        if self._curState.canDropDb():
            if gSvcMgr is None:  # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in
            # case of drop-create-drop

        # if self._state.canCreateFixedTable():
            # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
            # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not
            # really, in case of create-drop-create

        # if self._state.canDropFixedTable():
            # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
            # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not
            # really in case of drop-create-drop

        # if self._state.canAddData():
        # self.assertIfExistThenSuccess(tasks, AddFixedDataTask)  # not true
        # actually

        # if self._state.canReadData():
            # Nothing for sure

        newState = self._findCurrentState(dbc)
        Logging.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        """Pick one runnable task class at random, weighted by its end state."""
        # all the task types we can choose from at current state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # read data task, default to 10: TODO: change to a constant
                weights.append(10)
        i = self._weighted_choice_sub(weights)
        # Logging.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
        return taskTypes[i]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights) -> int:
        """Return a random index into ``weights``, chosen proportionally to the weights.

        Raises:
            CrashGenError: if no index is selected (e.g. ``weights`` is empty).
        """
        # TODO: use our dice to ensure it being deterministic?
        rnd = random.random() * sum(weights)
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        raise CrashGenError("Unexpected no choice")
1051

1052 1053 1054 1055 1056
class Database:
    """Represents an actual TDengine database inside a service instance,
    possibly in a cluster environment.

    For now we use it to manage state transitions in that database, and to
    hand out values (ticks, ints, floats, strings, colors) for generated data.

    TODO: consider moving, but keep in mind it contains "StateMachine"
    """
    _clsLock = threading.Lock()  # class wide lock
    # Starting value; getNextInt() increments before returning. Note that
    # ``self._lastInt += 1`` creates a per-instance counter on first use.
    _lastInt = 101
    _lastTick = None  # Optional[datetime]
    _lastLaggingTick = None  # Optional[datetime] # lagging tick, for out-of-sequence (oos) data insertions

    def __init__(self, dbNum: int, dbc: DbConn):  # TODO: remove dbc
        self._dbNum = dbNum  # we assign a number to databases, for our testing purpose
        self._stateMachine = StateMechine(self)
        self._stateMachine.init(dbc)

        self._lock = threading.RLock()

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    def getDbNum(self):
        return self._dbNum

    def getName(self):
        """Return the database name, derived from the database number."""
        return "db_{}".format(self._dbNum)

    def filterTasks(self, inTasks: List[Task]):  # Pick out those belonging to us
        """Return the tasks from ``inTasks`` whose database is this one."""
        return [task for task in inTasks if task.getDb().isSame(self)]

    def isSame(self, other):
        """Return True if ``other`` carries the same database number."""
        return self._dbNum == other._dbNum

    def exists(self, dbc: DbConn):
        return dbc.existsDatabase(self.getName())

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    def getFixedSuperTable(self) -> TdSuperTable:
        return TdSuperTable(self.getFixedSuperTableName(), self.getName())

    # We aim to create a starting time tick, such that, whenever we run our test here once
    # We should be able to safely create 100,000 records, which will not have any repeated time stamp
    # when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
    # by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    @classmethod
    def setupLastTick(cls):
        """Compute the datetime from which generated ticks start.

        Seconds elapsed since 2020-06-01 are stretched by a factor of 500,
        wrapped into a fixed window, and added to 2012-01-01.
        """
        t1 = datetime.datetime(2020, 6, 1)
        t2 = datetime.datetime.now()
        # maybe a very large number, takes 69 years to exceed Python int range
        elSec = int(t2.timestamp() - t1.timestamp())
        # The window is 8 * 12 * 30 days expressed in seconds (the original
        # comment said "10 years"); '/' makes the result a float.
        elSec2 = (elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500)) * 500
        # print("elSec = {}".format(elSec))
        t3 = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp(
            t3.timestamp() + elSec2)  # see explanation above
        Logging.debug("Setting up TICKS to start from: {}".format(t4))
        return t4

    @classmethod
    def getNextTick(cls):
        '''
            Fetch a timestamp tick, with some random factor, may not be unique.
        '''
        with cls._clsLock:  # prevent duplicate tick
            if cls._lastLaggingTick is None or cls._lastTick is None:  # not initialized
                # 10k at 1/20 chance, should be enough to avoid overlaps
                tick = cls.setupLastTick()
                cls._lastTick = tick
                cls._lastLaggingTick = tick + datetime.timedelta(0, -60 * 2)  # lagging behind 2 minutes, should catch up fast
                # if : # should be quite a bit into the future

            if Config.isSet('mix_oos_data') and Dice.throw(20) == 0:  # if asked to do so, and 1 in 20 chance, return lagging tick
                cls._lastLaggingTick += datetime.timedelta(0, 1)  # pick the next sequence from the lagging tick sequence
                return cls._lastLaggingTick
            else:  # regular
                # add one second to it
                cls._lastTick += datetime.timedelta(0, 1)
                return cls._lastTick

    def getNextInt(self):
        """Return the next integer of this database's counter."""
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        """Return a long fixed string suffixed with the next integer."""
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            self.getNextInt())

    def getNextFloat(self):
        """Return the next integer plus 0.9."""
        ret = 0.9 + self.getNextInt()
        # print("Float obtained: {}".format(ret))
        return ret

    ALL_COLORS = ['red', 'white', 'blue', 'green', 'purple']

    def getNextColor(self):
        """Return a randomly chosen entry of ``ALL_COLORS``."""
        return random.choice(self.ALL_COLORS)

1162

1163
class TaskExecutor():
    """Executes tasks for one step and records data marks in a shared bounded list."""

    class BoundedList:
        """Thread-safe, ascending list that keeps at most ``size`` of the largest values added."""

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            """Insert ``n`` in sorted position, evicting the smallest item on overflow.

            While the list is not yet full every value is kept. Once it is
            full, a value not larger than the current minimum is ignored.
            (Previously such values were ignored even when the list still had
            room, because the first item acted as a gate.)
            """
            with self._lock:
                items = self._list
                if items and len(items) >= self._size and n <= items[0]:
                    return  # full, and n would be evicted straight away
                # first position whose item is >= n, or the end of the list
                insPos = next(
                    (i for i, item in enumerate(items) if n <= item),
                    len(items))
                # print("Inserting at position {}, value: {}".format(insPos, n))
                items.insert(insPos, n)
                if len(items) > self._size:
                    del items[0]  # remove the first (smallest) item

        def __str__(self):
            return repr(self._list)

    # One list shared by all TaskExecutor instances
    _boundedList = BoundedList()

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        """Record ``n`` in the shared bounded list."""
        # print("[{}]".format(n), end="", flush=True)
        self._boundedList.add(n)

    # def logInfo(self, msg):
    #     Logging.info("    T[{}.x]: ".format(self._curStep) + msg)

    # def logDebug(self, msg):
    #     Logging.debug("    T[{}.x]: ".format(self._curStep) + msg)
1225

S
Shuduo Sang 已提交
1226

S
Steven Li 已提交
1227
class Task():
    ''' A generic "Task" to be executed. For now we decide that there is no
        need to embed a DB connection here, we use whatever the Worker Thread has
        instead. But a task is always associated with a DB
    '''
    taskSn = 100  # last serial number handed out, shared by Task and all sub classes
    _lock = threading.Lock()  # short-lived lock protecting the _tableLocks registry
    _tableLocks: Dict[str, threading.Lock] = {}  # full table name -> per-table lock

    # Error numbers that are tolerated regardless of configuration or service state
    _ALWAYS_ACCEPTABLE_ERRNOS = frozenset([
        0x05,  # TSDB_CODE_RPC_NOT_READY
        0x0B,  # Unable to establish connection, more details in TD-1648
        # 0x200, # invalid SQL, TODO: re-examine with TD-934
        0x20F,  # query terminated, possibly due to vnoding being dropped, see TD-1776
        0x213,  # "Disconnected from service", result of "kill connection ???"
        0x217,  # "db not selected", client side defined error code
        0x360,  # Table already exists
        0x362,
        # 0x369, # tag already exists
        0x36A, 0x36B, 0x36D,
        0x381,
        0x380,  # "db not selected"
        0x383,
        0x386,  # DB is being dropped?!
        0x503,
        0x510,  # vnode not in ready state
        0x14,   # db not ready, errno changed
        0x600,  # Invalid table ID, why?
        0x218,  # Table does not exist
        1000,   # REST catch-all error
    ])

    @classmethod
    def allocTaskNum(cls):
        '''Hand out the next task serial number.'''
        Task.taskSn += 1  # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        return Task.taskSn

    def __init__(self, execStats: ExecutionStats, db: Database):
        '''
        :param execStats: statistics collector notified at the start/end of execute()
        :param db: the database this task works on, returned by getDb()
        '''
        self._workerThread = None
        self._err: Optional[Exception] = None
        self._aborted = False
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

        self._execStats = execStats
        self._db = db  # A task is always associated/for a specific DB

    def isSuccess(self):
        '''True when no error has been recorded for this task.'''
        return self._err is None

    def isAborted(self):
        '''True when execute() hit an error it decided not to tolerate.'''
        return self._aborted

    def clone(self):  # TODO: why do we need this again?
        '''Create a fresh task of the same class with the same stats object and DB.'''
        return self.__class__(self._execStats, self._db)

    def getDb(self):
        return self._db

    def logDebug(self, msg):
        '''Log ``msg`` at debug level through the worker thread, prefixed with step/task number.'''
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        '''Log ``msg`` at info level through the worker thread, prefixed with step/task number.'''
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''The actual work of the task; every concrete task class must override this.

        :raises NotImplementedError: always, in this base class. NotImplementedError
            is a RuntimeError, so handlers written for RuntimeError still match.
        '''
        raise NotImplementedError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))

    def _isServiceStable(self):
        if not gSvcMgr:
            return True  # we don't run service, so let's assume it's stable
        return gSvcMgr.isStable()  # otherwise let's examine the service

    def _isErrAcceptable(self, errno, msg):
        '''Decide whether a TAOS error may be tolerated.

        The checks are independent of one another, so a miss in one of them
        (e.g. an ``ignore_errors`` list that does not contain ``errno``) no longer
        prevents the later ones from being evaluated.

        :param errno: error number, already converted by Helper.convertErrno()
        :param msg: error message text, inspected only for errno 0x200
        :return: True if the error is acceptable, False otherwise
        '''
        if errno in self._ALWAYS_ACCEPTABLE_ERRNOS:
            return True  # These are the ALWAYS-ACCEPTABLE ones

        ignoreErrors = Config.getConfig().ignore_errors
        if ignoreErrors:  # something is specified on command line
            moreErrnos = [int(v, 0) for v in ignoreErrors.split(',')]
            if errno in moreErrnos:
                return True

        if errno == 0x200:  # invalid SQL, we need to dive in a bit more
            tolerated = (
                "invalid column name",
                "tags number not matched",  # mismatched tags after modification
                "duplicated column names",  # also alter table tag issues
            )
            if any(msg.find(fragment) != -1 for fragment in tolerated):
                return True

        if not self._isServiceStable():  # We are managing service, and ...
            Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
            return True

        return False  # Not an acceptable error

    def execute(self, wt: WorkerThread):
        '''Run this task on worker thread ``wt`` and record the outcome.

        Exceptions raised by _executeInternal() are not propagated: they are
        stored in ``self._err`` (except for non-Exception BaseExceptions) and,
        when not tolerated, ``self._aborted`` is set. The execution statistics
        are updated in every case.
        '''
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug(
            "[-] executing task {}...".format(self.__class__.__name__))

        self._err = None  # TODO: type hint mess up?
        self._execStats.beginTaskType(self.__class__.__name__)  # mark beginning
        errno2 = None

        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            if Config.getConfig().continue_on_exception:  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                self._err = err
            elif self._isErrAcceptable(errno2, err.__str__()):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                Progress.emit(Progress.ACCEPTABLE_ERROR)
                self._err = err
            else:  # not an acceptable error
                shortTid = threading.get_ident() % 10000
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, thread={}, msg: {}, SQL: {}".format(
                    self.__class__.__name__,
                    errno2,
                    shortTid,
                    err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if Config.getConfig().debug:
                    traceback.print_exc()  # so that we see full stack
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                self._err = err
                self._aborted = True
        except Exception as e:
            Logging.info("Non-TAOS exception encountered with: {}".format(self.__class__.__name__))
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException:
            self.logInfo("Python base exception encountered")
            # self._err is left untouched: Exception/BaseException incompatible!
            self._aborted = True
            traceback.print_exc()

        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2)

    # TODO: refactor away, just provide the dbConn
    def execWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        '''Execute ``sql`` through ``wt.execSql()`` and return its result.'''
        return wt.execSql(sql)

    def queryWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        '''Run query ``sql`` through ``wt.querySql()`` and return its result.'''
        return wt.querySql(sql)

    def getQueryResult(self, wt: WorkerThread):
        '''Return whatever ``wt.getQueryResult()`` returns.'''
        return wt.getQueryResult()

    def lockTable(self, ftName):  # full table name
        '''Acquire the per-table lock for ``ftName``, creating it on first use.

        Blocks until the lock is available.
        '''
        with Task._lock:  # SHORT lock! so we only protect lock creation
            if ftName not in Task._tableLocks:  # Create new lock and add to list, if needed
                Task._tableLocks[ftName] = threading.Lock()

        # No lock protection, anybody can do this any time
        lock = Task._tableLocks[ftName]
        lock.acquire()

    def unlockTable(self, ftName):
        '''Release the per-table lock for ``ftName``.

        :raises RuntimeError: if no lock was ever created for ``ftName`` or the
            lock is not currently held
        '''
        with Task._lock:
            if ftName not in Task._tableLocks:
                raise RuntimeError("Corrupt state, no such lock")
            lock = Task._tableLocks[ftName]
            if not lock.locked():
                raise RuntimeError("Corrupte state, already unlocked")

            # Important note, we want to protect unlocking under the task level
            # locking, because we don't want the lock to be deleted (maybe in the future)
            # while we unlock it
            lock.release()
1444

1445

1446
class ExecutionStats:
    '''Collects per-task-class execution counts, error counts and timing figures,
    and prints a summary through printStats().
    '''

    def __init__(self):
        # total/success times for a task: class name -> [total, success]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = 0.0
        self._execStartTime = 0.0
        self._errors = {}  # class name -> {errno: occurrences}
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        return self._failed

    def startExec(self):
        '''Record the wall-clock start of the whole run.'''
        self._execStartTime = time.time()

    def endExec(self):
        '''Compute the wall-clock time elapsed since startExec().'''
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        '''Count one execution of task class ``klassName``.

        :param klassName: task class name used as the statistics key
        :param isSuccess: whether the execution counts as a success
        :param eno: error number to tally for this class, or None for no error
        '''
        with self._lock:  # tasks report from several worker threads
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        '''Mark that one more task is running; starts the busy clock if it is the first.'''
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        '''Mark that one task finished; accumulates busy time when none remain.'''
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = 0.0

    def registerFailure(self, reason):
        '''Flag the whole run as failed, remembering ``reason`` for the report.'''
        self._failed = True
        self._failureReason = reason

    def printStats(self):
        '''Log the collected statistics via Logging.info().'''
        Logging.info(
            "----------------------------------------------------------------------")
        Logging.info(
            "| Crash_Gen test {}, with the following stats:". format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        Logging.info("| Task Execution Times (success/total):")
        execTimesAny = 0.0
        for k, n in self._execTimes.items():
            execTimesAny += n[0]
            errStr = None
            if k in self._errors:
                errStrs = ["0x{:X}:{}".format(eno, cnt) for (eno, cnt) in self._errors[k].items()]
                errStr = ", ".join(errStrs)
            Logging.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))

        # Guard the average: nothing may have been executed at all
        avgTaskTime = (self._accRunTime / execTimesAny) if execTimesAny else 0.0

        Logging.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        Logging.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        Logging.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        Logging.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(avgTaskTime))
        Logging.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        Logging.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        Logging.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
        Logging.info("| Longest native query time: {:.3f} seconds, started: {}".
            format(MyTDSql.longestQueryTime,
                time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) )
        Logging.info("| Longest native query: {}".format(MyTDSql.longestQuery))
        Logging.info(
            "----------------------------------------------------------------------")
1545 1546 1547


class StateTransitionTask(Task):
    '''Base class for the tasks below; sub classes supply getEndState() and
    canBeginFrom(), which raise here.
    '''
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    _baseTableNumber = None  # offset for regular table names, chosen once on first use

    _endState = None  # TODO: no longer used?

    @classmethod
    def getInfo(cls):  # each sub class should supply their own information
        raise NotImplementedError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        raise NotImplementedError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        '''Tell whether this task may start from ``state``; must be overridden.'''
        raise NotImplementedError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        '''Return the name of regular table number ``i``.

        The base number is fixed on the first call: a Dice.throw(999) value when
        the ``dynamic_db_table_names`` option is set, otherwise 0.
        '''
        if StateTransitionTask._baseTableNumber is None:  # Set it one time
            StateTransitionTask._baseTableNumber = Dice.throw(
                999) if Config.getConfig().dynamic_db_table_names else 0
        return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i)

    def execute(self, wt: WorkerThread):
        '''Run the task on ``wt``; simply defers to Task.execute().'''
        super().execute(wt)
S
Shuduo Sang 已提交
1587 1588


1589
class TaskCreateDb(StateTransitionTask):
    '''Task issuing "create database" for the task's database.'''

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    # Actually creating the database(es)
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Create the task's database; for "db_0" with the ``use_shadow_db``
        option set, additionally create the database "db_s" with the same options.
        '''
        config = Config.getConfig()

        repStr = ""
        if config.num_replicas != 1:
            repStr = "replica {}".format(config.num_replicas)  # fixed, always
        # allow update only when "verify data" is active
        updatePostfix = "update 1" if config.verify_data else ""

        createSql = "create database {} {} {} "  # name, replica clause, update clause
        dbName = self._db.getName()
        self.execWtSql(wt, createSql.format(dbName, repStr, updatePostfix))
        if dbName == "db_0" and Config.getConfig().use_shadow_db:
            self.execWtSql(wt, createSql.format("db_s", repStr, updatePostfix))
1611

1612
class TaskDropDb(StateTransitionTask):
    '''Task issuing "drop database" for the task's database.'''

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Drop the database and log the time at which that happened.'''
        self.execWtSql(wt, "drop database {}".format(self._db.getName()))
        Logging.debug("[OPS] database dropped at {}".format(time.time()))
1624

1625
class TaskCreateSuperTable(StateTransitionTask):
    '''Task (re-)creating the database's fixed super table.'''

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Create the fixed super table, dropping an existing one first.

        Does nothing (beyond a debug log) when the database does not exist yet.
        '''
        if not self._db.exists(wt.getDbConn()):
            Logging.debug("Skipping task, no DB yet")
            return

        sTable = self._db.getFixedSuperTable() # type: TdSuperTable

        columns = {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}
        tags = {'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT}
        sTable.create(wt.getDbConn(), columns, tags, dropIfExists=True)
        # No need to create the regular tables, INSERT will do that
        # automatically
1650

S
Steven Li 已提交
1651

1652
class TdSuperTable:
    '''A super table, identified by its name and the name of its database.
    All operations are carried out through a DB connection passed in by the caller.
    '''

    # Literal used for a tag of the given type when creating a regular table
    _TAG_SAMPLE_VALUES = {
        'BINARY': "'Beijing-Shanghai-LosAngeles'",
        'FLOAT': '9.9',
        'INT': '88',
    }

    def __init__(self, stName, dbName):
        self._stName = stName
        self._dbName = dbName

    def getName(self):
        return self._stName

    def drop(self, dbc, skipCheck = False):
        '''Drop this super table.

        :param skipCheck: when True, a missing table is silently ignored
        :raises CrashGenError: if the table does not exist and ``skipCheck`` is False
        '''
        if self.exists(dbc):  # if myself exists
            fullTableName = self._dbName + '.' + self._stName
            dbc.execute("DROP TABLE {}".format(fullTableName))
        elif not skipCheck:
            raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))

    def exists(self, dbc):
        '''Switch ``dbc`` to this table's database and ask it whether the table exists.'''
        dbc.execute("USE " + self._dbName)
        return dbc.existsSuperTable(self._stName)

    # TODO: odd semantic, create() method is usually static?
    def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False):
        '''Creating a super table

        :param cols: column name -> type; each type's ``.value`` is put into the SQL
        :param tags: tag name -> type, same convention; when empty a single
            "dummy int" tag is used instead
        :param dropIfExists: drop an existing table of the same name first
        :raises CrashGenError: if the table exists and ``dropIfExists`` is False
        '''
        dbName = self._dbName
        dbc.execute("USE " + dbName)
        fullTableName = dbName + '.' + self._stName
        if dbc.existsSuperTable(self._stName):
            if not dropIfExists:  # error
                raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
            dbc.execute("DROP TABLE {}".format(fullTableName))

        # Now let's create
        sql = "CREATE TABLE {} ({})".format(
            fullTableName,
            ",".join(['%s %s'%(k,v.value) for (k,v) in cols.items()]))
        if tags:
            sql += " TAGS ({})".format(
                ",".join(['%s %s'%(k,v.value) for (k,v) in tags.items()])
            )
        else:
            sql += " TAGS (dummy int) "
        dbc.execute(sql)

    def getRegTables(self, dbc: DbConn):
        '''Return the first column of every row of "select TBNAME" on this super table.

        A taos ProgrammingError is logged at debug level and re-raised.
        '''
        try:
            dbc.query("select TBNAME from {}.{}".format(self._dbName, self._stName))  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
            raise

        qr = dbc.getQueryResult()
        return [v[0] for v in qr]

    def hasRegTables(self, dbc: DbConn):
        '''True when "SELECT *" on this super table makes ``dbc.query()`` return more than 0.'''
        return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0

    def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
        '''
        Make sure a regular table exists for this super table, creating it if necessary.
        If there is an associated "Task" that wants to do this, "lock" this table so that
        others don't access it while we create it.
        '''
        dbName = self._dbName
        sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
        if dbc.query(sql) >= 1:  # reg table exists already
            return

        # acquire a lock first, so as to be able to *verify*. More details in TD-1471
        fullTableName = dbName + '.' + regTableName
        if task is not None:  # Sometimes this operation is requested on behalf of a "task"
            task.lockTable(fullTableName)  # in which case we'll lock this table to ensure serialized access
        try:
            # Everything after taking the lock sits inside the try, so the lock
            # is released even if the progress report itself fails.
            Progress.emit(Progress.CREATE_TABLE_ATTEMPT)  # ATTEMPT to create a new table
            sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
                fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
            )
            dbc.execute(sql)
        finally:
            if task is not None:
                task.unlockTable(fullTableName)  # no matter what

    def _getTagStrForSql(self, dbc):
        '''Build the comma-separated tag value list for "CREATE TABLE ... tags (...)".

        :raises RuntimeError: if a tag has a type other than BINARY, FLOAT or INT
        '''
        tagStrs = []
        for tagType in self._getTags(dbc).values():
            if tagType not in self._TAG_SAMPLE_VALUES:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
            tagStrs.append(self._TAG_SAMPLE_VALUES[tagType])
        return ", ".join(tagStrs)

    def _getTags(self, dbc) -> dict:
        '''Return {name: type} for the DESCRIBE rows whose 4th field is 'TAG'.'''
        dbc.query("DESCRIBE {}.{}".format(self._dbName, self._stName))
        stCols = dbc.getQueryResult()
        return {row[0]: row[1] for row in stCols if row[3] == 'TAG'}  # name:type

    def addTag(self, dbc, tagName, tagType):
        '''Add tag ``tagName`` of type ``tagType``; no-op if the tag already exists.'''
        if tagName in self._getTags(dbc):  # already
            return
        sql = "alter table {}.{} add tag {} {}".format(
            self._dbName, self._stName, tagName, tagType)
        dbc.execute(sql)

    def dropTag(self, dbc, tagName):
        '''Drop tag ``tagName``; no-op if the table has no such tag.'''
        if tagName not in self._getTags(dbc):  # don't have this tag
            return
        sql = "alter table {}.{} drop tag {}".format(self._dbName, self._stName, tagName)
        dbc.execute(sql)

    def changeTag(self, dbc, oldTag, newTag):
        '''Rename tag ``oldTag`` to ``newTag``; no-op if ``oldTag`` is missing or
        ``newTag`` is already present.
        '''
        tags = self._getTags(dbc)
        if oldTag not in tags:  # don't have this tag
            return
        if newTag in tags:  # already have this tag
            return
        sql = "alter table {}.{} change tag {} {}".format(self._dbName, self._stName, oldTag, newTag)
        dbc.execute(sql)

    def generateQueries(self, dbc: DbConn) -> List[SqlQuery]:
        ''' Generate queries to test/exercise this super table '''
        ret = [] # type: List[SqlQuery]

        for rTbName in self.getRegTables(dbc):  # regular tables

            # Not applied to the queries yet; the dice call is kept so the order of
            # random draws stays as it was.
            filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
                None
            ])

            # Run the query against the regular table first
            doAggr = (Dice.throw(2) == 0) # 1 in 2 chance
            if not doAggr: # don't do aggregate query, just simple one
                ret.append(SqlQuery( # reg table
                    "select {} from {}.{}".format('*', self._dbName, rTbName)))
                ret.append(SqlQuery( # super table
                    "select {} from {}.{}".format('*', self._dbName, self.getName())))
            else: # Aggregate query
                aggExpr = Dice.choice([
                    'count(*)',
                    'avg(speed)',
                    # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
                    'sum(speed)',
                    'stddev(speed)',
                    # SELECTOR functions
                    'min(speed)',
                    'max(speed)',
                    'first(speed)',
                    'last(speed)',
                    'top(speed, 50)', # TODO: not supported?
                    'bottom(speed, 50)', # TODO: not supported?
                    'apercentile(speed, 10)', # TODO: TD-1316
                    # 'last_row(speed)', # TODO: commented out per TD-3231, we should re-create
                    # Transformation Functions
                    # 'diff(speed)', # TODO: no supported?!
                    'spread(speed)'
                    ]) # TODO: add more from 'top'

                # if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
                sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
                if Dice.throw(3) == 0: # 1 in X chance
                    sql = sql + ' GROUP BY color'
                    Progress.emit(Progress.QUERY_GROUP_BY)
                ret.append(SqlQuery(sql))

        return ret

1840
class TaskReadData(StateTransitionTask):
    '''Task that runs the generated queries of the fixed super table.'''

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _reconnectIfNeeded(self, wt):
        '''With a 1 in 20 chance, close and re-open the worker thread's DB connection.

        A ConnectionError is re-raised when no service manager is in use or when
        the managed service reports itself as running; otherwise the failure is
        only reported through Progress. Success is reported only when the
        reconnect actually went through.
        '''
        # 1 in 20 chance, simulate a broken connection
        if random.randrange(20) != 0:  # TODO: break connection in all situations
            return

        Progress.emit(Progress.SERVICE_RECONNECT_START)
        try:
            wt.getDbConn().close()
            wt.getDbConn().open()
        except ConnectionError:  # may fail
            if not gSvcMgr:
                Logging.error("Failed to reconnect in client-only mode")
                raise  # Not OK if we are running in client-only mode
            if gSvcMgr.isRunning():  # may have race condition, but low prob
                Logging.error("Failed to reconnect when managed server is running")
                raise  # Not OK if we are running normally

            Progress.emit(Progress.SERVICE_RECONNECT_FAILURE)
        else:
            Progress.emit(Progress.SERVICE_RECONNECT_SUCCESS)
        # The above might have taken a lot of time, service might be running
        # by now, causing error below to be incorrectly handled due to timing issue
        # TODO: fix server restart status race condition

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        '''Execute every query generated for the fixed super table.

        A taos ProgrammingError is logged at debug level and re-raised.
        '''
        self._reconnectIfNeeded(wt)

        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()

        for q in sTable.generateQueries(dbc):  # regular tables
            try:
                dbc.execute(q.getSql())
            except taos.error.ProgrammingError as err:
                errno2 = Helper.convertErrno(err.errno)
                Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
                raise
S
Shuduo Sang 已提交
1896

1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909
class SqlQuery:
    """Holder for a single SQL statement string."""

    @classmethod
    def buildRandom(cls, db: Database):
        '''Build a random query against a certain database'''
        # Only the database name is looked up so far; no query object is
        # constructed yet, so callers get None back.
        _targetDbName = db.getName()
        return None

    def __init__(self, sql:str = None):
        # The SQL text is stored as given; it may be None
        self._sql = sql

    def getSql(self):
        """Return the SQL text this object was created with."""
        return self._sql
    
1910
class TaskDropSuperTable(StateTransitionTask):
    """Task that drops the fixed super table, optionally dropping regular tables first."""

    @classmethod
    def getEndState(cls):
        """After the super table is gone only the database remains."""
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """This task may start from any state that allows dropping the fixed super table."""
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Drop the super table; half of the time drop the regular tables one by one first.

        Errors raised while dropping an individual regular table are not
        propagated; the loop simply moves on to the next table.
        """
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
        if Dice.throw(2) == 0:
            if Config.getConfig().larger_data:
                baseCount = self.LARGE_NUMBER_OF_TABLES
            else:
                baseCount = self.SMALL_NUMBER_OF_TABLES
            shuffledIds = list(range(2 + baseCount))
            random.shuffle(shuffledIds)

            tickPrinted = False          # whether the single "d"/"f" progress mark was emitted
            sawAcceptableError = False   # set once an "invalid table name" error was seen
            for tableId in shuffledIds:
                tableName = self.getRegTableName(tableId)  # "db.reg_table_{}".format(i)
                try:
                    dropSql = "drop table {}.{}".format(self._db.getName(), tableName)
                    self.execWtSql(wt, dropSql)  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as dropErr:
                    # correcting for strange error number scheme
                    if Helper.convertErrno(dropErr.errno) == 0x362:  # mnode invalid table name
                        sawAcceptableError = True
                        Logging.debug("[DB] Acceptable error when dropping a table")
                    continue  # try to delete next regular table

                if tickPrinted:
                    continue
                tickPrinted = True  # Print only one time
                print("f" if sawAcceptableError else "d", end="", flush=True)

        # Drop the super table itself
        superTableName = self._db.getFixedSuperTableName()
        self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), superTableName))
1951

S
Shuduo Sang 已提交
1952

1953 1954 1955
class TaskAlterTags(StateTransitionTask):
    """Task that applies one randomly chosen tag alteration to the fixed super table."""

    @classmethod
    def getEndState(cls):
        """Return None, meaning this task does not affect the state."""
        return None

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Allowed whenever the super table could be dropped: if we can drop it, we can alter tags."""
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Pick one of four tag operations with a dice roll and apply it."""
        conn = wt.getDbConn()
        superTable = self._db.getFixedSuperTable()
        roll = Dice.throw(4)

        if roll == 0:
            # alter table ... add tag extraTag int
            superTable.addTag(conn, "extraTag", "int")
            return
        if roll == 1:
            # alter table ... drop tag extraTag
            superTable.dropTag(conn, "extraTag")
            return
        if roll == 2:
            # alter table ... drop tag newTag
            superTable.dropTag(conn, "newTag")
            return
        # Any other roll (expected: 3): alter table ... change tag extraTag newTag
        superTable.changeTag(conn, "extraTag", "newTag")

class TaskRestartService(StateTransitionTask):
    """Task that occasionally restarts the managed service (only in auto-start mode).

    A class-wide flag guarded by a class-wide lock makes sure that at most one
    instance of this task is inside the restart section at any time.
    """

    # Shared by ALL instances: True while some instance is inside the restart section.
    # It must always be written through the class; assigning via `self` would only
    # create an instance attribute and other instances would never see it.
    _isRunning = False
    _classLock = threading.Lock()

    @classmethod
    def getEndState(cls):
        """Return None, meaning this task does not affect the state."""
        return None

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Eligible only in auto-start-service mode, and only when the super table exists."""
        if Config.getConfig().auto_start_service:
            return state.canDropFixedSuperTable()  # Basically when we have the super table
        return False # don't run this otherwise

    CHANCE_TO_RESTART_SERVICE = 200

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Restart the service with a 1-in-CHANCE_TO_RESTART_SERVICE probability.

        Does nothing (besides printing "_a") when not in auto-start-service
        mode, and skips the attempt when another instance is already inside
        the restart section.
        """
        if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        with self._classLock:
            if TaskRestartService._isRunning:
                Logging.info("Skipping restart task, another running already")
                return
            TaskRestartService._isRunning = True

        try:
            if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
                dbc = wt.getDbConn()
                dbc.execute("show databases") # simple delay, align timing with other workers
                gSvcMgr.restart()
        finally:
            # Always clear the shared flag, even if the query or the restart raised,
            # otherwise every later restart task would be skipped forever.
            with self._classLock:
                TaskRestartService._isRunning = False
S
Shuduo Sang 已提交
2012

2013
class TaskAddData(StateTransitionTask):
    """Task that inserts rows into the regular tables under the fixed super table.

    Tables are visited in random order. For each one the table is created if
    necessary and a configured number of rows is written. Depending on the
    configuration, every write can be journaled to a pair of fsync-ed files
    (``record_ops``) and read back for verification under a table lock
    (``verify_data``).
    """

    # Track which table is being actively worked on
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off tests
    fAddLogReady = None # type: Optional[io.TextIOWrapper]
    fAddLogDone  = None # type: Optional[io.TextIOWrapper]

    @classmethod
    def prepToRecordOps(cls):
        """Open the two operation-journal files once, if ``record_ops`` is configured."""
        if not Config.getConfig().record_ops:
            return
        if cls.fAddLogReady is None:
            Logging.info(
                "Recording in a file operations to be performed...")
            cls.fAddLogReady = open("add_log_ready.txt", "w")
        if cls.fAddLogDone is None:
            Logging.info("Recording in a file operations completed...")
            cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        """After this task the database has data."""
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """This task may start from any state that allows adding data."""
        return state.canAddData()

    def _lockTableIfNeeded(self, fullTableName, extraMsg = ''):
        """Lock the table, but only when read-back verification is configured.

        ``extraMsg`` is accepted for tracing purposes and currently unused.
        """
        if Config.getConfig().verify_data:
            self.lockTable(fullTableName)

    def _unlockTableIfNeeded(self, fullTableName):
        """Unlock the table, but only when read-back verification is configured."""
        if Config.getConfig().verify_data:
            self.unlockTable(fullTableName)

    def _getNumRecords(self):
        """Return the number of records to write per table for the current configuration."""
        if Config.getConfig().larger_data:
            return self.LARGE_NUMBER_OF_RECORDS
        return self.SMALL_NUMBER_OF_RECORDS

    @staticmethod
    def _appendDurably(logFile, text):
        """Write ``text`` to ``logFile`` and force it to disk before returning."""
        logFile.write(text)
        logFile.flush()
        os.fsync(logFile.fileno())

    def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
        """Write all records for one table with a single INSERT statement.

        The table lock (if any) is released even when generating the rows or
        executing the statement fails.
        """
        numRecords = self._getNumRecords()

        fullTableName = db.getName() + '.' + regTableName
        self._lockTableIfNeeded(fullTableName, 'batch')
        try:
            rowValues = []
            for _ in range(numRecords):  # number of records per table
                nextInt = db.getNextInt()
                nextTick = db.getNextTick()
                nextColor = db.getNextColor()
                rowValues.append("('{}', {}, '{}');".format(nextTick, nextInt, nextColor))
            sql = "INSERT INTO {} VALUES ".format(fullTableName) + "".join(rowValues)
            dbc.execute(sql)
        finally:
            self._unlockTableIfNeeded(fullTableName)

    def _insertRow(self, db, dbc, fullTableName, tick, intToWrite, color):
        """Insert one row and, occasionally, overwrite it with a second insert on the same tick.

        Returns the integer value that the row is expected to hold afterwards.
        """
        sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
            fullTableName, tick, intToWrite, color)
        dbc.execute(sql)
        intWrote = intToWrite

        # Quick hack, attach an update statement here. TODO: create an "update" task
        if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shadow DB
            intToUpdate = db.getNextInt()
            nextColor = db.getNextColor()
            sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
                fullTableName, tick, intToUpdate, nextColor)
            dbc.execute(sql)
            intWrote = intToUpdate # We updated, seems TDengine non-cluster accepts this.
        return intWrote

    def _verifyReadBack(self, db, dbc, regTableName, tick, intWrote):
        """Read the row for ``tick`` back and raise if it does not hold ``intWrote``.

        A "table doesn't exist" error is tolerated, since the table might have
        been dropped in the meantime. Empty and multiple results are re-raised
        with a more descriptive message; anything else is re-raised unchanged.
        """
        try:
            readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
                format(db.getName(), regTableName, tick))
            if readBack != intWrote:
                # Raised here, classified by the handler right below
                raise taos.error.ProgrammingError(
                    "Failed to read back same data, wrote: {}, read: {}"
                    .format(intWrote, readBack), 0x999)
        except taos.error.ProgrammingError as err:
            errno = Helper.convertErrno(err.errno)
            if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
                raise taos.error.ProgrammingError(
                    "Failed to read back same data for tick: {}, wrote: {}, read: EMPTY"
                    .format(tick, intWrote),
                    errno)
            elif errno == CrashGenError.INVALID_MULTIPLE_RESULT: # multiple results
                raise taos.error.ProgrammingError(
                    "Failed to read back same data for tick: {}, wrote: {}, read: MULTIPLE RESULTS"
                    .format(tick, intWrote),
                    errno)
            elif errno in [0x218, 0x362]: # table doesn't exist
                pass # do nothing
            else:
                raise # Re-throw otherwise

    def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
        """Write the records for one table one row at a time.

        Each row is optionally journaled before and after the write, written
        under the table lock (when verification is configured), optionally read
        back, and finally reported to the executor via ``recordDataMark``.
        """
        for _ in range(self._getNumRecords()):  # number of records per table
            intToWrite = db.getNextInt()
            nextTick = db.getNextTick()
            nextColor = db.getNextColor()
            if Config.getConfig().record_ops:
                self.prepToRecordOps()
                if self.fAddLogReady is None:
                    raise CrashGenError("Unexpected empty fAddLogReady")
                self._appendDurably(
                    self.fAddLogReady,
                    "Ready to write {} to {}\n".format(intToWrite, regTableName))

            fullTableName = db.getName() + '.' + regTableName
            self._lockTableIfNeeded(fullTableName) # so that we can verify the read-back

            # A single try/finally guarantees the lock is released exactly once,
            # whether the insert fails, the verification fails, or both succeed.
            try:
                intWrote = self._insertRow(db, dbc, fullTableName, nextTick, intToWrite, nextColor)
                # Now read it back and verify, we might encounter an error if table is dropped
                if Config.getConfig().verify_data: # only if command line asks for it
                    self._verifyReadBack(db, dbc, regTableName, nextTick, intWrote)
            finally:
                self._unlockTableIfNeeded(fullTableName)

            # Successfully wrote the data into the DB, let's record it somehow
            te.recordDataMark(intWrote)

            if Config.getConfig().record_ops:
                if self.fAddLogDone is None:
                    raise CrashGenError("Unexpected empty fAddLogDone")
                self._appendDurably(
                    self.fAddLogDone,
                    "Wrote {} to {}\n".format(intWrote, regTableName))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Visit every regular table in random order and add data to it."""
        # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
        db = self._db
        dbc = wt.getDbConn()
        numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES
        tblSeq = list(range(numTables))
        random.shuffle(tblSeq) # now we have random sequence
        for i in tblSeq:
            if i in self.activeTable:  # wow already active
                Progress.emit(Progress.CONCURRENT_INSERTION)
            else:
                self.activeTable.add(i)  # marking it active

            try:
                sTable = db.getFixedSuperTable()
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                sTable.ensureRegTable(self, wt.getDbConn(), regTableName)  # Ensure the table exists

                # NOTE(review): this was commented as a "1 in 2 chance", but the 1/2 chance
                # elsewhere in this file is spelled Dice.throw(2); presumably throw(1) always
                # yields 0, so the batch branch never runs. Confirm intent before changing.
                if Dice.throw(1) == 0:
                    self._addData(db, dbc, regTableName, te)
                else:
                    self._addDataInBatch(db, dbc, regTableName, te)
            finally:
                # Release the marker even on failure, so a failed insert does not
                # leave the table flagged as active.
                self.activeTable.discard(i)  # not raising an error, unlike remove
2199 2200


2201 2202 2203
class ThreadStacks: # stack info for all threads
    """Snapshot of the call stacks of all live threads, taken at construction time.

    Stacks are keyed by the thread identifier modulo 10000, which is the short
    id shown when printing.
    """

    def __init__(self):
        self._allStacks = {}
        allFrames = sys._current_frames() # All current stack frames
        for th in threading.enumerate():  # For each thread
            if th.ident is None:
                continue
            frame = allFrames.get(th.ident)
            if frame is None:
                # The thread list is read after the frame snapshot, so a thread
                # started in between has no frame recorded; skip it.
                continue
            stack = traceback.extract_stack(frame) # Get stack for a thread
            shortTid = th.ident % 10000
            self._allStacks[shortTid] = stack # Was using th.native_id

    def print(self, filteredEndName = None, filterInternal = False):
        """Print the captured stacks, one section per thread.

        filteredEndName: if given, threads whose innermost frame has this
            function name are left out.
        filterInternal: if true, threads whose innermost frame is one of a
            fixed set of waiting/bookkeeping functions are left out.
        """
        for tIdent, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
            lastFrame = stack[-1]
            if filteredEndName and lastFrame.name == filteredEndName:
                continue # innermost frame matches the name to be filtered out
            if filterInternal and lastFrame.name in [
                    'wait', 'invoke_excepthook',
                    '_wait', # The Barrier exception
                    'svcOutputReader', # the svcMgr thread
                    '__init__']: # the thread that extracted the stack
                continue # ignore
            # Now print
            print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(tIdent))
            for stackFrame, frame in enumerate(stack): # was using: reversed(stack)
                print("[{sf}] File {filename}, line {lineno}, in {name}".format(
                    sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info ----->\n")
S
Shuduo Sang 已提交
2234

2235 2236
class ClientManager:
    """Runs the client-side workload and reacts to the signals forwarded to it."""

    def __init__(self):
        Logging.info("Starting service manager")
        # signal.signal(signal.SIGTERM, self.sigIntHandler)
        # signal.signal(signal.SIGINT, self.sigIntHandler)

        self._status = Status.STATUS_RUNNING
        self.tc = None  # the thread coordinator, set by run()

        self.inSigHandler = False  # True while the SIGUSR1 menu is being shown

    def sigIntHandler(self, signalNumber, frame):
        """Ask the thread coordinator to stop; a repeated signal forces an exit."""
        if self._status != Status.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            # return  # do nothing if it's already not running
            sys.exit(-1)
        self._status = Status.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        self.tc.requestToStop()

    def _doMenu(self):
        """Show the interactive menu until a valid choice is entered, and return it ("1".."3")."""
        choice = ""
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the if range below
            # print("Enter Choice: ", end="", flush=True)
            while choice == "":  # keep asking on empty input
                choice = input("Enter Choice: ")
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        """Interrupt execution and let the user pick an action from a menu."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # do nothing if it's already not running
        self.inSigHandler = True

        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Reset even on failure, otherwise every later SIGUSR1 would be ignored
            self.inSigHandler = False

    # TODO: need to revise how we verify data durability
    # def _printLastNumbers(self):  # to verify data durability
    #     dbManager = DbManager()
    #     dbc = dbManager.getDbConn()
    #     if dbc.query("show databases") <= 1:  # no database (we have a default called "log")
    #         return
    #     dbc.execute("use db")
    #     if dbc.query("show tables") == 0:  # no tables
    #         return

    #     sTbName = dbManager.getFixedSuperTableName()

    #     # get all regular tables
    #     # TODO: analyze result set later
    #     dbc.query("select TBNAME from db.{}".format(sTbName))
    #     rTables = dbc.getQueryResult()

    #     bList = TaskExecutor.BoundedList()
    #     for rTbName in rTables:  # regular tables
    #         dbc.query("select speed from db.{}".format(rTbName[0]))
    #         numbers = dbc.getQueryResult()
    #         for row in numbers:
    #             # print("<{}>".format(n), end="", flush=True)
    #             bList.add(row[0])

    #     print("Top numbers in DB right now: {}".format(bList))
    #     print("TDengine client execution is about to start in 2 seconds...")
    #     time.sleep(2.0)
    #     dbManager = None  # release?

    def run(self, svcMgr):
        """Run the client workload to completion.

        svcMgr: the service manager to stop once the workload is done, or a
            false value when no managed service is in use.

        Returns 1 if the thread coordinator reports a failure, 0 otherwise.
        """
        # self._printLastNumbers()

        # Prepare Tde Instance
        global gContainer
        tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"

        cfg = Config.getConfig()
        dbManager = DbManager(cfg.connector_type, tInst.getDbTarget())  # Regular function
        thPool = ThreadPool(cfg.num_threads, cfg.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        Logging.info("Starting client instance: {}".format(tInst))
        self.tc.run()
        # print("exec stats: {}".format(self.tc.getExecStats()))
        # print("TC failed = {}".format(self.tc.isFailed()))
        if svcMgr: # gConfig.auto_start_service:
            svcMgr.stopTaosServices()
            svcMgr = None

        # Release what we hold. The configuration is cleared through Config;
        # the former `gSvcMgr = None` / `logger = None` lines were dropped because,
        # without a `global` declaration, they only bound unused local names.
        Config.clearConfig()

        thPool = None
        dbManager.cleanUp() # destructor wouldn't run in time
        dbManager = None

        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # print("TC failed (2) = {}".format(self.tc.isFailed()))
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        ret = 1 if self.tc.isFailed() else 0
        self.tc.cleanup()
        # Release variables here
        self.tc = None

        gc.collect() # force garbage collection
        # h = hpy()
        # print("\n----- Final Python Heap -----\n")
        # print(h.heap())

        return ret

    def conclude(self):
        """Print the execution statistics collected by the thread coordinator."""
        # self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
        self.tc.printStats()
2378

2379
class MainExec:
    """Top-level driver: installs signal handlers, parses the command line, and
    runs either the service (``-e``) or the client workload."""

    def __init__(self):
        self._clientMgr = None
        self._svcMgr = None # type: Optional[ServiceManager]

        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT,  self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, or to the service manager if there is no client."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr: # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to the service manager and the client manager, when present."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Run the client workload, starting the service first in auto-start mode.

        Returns the client manager's result, or None when the run was aborted
        by a REST connection error (which is logged, not raised).
        """
        global gSvcMgr
        if Config.getConfig().auto_start_service:
            gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
            gSvcMgr.startTaosServices() # we start, don't run

        self._clientMgr = ClientManager()
        ret = None
        try:
            ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
        except requests.exceptions.ConnectionError as err:
            Logging.warning("Failed to open REST connection to DB: {}".format(err))
            # don't raise
        return ret

    def runService(self):
        """Run the managed service with the configured number of dnodes until it ends."""
        global gSvcMgr
        gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert

        gSvcMgr.run() # run to some end state
        gSvcMgr = self._svcMgr = None

    def _buildCmdLineParser(self):
        """Build and return the argparse parser describing all command-line options.

        The "(default: ...)" notes in the help texts are kept in sync with the
        actual ``default=`` values.
        """
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=textwrap.dedent('''\
                TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
                ---------------------------------------------------------------------
                1. You build TDengine in the top level ./build directory, as described in offical docs
                2. You run the server there before this script: ./build/bin/taosd -c test/cfg

                '''))

        parser.add_argument(
            '-a',
            '--auto-start-service',
            action='store_true',
            help='Automatically start/stop the TDengine service (default: false)')
        parser.add_argument(
            '-b',
            '--max-dbs',
            action='store',
            default=0,
            type=int,
            help='Number of DBs to use, set to disable dropping DB. (default: 0)')
        parser.add_argument(
            '-c',
            '--connector-type',
            action='store',
            default='native',
            type=str,
            help='Connector type to use: native, rest, or mixed (default: native)')
        parser.add_argument(
            '-d',
            '--debug',
            action='store_true',
            help='Turn on DEBUG mode for more logging (default: false)')
        parser.add_argument(
            '-e',
            '--run-tdengine',
            action='store_true',
            help='Run TDengine service in foreground (default: false)')
        parser.add_argument(
            '-g',
            '--ignore-errors',
            action='store',
            default=None,
            type=str,
            help='Ignore error codes, comma separated, 0x supported (default: None)')
        parser.add_argument(
            '-i',
            '--num-replicas',
            action='store',
            default=1,
            type=int,
            help='Number (fixed) of replicas to use, when testing against clusters. (default: 1)')
        parser.add_argument(
            '-k',
            '--track-memory-leaks',
            action='store_true',
            help='Use Valgrind tool to track memory leaks (default: false)')
        parser.add_argument(
            '-l',
            '--larger-data',
            action='store_true',
            help='Write larger amount of data during write operations (default: false)')
        # Note: store_false, so the value is True unless -m is given
        parser.add_argument(
            '-m',
            '--mix-oos-data',
            action='store_false',
            help='Mix out-of-sequence data into the test data stream (default: true)')
        parser.add_argument(
            '-n',
            '--dynamic-db-table-names',
            action='store_true',
            help='Use non-fixed names for dbs/tables, for -b, useful for multi-instance executions (default: false)')
        parser.add_argument(
            '-o',
            '--num-dnodes',
            action='store',
            default=1,
            type=int,
            help='Number of Dnodes to initialize, used with -e option. (default: 1)')
        # NOTE(review): the help text speaks of a single shared connection while the
        # flag is named per-thread; confirm which one the flag actually enables.
        parser.add_argument(
            '-p',
            '--per-thread-db-connection',
            action='store_true',
            help='Use a single shared db connection (default: false)')
        parser.add_argument(
            '-r',
            '--record-ops',
            action='store_true',
            help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
        parser.add_argument(
            '-s',
            '--max-steps',
            action='store',
            default=1000,
            type=int,
            help='Maximum number of steps to run (default: 1000)')
        parser.add_argument(
            '-t',
            '--num-threads',
            action='store',
            default=5,
            type=int,
            help='Number of threads to run (default: 5)')
        parser.add_argument(
            '-v',
            '--verify-data',
            action='store_true',
            help='Verify data written in a number of places by reading back (default: false)')
        parser.add_argument(
            '-w',
            '--use-shadow-db',
            action='store_true',
            help='Use a shaddow database to verify data integrity (default: false)')
        parser.add_argument(
            '-x',
            '--continue-on-exception',
            action='store_true',
            help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')

        return parser

    def init(self): # TODO: refactor
        """Set up the global container, parse the command line, and initialize logging and dice.

        Raises CrashGenError when use-shadow-db is combined with more than one DB.
        """
        global gContainer
        gContainer = Container() # micky-mouse DI

        global gSvcMgr # TODO: refactor away
        gSvcMgr = None

        parser = self._buildCmdLineParser()
        Config.init(parser)

        # Sanity check for arguments
        if Config.getConfig().use_shadow_db and Config.getConfig().max_dbs>1 :
            raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")

        Logging.clsInit(Config.getConfig().debug)

        Dice.seed(0)  # initial seeding of dice

    def run(self):
        """Run the service or the client, depending on the configuration.

        In service mode returns 0 on success and -1 when a ConnectionError
        occurred; in client mode returns whatever runClient() returns.
        """
        if Config.getConfig().run_tdengine:  # run server
            try:
                self.runService()
                return 0 # success
            except ConnectionError as err:
                Logging.error("Failed to make DB connection, please check DB instance manually")
            return -1 # failure
        else:
            return self.runClient()
S
Steven Li 已提交
2574

2575

2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596
class Container():
    """Minimal property bag that only accepts a fixed set of property names.

    Values live in the ``_cargo`` dict; any name outside ``_propertyList``
    is rejected with a CrashGenError on both read and write.
    """

    _propertyList = {'defTdeInstance'}

    def __init__(self):
        self._cargo = {} # No cargo at the beginning

    def _verifyValidProperty(self, name):
        """Raise CrashGenError unless ``name`` is one of the allowed properties."""
        if name in self._propertyList:
            return
        raise CrashGenError("Invalid container property: {}".format(name))

    # Called for an attribute, when other mechanisms fail (compare to  __getattribute__)
    def __getattr__(self, name):
        self._verifyValidProperty(name)
        cargo = self._cargo
        return cargo[name] # just a simple lookup

    def __setattr__(self, name, value):
        if name != '_cargo':
            # Regular properties are validated and kept in the cargo dict
            self._verifyValidProperty(name)
            self._cargo[name] = value
            return
        # reserved vars are stored as real instance attributes
        super().__setattr__(name, value)
S
Shuduo Sang 已提交
2597