crash_gen.py 117.2 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
import taos
import crash_gen
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from queue import Queue, Empty
from typing import IO
from typing import Set
from typing import Dict
from typing import List
from requests.auth import HTTPBasicAuth
import textwrap
import datetime
import logging
import time
import random
import threading
import requests
import copy
import argparse
import getopt
39

S
Steven Li 已提交
40
import sys
41
import os
42 43
import io
import signal
44
import traceback
45 46 47 48 49 50 51

# psutil is a hard requirement of this tool: fail fast with an install hint.
# Only a missing module is handled here; any other error while importing
# psutil propagates normally instead of being masked by a bare "except:".
try:
    import psutil
except ImportError:
    print("Psutil module needed, please install: sudo pip3 install psutil")
    sys.exit(-1)

# Require Python 3 (belt and braces: "from __future__ import annotations" at
# the top of the file already needs Python 3.7+)
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

# Global variables; we try to keep their number small.

# Command-line/environment configuration. This is a dummy placeholder that is
# replaced once the real arguments have been parsed.
gConfig = argparse.Namespace()

# Service manager, assigned later. TODO: refactor this hack, use dependency injection
gSvcMgr = None

# Module-wide logger, assigned during start-up
logger = None  # type: Logger


def runThread(wt: WorkerThread):
    """Thread target: run the given worker's main loop in the current thread.

    :param wt: the WorkerThread whose run() method is executed
    """
    wt.run()


class CrashGenError(Exception):
    """Error raised by the crash generator.

    Carries an optional message (``msg``) and an optional error number
    (``errno``).
    """

    def __init__(self, msg=None, errno=None):
        # Pass msg to Exception so that ``args`` is populated, like any other
        # exception (used by repr() and pickling).
        super().__init__(msg)
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str: returning a None msg would make str()/print()
        # raise TypeError, so fall back to an empty string.
        return "" if self.msg is None else str(self.msg)


class WorkerThread:
    """A worker thread that runs in lock-step with its peers.

    In every step the worker waits at the ThreadCoordinator's shared barrier,
    then at its own per-thread "step gate" until the main thread taps it, and
    then fetches and executes a single task from the coordinator.
    """

    # Error numbers (after Helper.convertErrno) that are tolerated when
    # (re-)opening the per-thread DB connection: invalid database, database
    # dropping, unable to establish connection, database not ready
    _IGNORABLE_CONN_ERRNOS = (0x383, 0x386, 0x00B, 0x014)

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):  # note: main thread context!
        """Create (but do not start) the worker; called from the main thread.

        :param pool: the ThreadPool this worker belongs to
        :param tid: id of this worker, used in log messages
        :param tc: the ThreadCoordinator that drives the steps
        :raises RuntimeError: if per-thread DB connections are enabled and
            gConfig.connector_type is not 'native', 'rest' or 'mixed'
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if gConfig.per_thread_db_connection:  # type: ignore
            if gConfig.connector_type == 'native':
                self._dbConn = DbConn.createNative()
            elif gConfig.connector_type == 'rest':
                self._dbConn = DbConn.createRest()
            elif gConfig.connector_type == 'mixed':
                if Dice.throw(2) == 0:  # 1/2 chance
                    self._dbConn = DbConn.createNative()
                else:
                    self._dbConn = DbConn.createRest()
            else:
                raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))

    def logDebug(self, msg):
        logger.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        logger.info("    TRD[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body (runs in this worker's own thread, via runThread).

        Opens the per-thread connection if enabled, runs the task loop, and
        always cleans the connection up afterwards - also when the task loop
        dies with an exception, so the connection is not leaked.
        """
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            logger.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up
            if gConfig.per_thread_db_connection:  # type: ignore
                if self._dbConn.isOpen:  # sometimes it is not open
                    self._dbConn.close()
                else:
                    logger.warning("Cleaning up worker thread, dbConn already closed")

    def _doTaskLoop(self):
        """Execute one task per step until told to stop.

        Each iteration: cross the shared barrier, wait at our own step gate,
        make sure the per-thread connection is open, then fetch, execute and
        report one task. Exits when the barrier is broken (main thread timed
        out) or the coordinator is no longer running.
        """
        while True:
            tc = self._tc  # Thread Coordinator, the overall master
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()  # then per-thread gate, after being tapped
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not self._tc.isRunning():
                print("_wts", end="")
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Make sure our own connection is usable before running the task
            try:
                if gConfig.per_thread_db_connection:  # most likely TRUE
                    if not self._dbConn.isOpen:  # might have been closed during server auto-restart
                        self._dbConn.open()
            except taos.error.ProgrammingError as err:
                errno = Helper.convertErrno(err.errno)
                if errno not in self._IGNORABLE_CONN_ERRNOS:
                    print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
                    raise
                # Known transient condition: deliberately ignored, carry on

            # Fetch a task from the Thread Coordinator
            logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
                self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at this worker's step gate until the main thread taps it,
        then re-arm the gate. Only callable from this worker's own thread.
        """
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        logger.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its step gate. Main thread only.

        If the worker thread is already dead, only a "_tad" marker is printed.
        """
        self.verifyThreadMain()  # only allowed for main thread

        if self._thread.is_alive():
            logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        return self.getDbConn().getQueryResult()

    def getDbConn(self) -> DbConn:
        """Return this worker's own connection when per-thread connections are
        enabled, otherwise the connection shared through the DbManager.
        """
        if gConfig.per_thread_db_connection:
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()


# The coordinator of all worker threads, mostly running in main thread
class ThreadCoordinator:
    """Drives the worker threads of a ThreadPool step by step (lock-step).

    For every step the main thread:
    1. meets all workers at a shared barrier;
    2. transitions every database's state machine using the tasks executed
       in the previous step;
    3. taps every worker's step gate, so each one fetches and runs one task.
    """
    WORKER_THREAD_TIMEOUT = 60  # one minute

    def __init__(self, pool: ThreadPool, dbManager: DbManager):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # one barrier for all worker threads, plus the main thread itself
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()
        self._runStatus = MainExec.STATUS_RUNNING
        self._initDbs()

    def getTaskExecutor(self):
        return self._te

    def getDbManager(self) -> DbManager:
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        """Wait at the barrier shared by all workers and the main thread.

        :raises threading.BrokenBarrierError: on time-out or broken barrier
        """
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        self._runStatus = MainExec.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """Return True when the main loop must stop stepping."""
        maxSteps = gConfig.max_steps  # type: ignore
        lastStepDone = self._curStep >= (maxSteps - 1)  # maxStep==10, last curStep should be 9
        return bool(lastStepDone
                    or self._runStatus != MainExec.STATUS_RUNNING
                    or transitionFailed
                    or hasAbortedTask
                    or workerTimeout)

    def _hasAbortedTask(self):  # from execution of previous step
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step and tap all workers through their gates.

        If the transition failed, no TaskExecutor is created, so the workers
        see isRunning() == False and stop.
        """
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        logger.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
            self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute it's own task
        self.tapAllThreads()  # release all worker thread from their "gate"

    def _syncAtBarrier(self):
        """Cross the shared barrier (with time-out); workers then wait at their gates.

        :raises threading.BrokenBarrierError: if the workers don't arrive in time
        """
        # Now main thread (that's us) is ready to enter a step:
        # let other threads go past the pool barrier, but wait at the thread gate
        logger.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        logger.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """End the current step.

        Transitions each database's state machine using the tasks executed in
        this step, then clears those tasks for the next step.

        :returns: True if the DB connection broke ('network unavailable'),
            False otherwise
        :raises taos.error.ProgrammingError: for any other TAOS error
        """
        transitionFailed = False
        try:
            for db in self._dbs:  # type: Database
                sm = db.getStateMachine()
                logger.debug("[STT] starting transitions for DB: {}".format(db.getName()))
                # at end of step, transiton the DB state
                tasksForDb = db.filterTasks(self._executedTasks)
                sm.transition(tasksForDb, self.getDbManager().getDbConn())
                logger.debug("[STT] transition ended for DB: {}".format(db.getName()))
            # Note: TD Python connections cannot be shared across threads, so
            # worker threads (re)open their own connections in their task loop
            # rather than having the main thread prepare them here.
        except taos.error.ProgrammingError as err:
            if err.msg == 'network unavailable':  # broken DB connection
                logger.info("DB connection broken, execution failed")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # don't bail out early: all threads still need to be tapped at
                # the end, and maybe signalled to stop
            else:
                raise

        # Clear the tasks, so that the next step's transition only sees the
        # tasks executed in that step (instead of every task ever executed)
        self.resetExecutedTasks()
        # Get ready for next step
        logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Main coordination loop (main thread).

        Starts the workers, drives them step by step until a stop condition is
        hit, then lets them run one last time and joins them.
        """
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not gConfig.debug:  # print this only if we are not in debug mode
                print(".", end="", flush=True)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
            except threading.BrokenBarrierError:
                logger.info("Main loop aborted, caused by worker thread time-out")
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                print("\n\nWorker Thread time-out detected, important thread info:")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                break

            # At this point, all threads should be past the overall "barrier" and
            # before the per-thread "gate". We use this period to do house
            # keeping work, while all worker threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                logger.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = Helper.convertErrno(err.errno)  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                logger.info(errMsg)
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            logger.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            logger.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        logger.debug("\r\n\n--> Main thread ready to finish up...")
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        logger.info("\nAll worker threads finished")
        self._execStats.endExec()

    def printStats(self):
        self._execStats.printStats()

    def isFailed(self):
        return self._execStats.isFailed()

    def getExecStats(self):
        return self._execStats

    def tapAllThreads(self):
        """Tap every worker's step gate, in a random (Dice-driven) order."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        return self._te is not None

    def _initDbs(self):
        ''' Initialize multiple databases, invoked at __init__() time '''
        self._dbs = []  # type: List[Database]
        dbc = self.getDbManager().getDbConn()
        if gConfig.max_dbs == 0:
            self._dbs.append(Database(0, dbc))
        else:
            for i in range(gConfig.max_dbs):
                self._dbs.append(Database(i, dbc))

    def pickDatabase(self):
        """Randomly pick one of the databases (always #0 when max_dbs == 0)."""
        idxDb = 0
        if gConfig.max_dbs != 0:
            idxDb = Dice.throw(gConfig.max_dbs)  # 0 to N-1
        db = self._dbs[idxDb]  # type: Database
        return db

    def fetchTask(self) -> Task:
        ''' The thread coordinator (that's us) is responsible for fetching a task
            to be executed next.

            :raises RuntimeError: if the coordinator is not running
        '''
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")

        # pick a task type for current state
        db = self.pickDatabase()
        taskType = db.getStateMachine().pickTaskType()  # type: Task
        return taskType(self._execStats, db)  # create a task from it

    def resetExecutedTasks(self):
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        with self._lock:
            self._executedTasks.append(task)


# Below: a small error-number helper, then a class that runs a number of threads in lock-step.
class Helper:
    """Miscellaneous helpers."""

    @classmethod
    def convertErrno(cls, errno):
        """Normalize a TAOS error number to its positive error code.

        A negative value has 0x80000000 added, which strips the sign bit of
        a signed 32-bit code; e.g. -2147482749 (0x80000383) becomes 0x383.
        Non-negative values, including 0, are returned unchanged. Previously,
        0 was turned into 0x80000000.
        """
        return errno if errno >= 0 else 0x80000000 + errno


class ThreadPool:
    """A fixed-size pool of WorkerThreads, run in lock-step by a ThreadCoordinator."""

    def __init__(self, numThreads, maxSteps):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal class variables
        self.curStep = 0
        self.threadList = []  # type: List[WorkerThread]

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create and start all worker threads, which will run in lock-step.

        Each worker is recorded in threadList before it is started.

        :param tc: the coordinator the workers report to
        """
        for tid in range(self.numThreads):  # Create the threads
            workerThread = WorkerThread(self, tid, tc)
            self.threadList.append(workerThread)
            workerThread.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Wait for every worker thread to finish."""
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()


# A queue of contiguous POSITIVE integers, used by DbManager to generate consecutive numbers
# for new table names
class LinearQueue:
    """A queue of contiguous positive integers [firstIndex, lastIndex].

    Indexes are created by push() (new, largest) and are marked "in use"
    while allocated. pickAndAllocate() allocates a random free one. An index
    can only be popped from the front once it has been released. Mutating
    operations are guarded by a re-entrant lock.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0  # firstIndex > lastIndex means empty
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a short human-readable description of the queue state."""
        return "[{}..{}], in use: {}".format(
            self.firstIndex, self.lastIndex, self.inUse)

    def push(self):
        """Add a new (largest) element at the tail, mark it in use and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the front element.

        :returns: the removed index, or False if the queue is empty or the
            front element is still in use (nothing is removed then)
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but return 0 (instead of False) when the queue is empty."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index i as in use.

        :raises RuntimeError: if i is already in use
        """
        with self._lock:
            if i in self.inUse:
                raise RuntimeError(
                    "Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index i as no longer in use.

        :raises KeyError: if i is not currently in use
        """
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Number of elements in [firstIndex, lastIndex]."""
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick (via Dice.throwRange) an element that is not in use,
        allocate it and return it.

        :returns: the allocated index, or None if the queue is empty or no
            free element was hit within 10 * size() random attempts
        """
        with self._lock:
            # Checked under the lock, so the queue cannot be drained between
            # this test and the picking loop below
            if self.isEmpty():
                return None
            cnt = 0  # counting the iterations
            while True:
                cnt += 1
                if cnt > self.size() * 10:  # 10x iteration already
                    return None
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret


class DbConn:
    """Abstract database connection; subclasses implement the transport.

    Use create()/createNative()/createRest() to obtain a concrete connection.
    Subclasses implement openByType(), close(), execute(), query(),
    getQueryResult(), getResultRows() and getResultCols().
    """
    TYPE_NATIVE = "native-c"
    TYPE_REST = "rest-api"
    TYPE_INVALID = "invalid"

    @classmethod
    def create(cls, connType):
        """Create an (unopened) connection of the given type.

        :param connType: TYPE_NATIVE or TYPE_REST
        :raises RuntimeError: for any other connection type
        """
        if connType == cls.TYPE_NATIVE:
            return DbConnNative()
        elif connType == cls.TYPE_REST:
            return DbConnRest()
        else:
            raise RuntimeError(
                "Unexpected connection type: {}".format(connType))

    @classmethod
    def createNative(cls):
        return cls.create(cls.TYPE_NATIVE)

    @classmethod
    def createRest(cls):
        return cls.create(cls.TYPE_REST)

    def __init__(self):
        self.isOpen = False
        self._type = self.TYPE_INVALID
        self._lastSql = None  # recorded by subclasses when they run SQL

    def getLastSql(self):
        return self._lastSql

    def open(self):
        """Open the connection (via the subclass's openByType()).

        :raises RuntimeError: if the connection is already open
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        # below implemented by child classes
        self.openByType()

        logger.debug("[DB] data connection opened, type = {}".format(self._type))
        self.isOpen = True

    def queryScalar(self, sql) -> int:
        return self._queryAny(sql)

    def queryString(self, sql) -> str:
        return self._queryAny(sql)

    def _queryAny(self, sql):
        """Run a query that must return exactly one row with one column, and
        return that single value.

        :raises RuntimeError: if the connection is not open, or the result is
            not a 1x1 result set
        """
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        nRows = self.query(sql)
        if nRows != 1:
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
        if self.getResultRows() != 1 or self.getResultCols() != 1:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        return self.getQueryResult()[0][0]

    def use(self, dbName):
        self.execute("use {}".format(dbName))

    def existsDatabase(self, dbName: str):
        ''' Check if a certain database exists '''
        self.query("show databases")
        # the first column of each "show databases" row is the database name
        dbs = [v[0] for v in self.getQueryResult()]
        return dbName in dbs  # TODO: super weird type mangling seen, once here

    def hasTables(self):
        return self.query("show tables") > 0

    def execute(self, sql):
        ''' Return the number of rows affected'''
        raise RuntimeError("Unexpected execution, should be overriden")

    def safeExecute(self, sql):
        '''Safely execute any SQL query, returning True/False upon success/failure.

        Only TAOS ProgrammingErrors are turned into False; any other exception
        propagates.
        '''
        try:
            self.execute(sql)
            return True  # ignore num of results, return success
        except taos.error.ProgrammingError:
            return False  # failed, for whatever TAOS reason

    def query(self, sql) -> int:
        ''' Return the number of rows returned by the query'''
        raise RuntimeError("Unexpected execution, should be overriden")

    def openByType(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def close(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getQueryResult(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getResultRows(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getResultCols(self):
        raise RuntimeError("Unexpected execution, should be overriden")


# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
class DbConnRest(DbConn):
    ''' DB connection that talks to TDengine through its REST interface.

        Each SQL statement is sent as an HTTP POST to a fixed local URL with
        basic auth; there is no persistent session, so open/close only manage
        the isOpen flag.
    '''
    def __init__(self):
        super().__init__()
        self._type = self.TYPE_REST
        self._url = "http://localhost:6041/rest/sql"  # fixed for now
        self._result = None  # parsed JSON of the last successful response

    def openByType(self):  # Open connection
        pass  # do nothing, always open

    def close(self):
        if (not self.isOpen):
            raise RuntimeError("Cannot clean up database until connection is open")
        # Do nothing for REST
        logger.debug("[DB] REST Database connection closed")
        self.isOpen = False

    def _doSql(self, sql):
        '''POST one SQL statement and return the reported row count.

        Raises taos.error.ProgrammingError when the server reports an error
        (status "error"), RuntimeError when the JSON response is malformed, and
        re-raises any failure of the HTTP request itself.
        '''
        self._lastSql = sql # remember this, last SQL attempted
        try:
            r = requests.post(self._url,
                data = sql,
                auth = HTTPBasicAuth('root', 'taosdata'))
        except Exception:
            print("REST API Failure (TODO: more info here)")
            raise
        rj = r.json()
        # Sanity check for the "Json Result"
        if ('status' not in rj):
            raise RuntimeError("No status in REST response")

        if rj['status'] == 'error':  # clearly reported error
            if ('code' not in rj):  # error without code
                raise RuntimeError("REST error return without code")
            errno = rj['code']  # May need to massage this in the future
            # 'desc' is not guaranteed to be present: don't let a missing
            # description turn the server's error into a KeyError
            raise taos.error.ProgrammingError(
                rj.get('desc', 'REST error return without desc'), errno)

        if rj['status'] != 'succ':  # better be this
            raise RuntimeError(
                "Unexpected REST return status: {}".format(
                    rj['status']))

        nRows = rj['rows'] if ('rows' in rj) else 0
        self._result = rj
        return nRows

    def execute(self, sql):
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot execute database commands until connection is open")
        logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
        nRows = self._doSql(sql)
        logger.debug(
            "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        return self.execute(sql)

    def getQueryResult(self):
        return self._result['data']

    def getResultRows(self):
        print(self._result)
        raise RuntimeError("TBD")
        # return self._tdSql.queryRows

    def getResultCols(self):
        print(self._result)
        raise RuntimeError("TBD")
S
Shuduo Sang 已提交
784

785
    # Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
786 787


788
class MyTDSql:
    '''Thin wrapper around a native taos connection and its cursor.

    Besides running statements, it records class-wide (shared by all
    instances) the slowest statement seen so far: longestQuery,
    longestQueryTime (seconds) and lqStartTime (epoch seconds).
    '''
    # Class variables
    _clsLock = threading.Lock() # class wide locking
    longestQuery = None # type: str
    longestQueryTime = 0.0 # seconds
    lqStartTime = 0.0
    # lqEndTime = 0.0 # Not needed, as we have the two above already

    def __init__(self, hostAddr, cfgPath):
        # Make the DB connection
        self._conn = taos.connect(host=hostAddr, config=cfgPath)
        self._cursor = self._conn.cursor()

        self.sql = None  # last statement passed to query()/execute()
        self.queryResult = None  # rows fetched by the last query()
        self.queryRows = 0
        self.queryCols = 0
        self.affectedRows = 0

    def close(self):
        # Closing the cursor does NOT close the DB connection, so both are closed.
        # The cursor goes first, while its connection is still valid; the finally
        # releases the connection even if closing the cursor fails.
        try:
            self._cursor.close()
        finally:
            self._conn.close()

    def _execInternal(self, sql):
        '''Execute sql on the cursor, record its timing, and return the cursor's result.'''
        startTime = time.time()
        ret = self._cursor.execute(sql)
        queryTime = time.time() - startTime
        # Record the query time
        cls = self.__class__
        if queryTime > (cls.longestQueryTime + 0.01):  # cheap unlocked pre-check
            with cls._clsLock:
                # Re-check under the lock: another thread may have recorded a
                # longer query after our pre-check, and it must not be overwritten.
                if queryTime > (cls.longestQueryTime + 0.01):
                    cls.longestQuery = sql
                    cls.longestQueryTime = queryTime
                    cls.lqStartTime = startTime
        return ret

    def query(self, sql):
        '''Run a query, store all fetched rows in queryResult, and return the row count.'''
        self.sql = sql
        self._execInternal(sql)
        self.queryResult = self._cursor.fetchall()
        self.queryRows = len(self.queryResult)
        self.queryCols = len(self._cursor.description)
        return self.queryRows

    def execute(self, sql):
        '''Run a statement and return what the cursor's execute() returned.'''
        self.sql = sql
        self.affectedRows = self._execInternal(sql)
        return self.affectedRows

S
Shuduo Sang 已提交
853

854
class DbConnNative(DbConn):
    '''DB connection that goes through the native taos client (via MyTDSql).'''
    # Class variables
    _lock = threading.Lock()
    _connInfoDisplayed = False
    totalConnections = 0 # Not private

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_NATIVE
        self._conn = None
        # self._cursor = None

    def getBuildPath(self):
        '''Locate the build directory by searching the project tree for "taosd".

        The search root is the part of this script's path before "community"
        if present, else before "tests", else the script directory itself.
        Hits whose real parent path contains "packaging" are skipped. The
        returned path is the hit directory with len("/build/bin") characters
        stripped, i.e. it assumes taosd lives under <buildPath>/build/bin.

        Raises RuntimeError when no taosd is found.
        '''
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("community")]
        elif ("tests" in selfPath):
            projPath = selfPath[:selfPath.find("tests")]
        else:
            # find() would return -1 here and silently chop off the last character
            projPath = selfPath

        buildPath = None
        for root, dirs, files in os.walk(projPath):
            if ("taosd" in files):
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    buildPath = root[:len(root) - len("/build/bin")]
                    break
        if buildPath is None:
            raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
                .format(selfPath, projPath))
        return buildPath

    def openByType(self):  # Open connection
        cfgPath = self.getBuildPath() + "/test/cfg"
        hostAddr = "127.0.0.1"

        cls = self.__class__ # Get the class, to access class variables
        with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
            if not cls._connInfoDisplayed:
                cls._connInfoDisplayed = True # updating CLASS variable
                logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
            # Make the connection, and record the count in the class
            self._tdSql = MyTDSql(hostAddr, cfgPath) # making DB connection
            cls.totalConnections += 1

        self._tdSql.execute('reset query cache')

    def close(self):
        if (not self.isOpen):
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()
        # Decrement the class wide counter
        cls = self.__class__ # Get the class, to access class variables
        with cls._lock:
            cls.totalConnections -= 1

        logger.debug("[DB] Database connection closed")
        self.isOpen = False

    def execute(self, sql):
        if (not self.isOpen):
            raise RuntimeError("Cannot execute database commands until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        self._lastSql = sql
        nRows = self._tdSql.execute(sql)
        logger.debug(
            "[SQL] Execution Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot query database until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        self._lastSql = sql
        nRows = self._tdSql.query(sql)
        logger.debug(
            "[SQL] Query Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows
        # results are in: return self._tdSql.queryResult

    def getQueryResult(self):
        return self._tdSql.queryResult

    def getResultRows(self):
        return self._tdSql.queryRows

    def getResultCols(self):
        return self._tdSql.queryCols
953

S
Shuduo Sang 已提交
954

955
class AnyState:
    '''Base class for the possible states of a test database.

    A concrete subclass describes itself through getInfo(), which returns a
    list indexed by STATE_VAL_IDX and the CAN_* constants: the state value
    first, then one boolean per permitted operation. The has*/assert* helpers
    check a list of executed tasks against the expectations of a transition.
    '''
    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, no even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Indices into the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    # For below, if we can "drop the DB", but strictly speaking
    # only "under normal circumstances", as we may override it with the -b option
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # +1 because STATE_INVALID (-1) maps to slot 0 of _stateNames
        return self._stateNames[self._info[self.STATE_VAL_IDX] + 1]

    # Each sub state tells us the "info", about itself, so we can determine
    # on things like canDropDB()
    def getInfo(self):
        # NotImplementedError subclasses RuntimeError, so existing handlers still apply
        raise NotImplementedError("Must be overriden by child classes")

    def equals(self, other):
        '''Compare state values; other may be an int state value or an AnyState.

        Raises RuntimeError for any other type.
        '''
        if isinstance(other, int):
            return self.getValIndex() == other
        elif isinstance(other, AnyState):
            return self.getValIndex() == other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(
                    type(other)))

    def verifyTasksToState(self, tasks, newState):
        raise NotImplementedError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        # If user requests to run up to a number of DBs,
        # we'd then not do drop_db operations any more
        if gConfig.max_dbs > 0 :
            return False
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        '''Raise RuntimeError if more than one task of type cls succeeded.'''
        sCnt = 0
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                sCnt += 1
                if sCnt >= 2:
                    raise RuntimeError(
                        "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        '''Raise RuntimeError if tasks of type cls ran but none of them succeeded.'''
        matching = [task for task in tasks if isinstance(task, cls)]
        if matching and not any(task.isSuccess() for task in matching):
            raise RuntimeError(
                "Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        '''Raise CrashGenError if any task of type cls is present.'''
        if self.hasTask(tasks, cls):
            raise CrashGenError(
                "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        '''Raise RuntimeError if any task of type cls succeeded.'''
        if self.hasSuccess(tasks, cls):
            raise RuntimeError(
                "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        '''Return True if at least one task of type cls succeeded.'''
        return any(isinstance(task, cls) and task.isSuccess() for task in tasks)

    def hasTask(self, tasks, cls):
        '''Return True if at least one task of type cls is present.'''
        return any(isinstance(task, cls) for task in tasks)

S
Shuduo Sang 已提交
1078

1079 1080 1081 1082
class StateInvalid(AnyState):
    '''The invalid state: every operation flag is False.'''
    def getInfo(self):
        # create/drop Db, create/drop fixed table, insert/read data: all disallowed
        disallowed = [False] * 6
        return [self.STATE_INVALID] + disallowed

    # def verifyTasksToState(self, tasks, newState):

S
Shuduo Sang 已提交
1090

1091 1092 1093 1094
class StateEmpty(AnyState):
    '''No database exists: only creating a DB is permitted.'''
    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False,   # can create/drop Db
            False, False,  # can create/drop fixed table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        # Only checked when some create_db succeeded and no drop_db ran in the
        # same step: then create_db must have succeeded at most once.
        # TODO: compare numbers
        if not self.hasSuccess(tasks, TaskCreateDb):
            return
        if self.hasTask(tasks, TaskDropDb):
            return
        self.assertAtMostOneSuccess(tasks, TaskCreateDb)

1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117

class StateDbOnly(AnyState):
    '''A database exists but has no tables.'''
    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,   # can create/drop Db
            True, False,   # can create/drop fixed super table
            False, False,  # can insert/read data
        ]

    def verifyTasksToState(self, tasks, newState):
        # Only checked when no create_db task ran in this step
        if self.hasTask(tasks, TaskCreateDb):
            return
        self.assertAtMostOneSuccess(tasks, TaskDropDb)

        # TODO: restore assertIfExistThenSuccess(tasks, TaskDropDb), guarded so it
        # is skipped while the service manager (gSvcMgr) is restarting; the
        # problem exists, although it is unlikely in the real world.
1126

S
Shuduo Sang 已提交
1127

1128
class StateSuperTableOnly(AnyState):
    '''The fixed super table exists (state value STATE_TABLE_ONLY).'''
    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed super table
            True, True,   # can insert/read data
        ]

    def verifyTasksToState(self, tasks, newState):
        droppedTable = self.hasSuccess(tasks, TaskDropSuperTable)
        if droppedTable:
            # we must have had re-created it
            # NOTE(review): the result of this call is discarded, so nothing is
            # actually asserted here
            self.hasSuccess(tasks, TaskCreateSuperTable)
        # Checks for the add-data / read-data paths were dropped because they do
        # not hold in massively parallel runs.
        # TODO: need to revamp!!
1155

S
Shuduo Sang 已提交
1156

1157 1158 1159 1160 1161 1162 1163 1164 1165 1166
class StateHasData(AnyState):
    '''The fixed super table exists and holds data (state value STATE_HAS_DATA).'''
    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed super table
            True, True,   # can insert/read data
        ]

    def verifyTasksToState(self, tasks, newState):
        if newState.equals(AnyState.STATE_EMPTY):
            # NOTE(review): result discarded, nothing is asserted by this call
            self.hasSuccess(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
            return

        if newState.equals(AnyState.STATE_DB_ONLY):  # in DB only
            if not self.hasTask(tasks, TaskCreateDb):  # without a create_db task
                # no drop_db task may be present
                self.assertNoTask(tasks, TaskDropDb)
            # NOTE(review): result discarded, nothing is asserted by this call
            self.hasSuccess(tasks, TaskDropSuperTable)
            return

        # Otherwise the new state should be STATE_HAS_DATA
        createdDb = self.hasTask(tasks, TaskCreateDb)
        if not createdDb:  # only if we didn't create one
            # we shouldn't have dropped it
            self.assertNoTask(tasks, TaskDropDb)
        createdTable = self.hasTask(tasks, TaskCreateSuperTable)
        if not createdTable:  # if we didn't create the table
            # we should not have a task that drops it
            self.assertNoTask(tasks, TaskDropSuperTable)
S
Steven Li 已提交
1193

S
Shuduo Sang 已提交
1194

1195
class StateMechine:
    '''Tracks the state of one Database and picks/validates task types.

    The state is (re)discovered by querying the server (_findCurrentState);
    after each step, transition() checks the executed tasks against the old
    and new states.
    '''
    def __init__(self, db: Database):
        self._db = db
        # transition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
        self._stateWeights = [1, 2, 10, 40]

    def init(self, dbc: DbConn): # late initialization, don't save the dbConn
        self._curState = self._findCurrentState(dbc)  # starting state
        logger.debug("Found Starting State: {}".format(self._curState))

    # TODO: seems no longer used, remove?
    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        # Decide from the state value itself. canDropDb() must not be used here:
        # it also returns False whenever gConfig.max_dbs > 0, which says nothing
        # about whether a database exists. DB_ONLY and above all have a DB.
        return self._curState.getValIndex() >= AnyState.STATE_DB_ONLY

    # May be slow, use cautiously...
    def getTaskTypes(self):  # those that can run (directly/indirectly) from the current state
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]

        # now we have all the tasks that can begin directly from the current
        # state, let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:  # each task type gathered so far
            endState = task1.getEndState()  # figure the end state
            if endState is None:  # does not change end state
                continue  # no use, do nothing
            for tc in allTaskClasses:  # what task can further begin from there?
                # Check against everything gathered so far, not just the direct
                # ones: otherwise a task reachable from two end states is added
                # twice and gets double weight in pickTaskType().
                if tc.canBeginFrom(endState) and (tc not in taskTypes):
                    taskTypes.append(tc)  # gather it

        if len(taskTypes) <= 0:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        logger.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self, dbc: DbConn):
        '''Query the server through dbc and return a fresh state object for our DB.'''
        ts = time.time()  # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        dbName = self._db.getName()
        if not dbc.existsDatabase(dbName): # dbc.hasDatabases():  # no database?!
            logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when openning connection, and this is NOT the worker
        # thread, which does this on their own
        dbc.use(dbName)
        if not dbc.hasTables():  # no tables
            logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        sTable = self._db.getFixedSuperTable()
        # NOTE(review): the original comments describe this branch as "no regular
        # tables", yet it is taken when hasRegTables() is true. Confirm against
        # TdSuperTable.hasRegTables before relying on this distinction.
        if sTable.hasRegTables(dbc, dbName):
            logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else:  # has actual tables
            logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    # We transition the system to a new state by examining the current state itself
    def transition(self, tasks, dbc: DbConn):
        if (len(tasks) == 0):  # before 1st step, or otherwise empty
            logger.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        dbc.execute("show dnodes")

        # Generic Checks, first based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # No at-most-one check: creates and drops can interleave in one step

        if self._curState.canDropDb():
            if gSvcMgr is None: # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # No at-most-one check either, because of drop-create-drop

        # No generic checks for creating/dropping the fixed super table or for
        # adding/reading data: the whole DB may be dropped concurrently, so
        # success of those tasks is not guaranteed.

        newState = self._findCurrentState(dbc)
        logger.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        # all the task types we can choose from at current state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # read data task, default to 10: TODO: change to a constant
                weights.append(10)
        i = self._weighted_choice_sub(weights)
        return taskTypes[i]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights):
        '''Return an index into weights, chosen with probability proportional to its weight.

        Raises ValueError if no weight is positive.
        '''
        # TODO: use our dice to ensure it being deterministic?
        total = sum(weights)
        if total <= 0:
            raise ValueError(
                "Cannot make a weighted choice without positive weights: {}".format(weights))
        rnd = random.random() * total
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Floating-point rounding can leave rnd at ~0 after the last subtraction;
        # fall back to the last index that actually carries weight.
        return max(i for i, w in enumerate(weights) if w > 0)
1337

S
Shuduo Sang 已提交
1338

1339 1340 1341 1342 1343 1344 1345 1346 1347 1348
class Database:
    ''' We use this to represent an actual TDengine database inside a service instance,
        possibly in a cluster environment.

        For now we use it to manage state transitions in that database, and to
        hand out (thread-safely) the timestamps and values used for inserted data.
    '''
    def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
        self._dbNum = dbNum # we assign a number to databases, for our testing purpose
        self._stateMachine = StateMechine(self)
        self._stateMachine.init(dbc)

        self._lastTick = self.setupLastTick()
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()  # guards _lastTick and _lastInt

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    def getDbNum(self):
        return self._dbNum

    def getName(self):
        return "db_{}".format(self._dbNum)

    def filterTasks(self, inTasks: List[Task]): # Pick out those belonging to us
        return [task for task in inTasks if task.getDb().isSame(self)]

    def isSame(self, other):
        return self._dbNum == other._dbNum

    def exists(self, dbc: DbConn):
        return dbc.existsDatabase(self.getName())

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    @classmethod
    def getFixedSuperTable(cls) -> TdSuperTable:
        return TdSuperTable(cls.getFixedSuperTableName())

    # We aim to create a starting time tick, such that, whenever we run our test here once
    # We should be able to safely create 100,000 records, which will not have any repeated time stamp
    # when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
    # by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    def setupLastTick(self):
        t1 = datetime.datetime(2020, 6, 1)
        t2 = datetime.datetime.now()
        # Seconds elapsed since t1
        elSec = int(t2.timestamp() - t1.timestamp())
        # Fold into a window of (8 * 12 * 30 days) / 500 and stretch back by 500:
        # the offset lands within roughly 8 years, in steps of 500 seconds
        elSec2 = (elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500)) * 500
        t3 = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp(
            t3.timestamp() + elSec2)  # see explanation above
        logger.info("Setting up TICKS to start from: {}".format(t4))
        return t4

    def getNextTick(self):
        with self._lock:  # prevent duplicate tick
            if Dice.throw(20) == 0:  # 1 in 20 chance
                # Go back in time 100 seconds, without moving _lastTick
                return self._lastTick + datetime.timedelta(0, -100)
            else:  # regular
                # add one second to it
                self._lastTick += datetime.timedelta(0, 1)
                return self._lastTick

    def getNextInt(self):
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            self.getNextInt())

    def getNextFloat(self):
        return 0.9 + self.getNextInt()
S
Shuduo Sang 已提交
1426

1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475

class DbManager():
    ''' This is a wrapper around DbConn(), to make it easier to use.

        TODO: rename this to DbConnManager
    '''

    def __init__(self):
        """Create a DB connection (native or REST, per gConfig.connector_type) and open it.

        Exits the process with status 2 if opening fails with
        'client disconnected'. Any other error is printed and re-raised.
        """
        # Guards tableNumQueue updates in addTable(). Previously this lock was
        # never created, so addTable() always raised AttributeError.
        self._lock = threading.Lock()
        self.tableNumQueue = LinearQueue()  # TODO: delete?
        self._dbConn = DbConn.createNative() if (
            gConfig.connector_type == 'native') else DbConn.createRest()
        try:
            self._dbConn.open()  # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            if (err.msg == 'client disconnected'):  # cannot open DB connection
                print(
                    "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                sys.exit(2)
            else:
                print("Failed to connect to DB, errno = {}, msg: {}"
                      .format(Helper.convertErrno(err.errno), err.msg))
                raise
        except BaseException:
            print("[=] Unexpected exception")
            raise

    def getDbConn(self):
        """Return the underlying (already opened) DbConn."""
        return self._dbConn

    # TODO: not used any more, to delete
    def pickAndAllocateTable(self):  # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    # TODO: Not used any more, to delete
    def addTable(self):
        """Push a new table number onto tableNumQueue and return its index."""
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    # Not used any more, to delete
    def releaseTable(self, i):  # return the table back, so others can use it
        self.tableNumQueue.release(i)

    # TODO: not used any more, delete
    def getTableNameToDelete(self):
        """Pop a table number and return "table_<n>", or False if pop() gave a falsy value."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        # NOTE(review): a table number of 0 is also falsy and is treated as "nothing to delete"
        if (not tblNum):  # maybe false
            return False
        return "table_{}".format(tblNum)

    def cleanUp(self):
        """Close the underlying DB connection."""
        self._dbConn.close()

1486
class TaskExecutor():
    """Executes tasks on worker threads for one step, and tracks top data values written."""

    class BoundedList:
        """Thread-safe, ascending-sorted list keeping at most `size` of the largest values added."""

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            """Insert n in sorted position, keeping only the `size` largest values.

            Once the list is full, a value not larger than the current smallest
            entry is ignored, because it would be evicted immediately. Until the
            list is full, every value is kept. The old code wrongly dropped any
            value <= the current minimum even while there was still room.
            """
            with self._lock:
                # Index of the first element >= n (same as bisect_left on a sorted list)
                insPos = next(
                    (i for i, v in enumerate(self._list) if n <= v),
                    len(self._list))
                if insPos == 0 and len(self._list) >= self._size:
                    return  # full, and n would become the (immediately evicted) minimum

                self._list.insert(insPos, n)

                newLen = len(self._list)
                if newLen == self._size + 1:
                    del self._list[0]  # evict the smallest to stay within the bound
                elif newLen > self._size + 1:
                    raise RuntimeError("Corrupt Bounded List")

        def __str__(self):
            return repr(self._list)

    # Shared by every TaskExecutor (accessed via the class): top data values written during the run
    _boundedList = BoundedList()

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        """Return the BoundedList shared by all executors."""
        return cls._boundedList

    def getCurStep(self):
        """Return the step number this executor was created for."""
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        """Record a data value written, keeping it if it is among the largest seen."""
        self._boundedList.add(n)
1548

S
Shuduo Sang 已提交
1549

S
Steven Li 已提交
1550
class Task():
    ''' A generic "Task" to be executed. For now we decide that there is no
        need to embed a DB connection here, we use whatever the Worker Thread has
        instead. But a task is always associated with a DB
    '''
    taskSn = 100  # last serial number handed out; shared by all Task subclasses

    @classmethod
    def allocTaskNum(cls):
        """Allocate and return the next task serial number."""
        Task.taskSn += 1  # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        return Task.taskSn

    def __init__(self, execStats: ExecutionStats, db: Database):
        """
        :param execStats: statistics collector updated by execute()
        :param db: the database this task operates on
        """
        self._workerThread = None
        self._err = None  # type: Exception
        self._aborted = False
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

        self._execStats = execStats
        self._db = db  # A task is always associated/for a specific DB

    def isSuccess(self):
        """True if the last execute() recorded no error."""
        return self._err is None

    def isAborted(self):
        """True if the last execute() hit an error that should abort the run."""
        return self._aborted

    def clone(self):  # TODO: why do we need this again?
        """Return a fresh task of the same class, sharing execStats and db."""
        newTask = self.__class__(self._execStats, self._db)
        return newTask

    def getDb(self):
        return self._db

    def logDebug(self, msg):
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Task-specific work; must be overridden by subclasses."""
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))

    def _isErrAcceptable(self, errno, msg):
        """Return True if a TAOS error (converted errno + message) should be tolerated."""
        if errno in [
                0x05,  # TSDB_CODE_RPC_NOT_READY
                # 0x200, # invalid SQL, TODO: re-examine with TD-934
                0x360, 0x362,
                0x369,  # tag already exists
                0x36A, 0x36B, 0x36D,
                0x381,
                0x380,  # "db not selected"
                0x383,
                0x386,  # DB is being dropped?!
                0x503,
                0x510,  # vnode not in ready state
                0x14,   # db not ready, errno changed
                0x600,
                1000  # REST catch-all error
            ]:
            return True  # These are the ALWAYS-ACCEPTABLE ones
        elif (errno in [0x0B]) and gConfig.auto_start_service:
            return True  # We may get "network unavilable" when restarting service
        elif errno == 0x200:  # invalid SQL, we need to div in a bit more
            if msg.find("invalid column name") != -1:
                return True
            elif msg.find("tags number not matched") != -1:  # mismatched tags after modification
                return True
            elif msg.find("duplicated column names") != -1:  # also alter table tag issues
                return True
        elif (gSvcMgr is not None) and gSvcMgr.isRestarting():
            logger.info("Ignoring error when service is restarting: errno = {}, msg = {}".format(errno, msg))
            return True

        return False  # Not an acceptable error

    def execute(self, wt: WorkerThread):
        """Run this task on worker thread `wt` and record the outcome in execStats.

        Exceptions raised by _executeInternal() are caught, stored in self._err
        (see isSuccess()) and not propagated. Unexpected TAOS errors, non-TAOS
        exceptions and BaseException subclasses (e.g. KeyboardInterrupt) also
        set the aborted flag (see isAborted()).
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug(
            "[-] executing task {}...".format(self.__class__.__name__))

        self._err = None  # TODO: type hint mess up?
        self._execStats.beginTaskType(self.__class__.__name__)  # mark beginning
        errno2 = None

        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            if (gConfig.continue_on_exception):  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                self._err = err
            elif self._isErrAcceptable(errno2, err.__str__()):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                print("_", end="", flush=True)
                self._err = err
            else:  # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
                    self.__class__.__name__,
                    errno2, err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if gConfig.debug:
                    # raise # so that we see full stack
                    traceback.print_exc()
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                self._err = err
                self._aborted = True
        except Exception as e:
            self.logInfo("Non-TAOS exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException as e:
            # Catches everything else (e.g. KeyboardInterrupt, SystemExit); the task is
            # marked aborted instead of propagating the exception.
            self.logInfo("Python base exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()

        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2)

    # TODO: refactor away, just provide the dbConn
    def execWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        """Execute `sql` on the worker thread's connection and return wt.execSql()'s result."""
        return wt.execSql(sql)

    def queryWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        """Run query `sql` on the worker thread's connection and return wt.querySql()'s result."""
        return wt.querySql(sql)

    def getQueryResult(self, wt: WorkerThread):  # execute an SQL on the worker thread
        """Return the worker thread's last query result."""
        return wt.getQueryResult()


1715
class ExecutionStats:
    """Aggregated execution statistics for a crash_gen run.

    incExecCount/beginTaskType/endTaskType are called from worker threads
    (via Task.execute); printStats() reports the totals at the end.
    """

    def __init__(self):
        # Per task class name: [total executions, successful executions]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
        self._execStartTime = None
        # Per task class name: {errno: number of occurrences}
        self._errors = {}
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        return self._failed

    def startExec(self):
        """Record the wall-clock start time of the run."""
        self._execStartTime = time.time()

    def endExec(self):
        """Record the wall-clock time elapsed since startExec()."""
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        """Count one execution of task class `klassName`, plus its errno if one is given."""
        with self._lock:  # invoked concurrently from worker threads
            t = self._execTimes.setdefault(klassName, [0, 0])
            t[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                t[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

    def _averageTaskTime(self, numTasks):
        """Return accumulated busy time per task; 0.0 when no task ran (avoids ZeroDivisionError)."""
        return self._accRunTime / numTasks if numTasks else 0.0

    def printStats(self):
        logger.info(
            "----------------------------------------------------------------------")
        logger.info(
            "| Crash_Gen test {}, with the following stats:".format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
        execTimesAny = 0
        for klassName, counts in self._execTimes.items():
            execTimesAny += counts[0]
            errStr = None
            if klassName in self._errors:
                errStr = ", ".join(
                    "0x{:X}:{}".format(eno, cnt)
                    for (eno, cnt) in self._errors[klassName].items())
            logger.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(
                klassName, counts[1], counts[0], errStr))

        logger.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        logger.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        logger.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(
                self._averageTaskTime(execTimesAny)))
        logger.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        logger.info("| Total Number of Active DB Native Connections: {}".format(DbConnNative.totalConnections))
        logger.info("| Longest native query time: {:.3f} seconds, started: {}".
            format(MyTDSql.longestQueryTime,
                time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))))
        logger.info("| Longest native query: {}".format(MyTDSql.longestQuery))
        logger.info(
            "----------------------------------------------------------------------")
1814 1815 1816


class StateTransitionTask(Task):
    """Base class for tasks that take part in the DB state machine.

    Subclasses override getEndState() (None meaning "state unchanged") and
    canBeginFrom(). The size constants are used by subclasses; for example,
    TaskDropSuperTable picks LARGE_/SMALL_NUMBER_OF_TABLES via gConfig.larger_data.
    """

    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_RECORDS = 3
    LARGE_NUMBER_OF_RECORDS = 50

    _endState = None  # not referenced by the code shown in this section

    @classmethod
    def getInfo(cls):
        """Abstract: each subclass supplies its own information."""
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        """Abstract: the state this task leaves the DB in."""
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Abstract: whether this task may run from `state`."""
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        """Return the name of the i-th regular table."""
        return f"reg_table_{i}"

    def execute(self, wt: WorkerThread):
        """Delegate to Task.execute() unchanged."""
        super().execute(wt)
S
Shuduo Sang 已提交
1850 1851


1852
class TaskCreateDb(StateTransitionTask):
    """Create this task's database with a randomly chosen replica count."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Issue CREATE DATABASE with replica Dice.throw(3) + 1 (1, 2 or 3 per original comment)."""
        replicaCount = 1 + Dice.throw(3)
        createSql = "create database {} replica {}".format(
            self._db.getName(), replicaCount)
        self.execWtSql(wt, createSql)
1867

1868
class TaskDropDb(StateTransitionTask):
    """Drop this task's database, returning the state machine to empty."""

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dropSql = "drop database {}".format(self._db.getName())
        self.execWtSql(wt, dropSql)
        logger.debug("[OPS] database dropped at {}".format(time.time()))
1880

1881
class TaskCreateSuperTable(StateTransitionTask):
    """Create the DB's fixed super table (regular tables are created later by INSERTs)."""

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not self._db.exists(wt.getDbConn()):
            logger.debug("Skipping task, no DB yet")
            return

        columns = {'ts': 'timestamp', 'speed': 'int'}
        tags = {'b': 'binary(200)', 'f': 'float'}
        superTable = self._db.getFixedSuperTable()  # type: TdSuperTable
        superTable.create(wt.getDbConn(), self._db.getName(), columns, tags)
        # No need to create the regular tables, INSERT will do that automatically
1902

S
Steven Li 已提交
1903

1904 1905 1906 1907
class TdSuperTable:
    """Issues SQL for one super table through a DbConn-style object.

    `dbc` must provide execute(sql), query(sql) and getQueryResult(); the value
    returned by query() is compared against integers (presumably a row count).
    """

    def __init__(self, stName):
        self._stName = stName

    def getName(self):
        return self._stName

    # TODO: odd semantic, create() method is usually static?
    def create(self, dbc, dbName, cols: dict, tags: dict):
        '''Creating a super table'''
        def render(spec):
            # "name type" pairs joined by commas, in dict order
            return ",".join(['%s %s' % pair for pair in spec.items()])

        sql = "CREATE TABLE {}.{} ({}) TAGS ({})".format(
            dbName, self._stName, render(cols), render(tags))
        dbc.execute(sql)

    def getRegTables(self, dbc: DbConn, dbName: str):
        """Return the first column (TBNAME) of every row of the super table query."""
        sql = "select TBNAME from {}.{}".format(dbName, self._stName)
        try:
            dbc.query(sql)  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(
                Helper.convertErrno(err.errno), err))
            raise
        return [row[0] for row in dbc.getQueryResult()]

    def hasRegTables(self, dbc: DbConn, dbName: str):
        rowCount = dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName))
        return rowCount > 0

    def ensureTable(self, dbc: DbConn, dbName: str, regTableName: str):
        """Create regular table `regTableName` from this super table unless it already exists."""
        probeSql = "select tbname from {}.{} where tbname in ('{}')".format(
            dbName, self._stName, regTableName)
        if dbc.query(probeSql) >= 1:
            return  # reg table exists already
        tagValues = self._getTagStrForSql(dbc, dbName)
        dbc.execute("CREATE TABLE {}.{} USING {}.{} tags ({})".format(
            dbName, regTableName, dbName, self._stName, tagValues))

    def _getTagStrForSql(self, dbc, dbName: str):
        """Build a comma-separated literal list with one fixed value per tag type."""
        literalFor = {
            'BINARY': "'Beijing-Shanghai-LosAngeles'",
            'FLOAT': '9.9',
            'INT': '88',
        }
        tagStrs = []
        for tagType in self._getTags(dbc, dbName).values():
            if tagType not in literalFor:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
            tagStrs.append(literalFor[tagType])
        return ", ".join(tagStrs)

    def _getTags(self, dbc, dbName) -> dict:
        """Return {name: type} for DESCRIBE rows whose 4th column is 'TAG'."""
        dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
        tags = {}
        for row in dbc.getQueryResult():
            if row[3] == 'TAG':
                tags[row[0]] = row[1]
        return tags

    def addTag(self, dbc, dbName, tagName, tagType):
        """Add tag `tagName` of `tagType` unless it is already present."""
        if tagName not in self._getTags(dbc, dbName):
            dbc.execute("alter table {}.{} add tag {} {}".format(
                dbName, self._stName, tagName, tagType))

    def dropTag(self, dbc, dbName, tagName):
        """Drop tag `tagName` if it is present."""
        if tagName in self._getTags(dbc, dbName):
            dbc.execute("alter table {}.{} drop tag {}".format(
                dbName, self._stName, tagName))

    def changeTag(self, dbc, dbName, oldTag, newTag):
        """Rename `oldTag` to `newTag` when the old one exists and the new one does not."""
        tags = self._getTags(dbc, dbName)
        if oldTag in tags and newTag not in tags:
            dbc.execute("alter table {}.{} change tag {} {}".format(
                dbName, self._stName, oldTag, newTag))

1990
class TaskReadData(StateTransitionTask):
    """Run a randomly chosen aggregate query on each regular table and on the super table."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        superTable = self._db.getFixedSuperTable()

        # 1 in 5 chance, simulate a broken connection by closing and reopening it
        simulateBreak = (random.randrange(5) == 0)  # TODO: break connection in all situations
        if simulateBreak:
            wt.getDbConn().close()
            wt.getDbConn().open()
            print("_r", end="", flush=True)

        dbc = wt.getDbConn()
        dbName = self._db.getName()
        for regTableName in superTable.getRegTables(dbc, dbName):
            # Not used: 'twa(speed)' (requires a WHERE clause), 'diff(speed)' (not supported?)
            aggExpr = Dice.choice([
                '*',
                'count(*)',
                'avg(speed)',
                'sum(speed)',
                'stddev(speed)',
                # SELECTOR functions
                'min(speed)',
                'max(speed)',
                'first(speed)',
                'last(speed)',
                'top(speed, 50)',  # TODO: not supported?
                'bottom(speed, 50)',  # TODO: not supported?
                'apercentile(speed, 10)',  # TODO: TD-1316
                'last_row(speed)',
                # Transformation Functions
                'spread(speed)'
            ])  # TODO: add more from 'top'
            Dice.choice([None])  # filter expression; TODO: add various kind of WHERE conditions
            try:
                # Regular table first, then (where supported) the super table
                dbc.execute("select {} from {}.{}".format(aggExpr, dbName, regTableName))
                if aggExpr != 'stddev(speed)':  # TODO: STDDEV not valid for super tables?!
                    dbc.execute("select {} from {}.{}".format(aggExpr, dbName, superTable.getName()))
            except taos.error.ProgrammingError as err:
                logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(
                    Helper.convertErrno(err.errno), err, dbc.getLastSql()))
                raise
S
Shuduo Sang 已提交
2044

2045
class TaskDropSuperTable(StateTransitionTask):
    """Drop the fixed super table, sometimes dropping its regular tables one by one first."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
        if Dice.throw(2) == 0:
            numTables = 2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data
                             else self.SMALL_NUMBER_OF_TABLES)
            tblSeq = list(range(numTables))
            random.shuffle(tblSeq)
            tickOutput = False  # if we have spitted out a "d" character for "drop regular table"
            isSuccess = True
            for i in tblSeq:
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                try:
                    self.execWtSql(wt, "drop table {}.{}".
                        format(self._db.getName(), regTableName))  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as err:
                    # correcting for strange error number scheme
                    errno2 = Helper.convertErrno(err.errno)
                    if errno2 == 0x362:  # mnode invalid table name
                        isSuccess = False
                        logger.debug("[DB] Acceptable error when dropping a table")
                    else:
                        # Still best-effort (move on to the next table), but no longer silent
                        logger.debug("[DB] Unexpected error when dropping table {}: errno=0x{:X}, msg: {}".format(
                            regTableName, errno2, err))
                    continue  # try to delete next regular table

                if not tickOutput:
                    tickOutput = True  # Print only one time
                    print("d" if isSuccess else "f", end="", flush=True)

        # Drop the super table itself
        tblName = self._db.getFixedSuperTableName()
        self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
2086

S
Shuduo Sang 已提交
2087

2088 2089 2090
class TaskAlterTags(StateTransitionTask):
    """Randomly add, drop or rename a tag on the fixed super table."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()  # if we can drop it, we can alter tags

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()
        dbName = self._db.getName()
        roll = Dice.throw(4)

        # 0: add extraTag, 1: drop extraTag, 2: drop newTag, anything else: rename extraTag -> newTag
        actions = {
            0: lambda: sTable.addTag(dbc, dbName, "extraTag", "int"),
            1: lambda: sTable.dropTag(dbc, dbName, "extraTag"),
            2: lambda: sTable.dropTag(dbc, dbName, "newTag"),
        }
        fallback = lambda: sTable.changeTag(dbc, dbName, "extraTag", "newTag")
        actions.get(roll, fallback)()

class TaskRestartService(StateTransitionTask):
    """Occasionally restart the TDengine service started by this program.

    Only active with ``gConfig.auto_start_service`` (-a mode). At most one
    instance may be inside the restart section at a time; the guard flag is a
    CLASS attribute so every task instance (each worker creates its own)
    sees the same value.
    """
    _isRunning = False  # class-wide: True while some instance is in the restart section
    _classLock = threading.Lock()  # protects _isRunning

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        if gConfig.auto_start_service:
            return state.canDropFixedSuperTable()  # Basically when we have the super table
        return False  # don't run this otherwise

    CHANCE_TO_RESTART_SERVICE = 200  # restart with probability 1/N per execution

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not gConfig.auto_start_service:  # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        # Must read/write the flag on the class: assigning self._isRunning
        # would only create an instance attribute, invisible to other tasks.
        with TaskRestartService._classLock:
            if TaskRestartService._isRunning:
                print("Skipping restart task, another running already")
                return
            TaskRestartService._isRunning = True

        try:
            if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0:  # 1 in N chance
                dbc = wt.getDbConn()
                dbc.execute("show databases")  # simple delay, align timing with other workers
                gSvcMgr.restart()
        finally:
            # Release the guard even if the restart failed, otherwise no
            # later restart could ever run
            with TaskRestartService._classLock:
                TaskRestartService._isRunning = False
S
Shuduo Sang 已提交
2148

2149
class TaskAddData(StateTransitionTask):
    """Insert rows into the regular tables under the fixed super table.

    Tables are visited in shuffled order; each receives a fixed number of
    single-row inserts. With ``gConfig.record_ops`` every insert is logged
    (and fsync'ed) to files before and after it is executed, for power-off
    tests.
    """

    # Indexes of tables currently being inserted into, shared by all worker
    # threads; used only to report ("x") concurrent insertion into one table.
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off
    # tests. Opened lazily by prepToRecordOps() and kept open afterwards.
    fAddLogReady = None
    fAddLogDone = None

    @classmethod
    def prepToRecordOps(cls):
        """Open the operation-recording files (once) when ``record_ops`` is on."""
        if gConfig.record_ops:
            if (cls.fAddLogReady is None):
                logger.info(
                    "Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")
            if (cls.fAddLogDone is None):
                logger.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # Work through self._db; using the shared DB manager here could lead
        # to multi-thread client access
        db = self._db
        numTables = (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data
                     else self.SMALL_NUMBER_OF_TABLES)
        numRecords = (self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data
                      else self.SMALL_NUMBER_OF_RECORDS)  # per table
        tblSeq = list(range(numTables))
        random.shuffle(tblSeq)
        for i in tblSeq:
            if i in self.activeTable:  # another thread is already on this table
                print("x", end="", flush=True)  # concurrent insertion (allowed, just reported)
                ownsMark = False
            else:
                self.activeTable.add(i)  # marking it active
                ownsMark = True
            try:
                self._addRecordsToTable(te, wt, db, i, numRecords)
            finally:
                # Only clear a mark we set ourselves (never another thread's),
                # and clear it even when an insert fails.
                if ownsMark:
                    self.activeTable.discard(i)

    def _addRecordsToTable(self, te: TaskExecutor, wt: WorkerThread, db, tableIdx, numRecords):
        """Ensure regular table #tableIdx exists, then insert numRecords rows into it."""
        sTable = db.getFixedSuperTable()
        regTableName = self.getRegTableName(tableIdx)  # "db.reg_table_{}".format(i)
        sTable.ensureTable(wt.getDbConn(), db.getName(), regTableName)  # Ensure the table exists

        for _ in range(numRecords):
            nextInt = db.getNextInt()
            if gConfig.record_ops:
                self.prepToRecordOps()
                self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                self.fAddLogReady.flush()
                os.fsync(self.fAddLogReady)

            sql = "insert into {}.{} values ('{}', {});".format(  # removed: tags ('{}', {})
                db.getName(),
                regTableName,
                db.getNextTick(), nextInt)
            self.execWtSql(wt, sql)
            # Successfully wrote the data into the DB, let's record it
            te.recordDataMark(nextInt)

            if gConfig.record_ops:
                self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
                self.fAddLogDone.flush()
                os.fsync(self.fAddLogDone)
2217 2218


S
Steven Li 已提交
2219 2220
# Deterministic random number generator
class Dice():
    """Deterministic random numbers built on the module-level ``random`` generator.

    Seed exactly once with ``seed`` before calling ``throw``/``throwRange``,
    so that a run can be reproduced from its seed.
    """
    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG is deterministic, then seed it with *s* (only once)."""
        if cls.seeded:
            raise RuntimeError(
                "Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):
        """Raise RuntimeError unless seed 0 yields the known draws 864, 394, 776."""
        random.seed(0)
        draws = tuple(random.randrange(0, 1000) for _ in range(3))
        if draws != (864, 394, 776):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):
        """Return an int in [0, stop)."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):
        """Return an int in [start, stop); the generator must be seeded first."""
        if cls.seeded:
            return random.randrange(start, stop)
        raise RuntimeError("Cannot throw dice before seeding it")

    @classmethod
    def choice(cls, cList):
        """Return a random element of *cList* (no seeding check)."""
        return random.choice(cList)

S
Steven Li 已提交
2255

S
Steven Li 已提交
2256 2257
class LoggingFilter(logging.Filter):
    """Logging filter that currently accepts every record.

    INFO and above are always accepted; the branch for lower levels is the
    hook where noisy records could be suppressed.
    """

    def filter(self, record: logging.LogRecord):
        if record.levelno < logging.INFO:
            # Lower-level records could be dropped here, e.g.:
            # if msg.startswith("[TRD]"):
            #     return False
            pass
        return True

S
Shuduo Sang 已提交
2267 2268

class MyLoggingAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes every message with a short thread tag."""

    def process(self, msg, kwargs):
        # Keep only the last 4 decimal digits of the thread ident for brevity
        threadTag = threading.get_ident() % 10000
        return f"[{threadTag}]{msg}", kwargs

S
Shuduo Sang 已提交
2273 2274

class SvcManager:
    """Start, stop and restart a local TDengine service (taosd).

    The sub process and its output pumping live in a ServiceManagerThread
    (``self.svcMgrThread``); this class serializes start/stop with a lock and
    provides the interactive SIGUSR1 / SIGINT handlers.
    """

    def __init__(self):
        print("Starting TDengine Service Manager")
        # Signal handlers are registered by MainExec, which forwards to us.
        self.inSigHandler = False  # True while one of our signal handlers runs
        self.svcMgrThread = None  # ServiceManagerThread; None when not running
        self._lock = threading.Lock()  # serializes startTaosService/stopTaosService
        self._isRestarting = False

    def _doMenu(self):
        """Prompt until the user enters "1", "2" or "3"; return that choice."""
        choice = ""
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the if range below
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                # TODO: can the sub-process be blocked due to us not reading from
                # queue?
                self.sigHandlerResume()
            elif choice == "2":
                self.stopTaosService()
            elif choice == "3":  # Restart
                self.restart()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            self.inSigHandler = False  # re-arm even if the chosen action failed

    def sigIntHandler(self, signalNumber, frame):
        print("SvcManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True
        try:
            self.stopTaosService()
            print("SvcManager: INT Signal Handler returning...")
        finally:
            self.inSigHandler = False  # re-arm even if stopping failed

    def sigHandlerResume(self):
        print("Resuming TDengine service manager thread (main thread)...\n\n")

    def _checkServiceManagerThread(self):
        """Drop our reference to the service thread once it has stopped."""
        if self.svcMgrThread:  # valid svc mgr thread
            if self.svcMgrThread.isStopped():  # done?
                self.svcMgrThread.procIpcBatch()  # one last time. TODO: appropriate?
                self.svcMgrThread = None  # no more

    def _procIpcAll(self):
        """Pump service output until the service is neither running nor restarting."""
        while self.isRunning() or self.isRestarting():  # for as long as the svc mgr thread is still here
            if self.isRunning():
                self.svcMgrThread.procIpcBatch()  # regular processing,
                self._checkServiceManagerThread()
            elif self.isRestarting():
                print("Service restarting...")
            time.sleep(0.5)  # pause, before next round
        print(
            "Service Manager Thread (with subprocess) has ended, main thread now exiting...")

    def startTaosService(self):
        """Kill any pre-existing taosd, then start ours and show its first output lines.

        Raises:
            RuntimeError: if a service thread already exists.
        """
        with self._lock:
            if self.svcMgrThread:
                raise RuntimeError("Cannot start TAOS service when one may already be running")

            # Find if there's already a taosd service, and then kill it
            for proc in psutil.process_iter():
                try:
                    isTaosd = proc.name() == 'taosd'
                except psutil.NoSuchProcess:
                    continue  # process exited while we were iterating
                if isTaosd:
                    print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
                    time.sleep(2.0)
                    try:
                        proc.kill()
                    except psutil.NoSuchProcess:
                        pass  # it went away on its own during the grace period

            self.svcMgrThread = ServiceManagerThread()  # create the object
            print("Attempting to start TAOS service started, printing out output...")
            self.svcMgrThread.start()
            self.svcMgrThread.procIpcBatch(
                trimToTarget=10,
                forceOutput=True)  # for printing 10 lines
            print("TAOS service started")

    def stopTaosService(self, outputLines=20):
        """Stop the service thread and print up to *outputLines* of its final output."""
        with self._lock:
            if not self.isRunning():
                logger.warning("Cannot stop TAOS service, not running")
                return

            print("Terminating Service Manager Thread (SMT) execution...")
            self.svcMgrThread.stop()
            if self.svcMgrThread.isStopped():
                self.svcMgrThread.procIpcBatch(outputLines)  # one last time
                self.svcMgrThread = None
                print("End of TDengine Service Output")
                print("----- TDengine Service (managed by SMT) is now terminated -----\n")
            else:
                print("WARNING: SMT did not terminate as expected")

    def run(self):
        self.startTaosService()
        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
        if self.isRunning():  # if sig handler hasn't destroyed it by now
            self.stopTaosService()  # should have started already

    def restart(self):
        """Stop (if running) and start the service again."""
        if self._isRestarting:
            logger.warning("Cannot restart service when it's already restarting")
            return

        self._isRestarting = True
        try:
            if self.isRunning():
                self.stopTaosService()
            else:
                logger.warning("Service not running when restart requested")
            self.startTaosService()
        finally:
            # Always clear the flag, otherwise a failed restart would block
            # every future restart and keep _procIpcAll looping
            self._isRestarting = False

    def isRunning(self):
        return self.svcMgrThread is not None

    def isRestarting(self):
        return self._isRestarting

2422 2423 2424 2425 2426
class ServiceManagerThread:
    """Run taosd as a sub process and drain its output with reader threads.

    stdout is read by one daemon thread (``svcOutputReader``) that queues
    each decoded line for ``procIpcBatch`` and watches for TD_READY_MSG;
    stderr is read by a second daemon thread (``svcErrorReader``) that prints
    it. ``_status`` takes the MainExec.STATUS_* values
    STARTING -> RUNNING -> STOPPING -> STOPPED.
    """
    MAX_QUEUE_SIZE = 10000  # the stdout reader trims the IPC queue to 90% of this

    def __init__(self):
        self._tdeSubProcess = None  # TdeSubProcess; None when not running
        self._thread = None  # stdout reader thread
        self._thread2 = None  # stderr reader thread
        self._ipcQueue = None  # Queue of stdout lines, created by start()
        self._status = None

    def getStatus(self):
        return self._status

    def isRunning(self):
        # return self._thread and self._thread.is_alive()
        return self._status == MainExec.STATUS_RUNNING

    def isStopping(self):
        return self._status == MainExec.STATUS_STOPPING

    def isStopped(self):
        return self._status == MainExec.STATUS_STOPPED

    def start(self):
        """Start the sub process and reader threads, then wait until the service is ready.

        Polls once per second for up to 100 seconds for svcOutputReader to
        switch the status to RUNNING.

        Raises:
            RuntimeError: if already started, or if readiness is not seen in time.
        """
        if self._thread:
            raise RuntimeError("Unexpected _thread")
        if self._tdeSubProcess:
            raise RuntimeError("TDengine sub process already created/running")

        self._status = MainExec.STATUS_STARTING

        self._tdeSubProcess = TdeSubProcess()
        self._tdeSubProcess.start()

        self._ipcQueue = Queue()
        self._thread = threading.Thread(
            target=self.svcOutputReader,
            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
        self._thread.daemon = True  # thread dies with the program
        self._thread.start()

        self._thread2 = threading.Thread(
            target=self.svcErrorReader,
            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
        self._thread2.daemon = True  # thread dies with the program
        self._thread2.start()

        # wait for service to start
        for i in range(0, 100):
            time.sleep(1.0)
            # self.procIpcBatch() # don't pump message during start up
            print("_zz_", end="", flush=True)
            if self._status == MainExec.STATUS_RUNNING:
                logger.info("[] TDengine service READY to process requests")
                return  # now we've started
        # TODO: handle this better?
        # Display output before giving up: trim to the last 100 msgs, force output
        self.procIpcBatch(100, True)
        raise RuntimeError("TDengine service did not start successfully")

    def stop(self):
        """Stop the sub process; if it exited, join the reader threads.

        Just reports and returns when already stopped or stopping.

        Raises:
            RuntimeError: if there is no sub process object to stop.
        """
        # can be called from both main thread or signal handler
        print("Terminating TDengine service running as the sub process...")
        if self.isStopped():
            print("Service already stopped")
            return
        if self.isStopping():
            print("Service is already being stopped")
            return
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self._tdeSubProcess:
            raise RuntimeError("sub process object missing")

        self._status = MainExec.STATUS_STOPPING
        retCode = self._tdeSubProcess.stop()
        print("Attempted to stop sub process, got return code: {}".format(retCode))
        if retCode == -11:  # killed by signal 11 (SEGV)
            logger.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")

        if self._tdeSubProcess.isRunning():  # still running
            print("FAILED to stop sub process, it is still running... pid = {}".format(
                self._tdeSubProcess.getPid()))
        else:
            self._tdeSubProcess = None  # not running any more
            self.join()  # stop the thread, change the status, etc.

    def join(self):
        """Join both reader threads and mark the status STOPPED.

        Raises:
            RuntimeError: unless the status is STOPPING.
        """
        # TODO: sanity check
        if not self.isStopping():
            raise RuntimeError(
                "Unexpected status when ending svc mgr thread: {}".format(
                    self._status))

        if self._thread:
            self._thread.join()
            self._thread = None
            self._status = MainExec.STATUS_STOPPED
            # STD ERR thread
            self._thread2.join()
            self._thread2 = None
        else:
            print("Joining empty thread, doing nothing")

    def _trimQueue(self, targetSize):
        """Drop the oldest queued lines until at most *targetSize* remain (no-op if <= 0)."""
        if targetSize <= 0:
            return  # do nothing
        q = self._ipcQueue
        if (q.qsize() <= targetSize):  # no need to trim
            return

        logger.debug("Triming IPC queue to target size: {}".format(targetSize))
        itemsToTrim = q.qsize() - targetSize
        for i in range(0, itemsToTrim):
            try:
                q.get_nowait()
            except Empty:
                break  # break out of for loop, no more trimming

    TD_READY_MSG = "TDengine is initialized successfully"

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        """Log every line currently queued from the service's stdout.

        Args:
            trimToTarget: if > 0, first drop the oldest lines so at most this many remain.
            forceOutput: log at INFO instead of DEBUG.
        """
        self._trimQueue(trimToTarget)  # trim if necessary
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        print("<", end="", flush=True)
        while True:
            try:
                line = self._ipcQueue.get_nowait()  # getting output at fast speed
            except Empty:
                # no more output
                print(".>", end="", flush=True)
                return  # we are done with THIS BATCH
            self._printProgress("_o")
            if forceOutput:
                logger.info(line)
            else:
                logger.debug(line)

    _ProgressBars = ["--", "//", "||", "\\\\"]

    def _printProgress(self, msg):  # TODO: assuming 2 chars
        print(msg, end="", flush=True)
        pBar = self._ProgressBars[Dice.throw(4)]
        print(pBar, end="", flush=True)
        print('\b\b\b\b', end="", flush=True)

    def svcOutputReader(self, out: IO, queue):
        """Thread body: queue each stdout line and detect service readiness.

        Runs until *out* hits EOF, then closes it.
        """
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        for rawLine in iter(out.readline, b''):
            try:
                line = rawLine.decode("utf-8").rstrip()
            except UnicodeError:
                print("\nNon-UTF8 server output: {}\n".format(rawLine))
                # Still hand on a str: the readiness check below calls
                # str.find, which would raise TypeError on bytes and kill
                # this thread
                line = rawLine.decode("utf-8", errors="replace").rstrip()

            # This might block, and then causing "out" buffer to block
            queue.put(line)
            self._printProgress("_i")

            if self._status == MainExec.STATUS_STARTING:  # we are starting, let's see if we have started
                if line.find(self.TD_READY_MSG) != -1:  # found
                    logger.info("Waiting for the service to become FULLY READY")
                    time.sleep(1.0)  # wait for the server to truly start. TODO: remove this
                    logger.info("Service is now FULLY READY")
                    self._status = MainExec.STATUS_RUNNING

            # Trim the queue if necessary: TODO: try this 1 out of 10 times
            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

            if self.isStopping():  # TODO: use thread status instead
                # WAITING for stopping sub process to finish its output
                print("_w", end="", flush=True)

        # meaning sub process must have died
        print("\nNo more output from IO thread managing TDengine service")
        out.close()

    def svcErrorReader(self, err: IO, queue):
        """Thread body: print every stderr line until EOF, then close the pipe."""
        for line in iter(err.readline, b''):
            print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
        err.close()
2610

2611 2612

class TdeSubProcess:
    """Thin wrapper around the taosd sub process (a subprocess.Popen)."""

    def __init__(self):
        self.subProcess = None  # subprocess.Popen; None when not started / stopped

    def getStdOut(self):
        return self.subProcess.stdout

    def getStdErr(self):
        return self.subProcess.stderr

    def isRunning(self):
        return self.subProcess is not None

    def getPid(self):
        return self.subProcess.pid

    def getBuildPath(self):
        """Locate the build root containing ``build/bin/taosd``.

        Walks the project tree (this file's directory cut just before
        "community", else before "tests") for a directory holding a file
        named ``taosd`` whose parent path does not contain "packaging", and
        returns that directory with its trailing "/build/bin" removed.

        Raises:
            RuntimeError: if no such taosd is found.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("communit")]
        else:
            testsIdx = selfPath.find("tests")
            # find() returns -1 when absent; slicing with it would just chop
            # off the last character, so fall back to our own directory
            projPath = selfPath[:testsIdx] if testsIdx != -1 else selfPath

        for root, dirs, files in os.walk(projPath):
            if ("taosd" in files):
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    return root[:len(root) - len("/build/bin")]
        raise RuntimeError("Cannot find a taosd binary under: {}".format(projPath))

    def start(self):
        """Launch taosd with piped stdout/stderr, moving aside any old log directory.

        Raises:
            RuntimeError: if a sub process already exists.
        """
        if self.subProcess:  # already there; fail before touching any files
            raise RuntimeError("Corrupt process state")

        ON_POSIX = 'posix' in sys.builtin_module_names

        buildPath = self.getBuildPath()  # walks the project tree, so do it once
        taosdPath = buildPath + "/build/bin/taosd"
        cfgPath = buildPath + "/test/cfg"

        # Save the old log files by renaming the directory; TDengine will
        # auto-create a fresh one with proper perms
        logPath = buildPath + "/test/log"
        if os.path.exists(logPath):
            logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
            logger.info("Saving old log files to: {}".format(logPathSaved))
            os.rename(logPath, logPathSaved)

        svcCmd = [taosdPath, '-c', cfgPath]
        self.subProcess = subprocess.Popen(
            svcCmd, shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # bufsize=1, # not supported in binary mode
            close_fds=ON_POSIX
            )  # had text=True, which interferred with reading EOF

    def stop(self):
        """Stop the sub process (SIGINT, then wait up to 10 s) and return its status.

        Returns:
            -1 if there is no sub process; -3 on timeout (the process is then
            still considered running); otherwise the process's real return
            code, where a negative value -N means it was killed by signal N
            (e.g. -11 for SEGV).
        """
        if not self.subProcess:
            print("Sub process already stopped")
            return -1

        retCode = self.subProcess.poll()  # None while the process is still alive
        if retCode is not None:  # already ended; note 0 is a valid exit code
            self.subProcess = None
            return retCode

        print(
            "Sub process is running, sending SIG_INT and waiting for it to terminate...")
        # sub process should end, then IPC queue should end, causing IO
        # thread to end
        self.subProcess.send_signal(signal.SIGINT)
        try:
            retCode = self.subProcess.wait(10)
        except subprocess.TimeoutExpired:
            print("Time out waiting for TDengine service process to exit")
            return -3
        print("TDengine service process terminated successfully from SIG_INT")
        self.subProcess = None
        return retCode  # real exit status, so callers can detect e.g. SEGV
2704

2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732
class ThreadStacks: # stack info for all threads
    """Snapshot of the call stacks of all live threads, keyed by native thread id."""

    # Innermost function names of threads considered uninteresting when
    # print(filterInternal=True) is used
    _INTERNAL_END_NAMES = frozenset([
        'wait', 'invoke_excepthook',
        '_wait',  # The Barrier exception
        'svcOutputReader',  # the svcMgr thread
        '__init__',  # the thread that extracted the stack
    ])

    def __init__(self):
        self._allStacks = {}
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            frame = allFrames.get(th.ident)
            if frame is None:  # thread ended between the two snapshots
                continue
            self._allStacks[th.native_id] = traceback.extract_stack(frame)

    def print(self, filteredEndName = None, filterInternal = False):
        """Print each captured stack, outermost frame first.

        Args:
            filteredEndName: if given, skip threads whose innermost frame is
                in a function with this name.
            filterInternal: if True, also skip threads whose innermost frame
                is one of _INTERNAL_END_NAMES.
        """
        for thNid, stack in self._allStacks.items(): # for each thread
            lastFrame = stack[-1]
            if filteredEndName and lastFrame.name == filteredEndName:
                continue  # innermost frame matches the name to filter out
            if filterInternal and lastFrame.name in self._INTERNAL_END_NAMES:
                continue  # ignore
            # Now print
            print("\n<----- Thread Info for ID: {}".format(thNid))
            for frame in stack:
                print("File {filename}, line {lineno}, in {name}".format(
                    filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info\n")
S
Shuduo Sang 已提交
2733

2734 2735 2736
class ClientManager:
    """Drive the client workload (a ThreadCoordinator) and its signal handling."""

    def __init__(self):
        print("Starting service manager")
        # Signal handlers are registered by MainExec, which forwards to us.
        self._status = MainExec.STATUS_RUNNING
        self.tc = None  # ThreadCoordinator, created by run()
        self.inSigHandler = False  # True while the SIGUSR1 menu is active

    def sigIntHandler(self, signalNumber, frame):
        """First SIGINT asks the coordinator to stop; a repeated one exits at once."""
        if self._status != MainExec.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            # return  # do nothing if it's already not running
            sys.exit(-1)
        self._status = MainExec.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        if self.tc is None:
            # Signal arrived before run() created the coordinator: there is
            # nothing to stop gracefully (and self.tc.requestToStop() would
            # raise AttributeError), so exit as a repeated SIGINT would.
            print("ClientManager: no client activity started yet, exiting...")
            sys.exit(-1)
        self.tc.requestToStop()

    def _doMenu(self):
        """Prompt until the user enters "1", "2" or "3"; return that choice."""
        choice = ""
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the if range below
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            self.inSigHandler = False  # re-arm even if the chosen action failed

    # TODO: need to revise how we verify data durability
    # def _printLastNumbers(self):  # to verify data durability
    #     dbManager = DbManager()
    #     dbc = dbManager.getDbConn()
    #     if dbc.query("show databases") <= 1:  # no database (we have a default called "log")
    #         return
    #     dbc.execute("use db")
    #     if dbc.query("show tables") == 0:  # no tables
    #         return

    #     sTbName = dbManager.getFixedSuperTableName()

    #     # get all regular tables
    #     # TODO: analyze result set later
    #     dbc.query("select TBNAME from db.{}".format(sTbName))
    #     rTables = dbc.getQueryResult()

    #     bList = TaskExecutor.BoundedList()
    #     for rTbName in rTables:  # regular tables
    #         dbc.query("select speed from db.{}".format(rTbName[0]))
    #         numbers = dbc.getQueryResult()
    #         for row in numbers:
    #             # print("<{}>".format(n), end="", flush=True)
    #             bList.add(row[0])

    #     print("Top numbers in DB right now: {}".format(bList))
    #     print("TDengine client execution is about to start in 2 seconds...")
    #     time.sleep(2.0)
    #     dbManager = None  # release?

    def run(self, svcMgr):
        """Run the workload to completion; return 1 if it failed, else 0.

        If *svcMgr* is given, the service it manages is stopped before the
        final statistics are printed.
        """
        dbManager = DbManager()  # Regular function
        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        self.tc.run()
        if svcMgr:  # gConfig.auto_start_service:
            svcMgr.stopTaosService()
        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        return 1 if self.tc.isFailed() else 0

    def conclude(self):
        self.tc.printStats()
        self.tc.getDbManager().cleanUp()
2847 2848

class MainExec:
    """Top-level driver that runs either the TDengine service or the crash-gen client.

    The constructor installs process-wide signal handlers:
    - SIGTERM and SIGINT go to ``sigIntHandler``.
    - SIGUSR1 goes to ``sigUsrHandler``.
    Each handler forwards the signal to whichever managers are currently set.
    """

    # Lifecycle status codes.
    # NOTE(review): not referenced inside this class; presumably used
    # elsewhere or reserved -- confirm before removing.
    STATUS_STARTING = 1
    STATUS_RUNNING = 2
    STATUS_STOPPING = 3
    STATUS_STOPPED = 4

    def __init__(self):
        self._clientMgr = None  # ClientManager, set by runClient()
        self._svcMgr = None     # SvcManager, set by runClient()/runService()

        # SIGTERM is routed to the same handler as SIGINT.
        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT, self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, or to the service manager if no client."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr:  # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to the service manager first, then to the client manager."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Run the crash-gen client, optionally auto-starting the service first.

        :return: the exit code from ClientManager.run(), or 1 if the REST
                 connection to the DB could not be opened. Returning None here
                 would make sys.exit() report success.
        """
        global gSvcMgr
        if gConfig.auto_start_service:
            self._svcMgr = SvcManager()
            gSvcMgr = self._svcMgr  # hack alert
            self._svcMgr.startTaosService()  # we start, don't run

        self._clientMgr = ClientManager()
        try:
            return self._clientMgr.run(self._svcMgr)  # stop TAOS service inside
        except requests.exceptions.ConnectionError as err:
            # requests exceptions are OSError subclasses and have no
            # getMessage(); format the exception object itself.
            logger.warning("Failed to open REST connection to DB: {}".format(err))
            # Don't raise, but do report the failure through the exit code.
            return 1

    def runService(self):
        """Run the TDengine service in the foreground until it reaches an end state."""
        global gSvcMgr
        self._svcMgr = SvcManager()
        gSvcMgr = self._svcMgr  # save it in a global variable TODO: hack alert

        self._svcMgr.run()  # run to some end state

        self._svcMgr = None
        gSvcMgr = None

    def runTemp(self):  # for debugging purposes
        """Scratch entry point for manual debugging; currently a no-op.

        The earlier experiments are kept below as commented-out code for reference.
        """
        # # Hack to exercise reading from disk, increasing coverage. TODO: fix
        # dbc = dbState.getDbConn()
        # sTbName = dbState.getFixedSuperTableName()
        # dbc.execute("create database if not exists db")
        # if not dbState.getState().equals(StateEmpty()):
        #     dbc.execute("use db")

        # rTables = None
        # try: # the super table may not exist
        #     sql = "select TBNAME from db.{}".format(sTbName)
        #     logger.info("Finding out tables in super table: {}".format(sql))
        #     dbc.query(sql) # TODO: analyze result set later
        #     logger.info("Fetching result")
        #     rTables = dbc.getQueryResult()
        #     logger.info("Result: {}".format(rTables))
        # except taos.error.ProgrammingError as err:
        #     logger.info("Initial Super table OPS error: {}".format(err))

        # # sys.exit()
        # if ( not rTables == None):
        #     # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
        #     try:
        #         for rTbName in rTables : # regular tables
        #             ds = dbState
        #             logger.info("Inserting into table: {}".format(rTbName[0]))
        #             sql = "insert into db.{} values ('{}', {});".format(
        #                 rTbName[0],
        #                 ds.getNextTick(), ds.getNextInt())
        #             dbc.execute(sql)
        #         for rTbName in rTables : # regular tables
        #             dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
        #         logger.info("Initial READING operation is successful")
        #     except taos.error.ProgrammingError as err:
        #         logger.info("Initial WRITE/READ error: {}".format(err))

        # Sandbox testing code
        # dbc = dbState.getDbConn()
        # while True:
        #     rows = dbc.query("show databases")
        #     print("Rows: {}, time={}".format(rows, time.time()))
        return

S
Steven Li 已提交
2942

2943
def main():
    """Parse command-line options, configure logging, then run the server or the client.

    Side effects: sets the module globals ``gConfig`` (parsed options) and
    ``logger`` (logging adapter), and seeds Dice with 0.

    :return: the client's exit code when running the client; None when
             running the service (-e/--run-tdengine).
    """
    # Super cool Python argument library:
    # https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in official docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))

    # Valued options use %(default)s so the advertised default always matches
    # the real one. Earlier hard-coded texts claimed 10/100/10 while the
    # actual defaults were native/1000/5.
    parser.add_argument(
        '-a',
        '--auto-start-service',
        action='store_true',
        help='Automatically start/stop the TDengine service (default: false)')
    parser.add_argument(
        '-b',
        '--max-dbs',
        action='store',
        default=0,
        type=int,
        help='Maximum number of DBs to keep, set to disable dropping DB. (default: %(default)s)')
    parser.add_argument(
        '-c',
        '--connector-type',
        action='store',
        default='native',
        type=str,
        help='Connector type to use: native, rest, or mixed (default: %(default)s)')
    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument(
        '-e',
        '--run-tdengine',
        action='store_true',
        help='Run TDengine service in foreground (default: false)')
    parser.add_argument(
        '-l',
        '--larger-data',
        action='store_true',
        help='Write larger amount of data during write operations (default: false)')
    parser.add_argument(
        '-p',
        '--per-thread-db-connection',
        action='store_true',
        help='Use a separate db connection per thread instead of a single shared one (default: false)')
    parser.add_argument(
        '-r',
        '--record-ops',
        action='store_true',
        help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
    parser.add_argument(
        '-s',
        '--max-steps',
        action='store',
        default=1000,
        type=int,
        help='Maximum number of steps to run (default: %(default)s)')
    parser.add_argument(
        '-t',
        '--num-threads',
        action='store',
        default=5,
        type=int,
        help='Number of threads to run (default: %(default)s)')
    parser.add_argument(
        '-x',
        '--continue-on-exception',
        action='store_true',
        help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')

    global gConfig
    gConfig = parser.parse_args()

    # Logging Stuff
    global logger
    _logger = logging.getLogger('CrashGen')  # real logger
    _logger.addFilter(LoggingFilter())
    ch = logging.StreamHandler()
    _logger.addHandler(ch)

    # Logging adapter, to be used as a logger
    logger = MyLoggingAdapter(_logger, [])
    logger.setLevel(logging.DEBUG if gConfig.debug else logging.INFO)

    Dice.seed(0)  # initial seeding of dice

    # Run server or client
    mExec = MainExec()
    if gConfig.run_tdengine:  # run server
        mExec.runService()
    else:
        return mExec.runClient()
3067

S
Shuduo Sang 已提交
3068

3069
if __name__ == "__main__":
    # main() returns the client's exit status (or None); hand it straight to the OS.
    sys.exit(main())